From a46aaa10e078091afed2cd18d7cf11f9e43be874 Mon Sep 17 00:00:00 2001 From: Luke Van Drie Date: Fri, 7 Nov 2025 23:50:46 +0000 Subject: [PATCH] refactor: Flatten plugins/intraflow structure Moves intra-flow policy implementations directly into the `plugins/intraflow` package, removing the unnecessary nested directories. This simplifies the import paths and project structure. This is a no-op refactoring. --- .../dispatch => intraflow}/README.md | 0 .../dispatch => intraflow}/factory.go | 2 +- .../dispatch/fcfs => intraflow}/fcfs.go | 5 ++-- .../dispatch/fcfs => intraflow}/fcfs_test.go | 2 +- .../dispatch => intraflow}/functional_test.go | 6 ++--- pkg/epp/flowcontrol/registry/config.go | 21 ++++++++-------- pkg/epp/flowcontrol/registry/config_test.go | 25 +++++++++---------- pkg/epp/flowcontrol/registry/registry_test.go | 4 +-- pkg/epp/flowcontrol/registry/shard_test.go | 4 +-- 9 files changed, 32 insertions(+), 37 deletions(-) rename pkg/epp/flowcontrol/framework/plugins/{policies/intraflow/dispatch => intraflow}/README.md (100%) rename pkg/epp/flowcontrol/framework/plugins/{policies/intraflow/dispatch => intraflow}/factory.go (99%) rename pkg/epp/flowcontrol/framework/plugins/{policies/intraflow/dispatch/fcfs => intraflow}/fcfs.go (96%) rename pkg/epp/flowcontrol/framework/plugins/{policies/intraflow/dispatch/fcfs => intraflow}/fcfs_test.go (99%) rename pkg/epp/flowcontrol/framework/plugins/{policies/intraflow/dispatch => intraflow}/functional_test.go (90%) diff --git a/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/README.md b/pkg/epp/flowcontrol/framework/plugins/intraflow/README.md similarity index 100% rename from pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/README.md rename to pkg/epp/flowcontrol/framework/plugins/intraflow/README.md diff --git a/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/factory.go b/pkg/epp/flowcontrol/framework/plugins/intraflow/factory.go similarity index 99% rename from pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/factory.go rename to pkg/epp/flowcontrol/framework/plugins/intraflow/factory.go index b55740cbc..8f476d3d0 100644 --- a/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/factory.go +++ b/pkg/epp/flowcontrol/framework/plugins/intraflow/factory.go @@ -17,7 +17,7 @@ limitations under the License. // Package dispatch provides the factory and registration mechanism for all `framework.IntraFlowDispatchPolicy` // implementations. // It allows new policies to be added to the system and instantiated by name. -package dispatch +package intraflow import ( "fmt" diff --git a/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/fcfs/fcfs.go b/pkg/epp/flowcontrol/framework/plugins/intraflow/fcfs.go similarity index 96% rename from pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/fcfs/fcfs.go rename to pkg/epp/flowcontrol/framework/plugins/intraflow/fcfs.go index 7addb9d13..ff6b38f76 100644 --- a/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/fcfs/fcfs.go +++ b/pkg/epp/flowcontrol/framework/plugins/intraflow/fcfs.go @@ -15,13 +15,12 @@ limitations under the License. */ // Package fcfs provides a First-Come, First-Served implementation of the `framework.IntraFlowDispatchPolicy`. -package fcfs +package intraflow import ( "errors" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework" - "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types" ) @@ -55,7 +54,7 @@ import ( const FCFSPolicyName = "FCFS" func init() { - dispatch.MustRegisterPolicy(dispatch.RegisteredPolicyName(FCFSPolicyName), + MustRegisterPolicy(RegisteredPolicyName(FCFSPolicyName), func() (framework.IntraFlowDispatchPolicy, error) { return newFCFS(), nil }) diff --git a/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/fcfs/fcfs_test.go b/pkg/epp/flowcontrol/framework/plugins/intraflow/fcfs_test.go similarity index 99% rename from pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/fcfs/fcfs_test.go rename to pkg/epp/flowcontrol/framework/plugins/intraflow/fcfs_test.go index cc6bceecf..6e684ec46 100644 --- a/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/fcfs/fcfs_test.go +++ b/pkg/epp/flowcontrol/framework/plugins/intraflow/fcfs_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package fcfs +package intraflow import ( "testing" diff --git a/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/functional_test.go b/pkg/epp/flowcontrol/framework/plugins/intraflow/functional_test.go similarity index 90% rename from pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/functional_test.go rename to pkg/epp/flowcontrol/framework/plugins/intraflow/functional_test.go index 01e8a1c8e..1043b0794 100644 --- a/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/functional_test.go +++ b/pkg/epp/flowcontrol/framework/plugins/intraflow/functional_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package dispatch_test +package intraflow import ( "testing" @@ -24,8 +24,6 @@ import ( "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework" frameworkmocks "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/mocks" - "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch" - _ "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/fcfs" ) // TestIntraFlowDispatchPolicyConformance is the main conformance test suite for `framework.IntraFlowDispatchPolicy` @@ -35,7 +33,7 @@ import ( func TestIntraFlowDispatchPolicyConformance(t *testing.T) { t.Parallel() - for policyName, constructor := range dispatch.RegisteredPolicies { + for policyName, constructor := range RegisteredPolicies { t.Run(string(policyName), func(t *testing.T) { t.Parallel() diff --git a/pkg/epp/flowcontrol/registry/config.go b/pkg/epp/flowcontrol/registry/config.go index 21757827f..b65bbf59f 100644 --- a/pkg/epp/flowcontrol/registry/config.go +++ b/pkg/epp/flowcontrol/registry/config.go @@ -23,10 +23,9 @@ import ( "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/intraflow" inter "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/besthead" - intra "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch" - "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/fcfs" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/queue" ) @@ -39,7 +38,7 @@ const ( // It is set to 1 GB. defaultPriorityBandMaxBytes uint64 = 1_000_000_000 // defaultIntraFlowDispatchPolicy is the default policy for selecting items within a single flow's queue. - defaultIntraFlowDispatchPolicy intra.RegisteredPolicyName = fcfs.FCFSPolicyName + defaultIntraFlowDispatchPolicy intraflow.RegisteredPolicyName = intraflow.FCFSPolicyName // defaultInterFlowDispatchPolicy is the default policy for selecting which flow's queue to service next. defaultInterFlowDispatchPolicy inter.RegisteredPolicyName = besthead.BestHeadPolicyName // defaultQueue is the default queue implementation for flows. @@ -102,7 +101,7 @@ type Config struct { // Factory functions used for plugin instantiation during configuration validation. // These enable dependency injection for unit testing the validation logic. interFlowDispatchPolicyFactory interFlowDispatchPolicyFactory - intraFlowDispatchPolicyFactory intraFlowDispatchPolicyFactory + intraFlowDispatchPolicyFactory intraflowFlowDispatchPolicyFactory queueFactory queueFactory } @@ -123,7 +122,7 @@ type PriorityBandConfig struct { // IntraFlowDispatchPolicy specifies the default name of the policy used to select a request from within a single // flow's queue in this band. // Optional: Defaults to `defaultIntraFlowDispatchPolicy` ("FCFS"). - IntraFlowDispatchPolicy intra.RegisteredPolicyName + IntraFlowDispatchPolicy intraflow.RegisteredPolicyName // InterFlowDispatchPolicy specifies the name of the policy used to select which flow's queue to service next from // this band. @@ -165,7 +164,7 @@ type ShardPriorityBandConfig struct { // PriorityName is a unique human-readable name for this priority band. PriorityName string // IntraFlowDispatchPolicy is the name of the policy for dispatch within a flow's queue. - IntraFlowDispatchPolicy intra.RegisteredPolicyName + IntraFlowDispatchPolicy intraflow.RegisteredPolicyName // InterFlowDispatchPolicy is the name of the policy for dispatch between flow queues. InterFlowDispatchPolicy inter.RegisteredPolicyName // Queue is the name of the queue implementation to use. @@ -212,7 +211,7 @@ func (c *Config) ValidateAndApplyDefaults() (*Config, error) { cfg.interFlowDispatchPolicyFactory = inter.NewPolicyFromName } if cfg.intraFlowDispatchPolicyFactory == nil { - cfg.intraFlowDispatchPolicyFactory = intra.NewPolicyFromName + cfg.intraFlowDispatchPolicyFactory = intraflow.NewPolicyFromName } if cfg.queueFactory == nil { cfg.queueFactory = queue.NewQueueFromName @@ -366,11 +365,11 @@ type configOption func(*Config) // testing validation logic. type interFlowDispatchPolicyFactory func(name inter.RegisteredPolicyName) (framework.InterFlowDispatchPolicy, error) -// intraFlowDispatchPolicyFactory defines the signature for a function that creates an +// intraflowFlowDispatchPolicyFactory defines the signature for a function that creates an // `framework.IntraFlowDispatchPolicy` instance from its registered name. -// It serves as an abstraction over the concrete `intra.NewPolicyFromName` factory, enabling dependency injection for +// It serves as an abstraction over the concrete `intraflow.NewPolicyFromName` factory, enabling dependency injection for // testing validation logic. -type intraFlowDispatchPolicyFactory func(name intra.RegisteredPolicyName) (framework.IntraFlowDispatchPolicy, error) +type intraflowFlowDispatchPolicyFactory func(name intraflow.RegisteredPolicyName) (framework.IntraFlowDispatchPolicy, error) // queueFactory defines the signature for a function that creates a `framework.SafeQueue` instance from its registered // name and a given `framework.ItemComparator`. @@ -395,7 +394,7 @@ func withInterFlowDispatchPolicyFactory(factory interFlowDispatchPolicyFactory) // creating `framework.IntraFlowDispatchPolicy` instances. // This is used exclusively for testing validation logic. // test-only -func withIntraFlowDispatchPolicyFactory(factory intraFlowDispatchPolicyFactory) configOption { +func withIntraFlowDispatchPolicyFactory(factory intraflowFlowDispatchPolicyFactory) configOption { return func(c *Config) { c.intraFlowDispatchPolicyFactory = factory } diff --git a/pkg/epp/flowcontrol/registry/config_test.go b/pkg/epp/flowcontrol/registry/config_test.go index b95f013ac..bb2d68320 100644 --- a/pkg/epp/flowcontrol/registry/config_test.go +++ b/pkg/epp/flowcontrol/registry/config_test.go @@ -28,9 +28,8 @@ import ( "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/mocks" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/intraflow" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/besthead" - intra "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch" - "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/fcfs" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/queue" ) @@ -81,7 +80,7 @@ func TestConfig_ValidateAndApplyDefaults(t *testing.T) { PriorityBands: []PriorityBandConfig{{ Priority: 1, PriorityName: "Critical", - IntraFlowDispatchPolicy: fcfs.FCFSPolicyName, + IntraFlowDispatchPolicy: intraflow.FCFSPolicyName, InterFlowDispatchPolicy: besthead.BestHeadPolicyName, Queue: queue.ListQueueName, MaxBytes: 500, @@ -103,12 +102,12 @@ func TestConfig_ValidateAndApplyDefaults(t *testing.T) { PriorityBands: []PriorityBandConfig{{ Priority: 1, PriorityName: "High", - IntraFlowDispatchPolicy: intra.RegisteredPolicyName("policy-without-req"), + IntraFlowDispatchPolicy: intraflow.RegisteredPolicyName("policy-without-req"), }}, }, opts: []configOption{ withIntraFlowDispatchPolicyFactory( - func(_ intra.RegisteredPolicyName) (framework.IntraFlowDispatchPolicy, error) { + func(_ intraflow.RegisteredPolicyName) (framework.IntraFlowDispatchPolicy, error) { return &mocks.MockIntraFlowDispatchPolicy{ NameV: "policy-without-req", RequiredQueueCapabilitiesV: []framework.QueueCapability{}, @@ -123,13 +122,13 @@ func TestConfig_ValidateAndApplyDefaults(t *testing.T) { PriorityBands: []PriorityBandConfig{{ Priority: 1, PriorityName: "High", - IntraFlowDispatchPolicy: intra.RegisteredPolicyName("policy-with-reqs"), + IntraFlowDispatchPolicy: intraflow.RegisteredPolicyName("policy-with-reqs"), Queue: queue.RegisteredQueueName("queue-with-reqs-capability"), }}, }, opts: []configOption{ withIntraFlowDispatchPolicyFactory( - func(_ intra.RegisteredPolicyName) (framework.IntraFlowDispatchPolicy, error) { + func(_ intraflow.RegisteredPolicyName) (framework.IntraFlowDispatchPolicy, error) { return &mocks.MockIntraFlowDispatchPolicy{ RequiredQueueCapabilitiesV: []framework.QueueCapability{"capability-A", "capability-B"}, }, nil @@ -193,12 +192,12 @@ func TestConfig_ValidateAndApplyDefaults(t *testing.T) { PriorityBands: []PriorityBandConfig{{ Priority: 1, PriorityName: "High", - IntraFlowDispatchPolicy: intra.RegisteredPolicyName("policy-with-req"), + IntraFlowDispatchPolicy: intraflow.RegisteredPolicyName("policy-with-req"), Queue: queue.ListQueueName, }}, }, opts: []configOption{withIntraFlowDispatchPolicyFactory( - func(_ intra.RegisteredPolicyName) (framework.IntraFlowDispatchPolicy, error) { + func(_ intraflow.RegisteredPolicyName) (framework.IntraFlowDispatchPolicy, error) { return &mocks.MockIntraFlowDispatchPolicy{ RequiredQueueCapabilitiesV: []framework.QueueCapability{framework.QueueCapability("required-capability")}, }, nil @@ -213,13 +212,13 @@ func TestConfig_ValidateAndApplyDefaults(t *testing.T) { Priority: 1, PriorityName: "High", Queue: queue.RegisteredQueueName("failing-queue"), - IntraFlowDispatchPolicy: intra.RegisteredPolicyName("policy-with-req"), + IntraFlowDispatchPolicy: intraflow.RegisteredPolicyName("policy-with-req"), }}, }, expectErr: true, opts: []configOption{ withIntraFlowDispatchPolicyFactory( // Forces queue instance creation for validating capabilities. - func(name intra.RegisteredPolicyName) (framework.IntraFlowDispatchPolicy, error) { + func(name intraflow.RegisteredPolicyName) (framework.IntraFlowDispatchPolicy, error) { return &mocks.MockIntraFlowDispatchPolicy{ NameV: string(name), RequiredQueueCapabilitiesV: []framework.QueueCapability{"required-capability"}, @@ -237,11 +236,11 @@ func TestConfig_ValidateAndApplyDefaults(t *testing.T) { PriorityBands: []PriorityBandConfig{{ Priority: 1, PriorityName: "High", - IntraFlowDispatchPolicy: intra.RegisteredPolicyName("failing-policy"), + IntraFlowDispatchPolicy: intraflow.RegisteredPolicyName("failing-policy"), }}, }, opts: []configOption{withIntraFlowDispatchPolicyFactory( - func(_ intra.RegisteredPolicyName) (framework.IntraFlowDispatchPolicy, error) { + func(_ intraflow.RegisteredPolicyName) (framework.IntraFlowDispatchPolicy, error) { return nil, errors.New("policy creation failed") })}, expectErr: true, diff --git a/pkg/epp/flowcontrol/registry/registry_test.go b/pkg/epp/flowcontrol/registry/registry_test.go index b5bc322cb..38e1d9292 100644 --- a/pkg/epp/flowcontrol/registry/registry_test.go +++ b/pkg/epp/flowcontrol/registry/registry_test.go @@ -31,8 +31,8 @@ import ( "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/intraflow" inter "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch" - intra "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types/mocks" ) @@ -190,7 +190,7 @@ func TestFlowRegistry_WithConnection_AndHandle(t *testing.T) { t.Run("ShouldFail_WhenJITFails", func(t *testing.T) { t.Parallel() h := newRegistryTestHarness(t, harnessOptions{}) - h.fr.config.intraFlowDispatchPolicyFactory = func(intra.RegisteredPolicyName) (framework.IntraFlowDispatchPolicy, error) { + h.fr.config.intraFlowDispatchPolicyFactory = func(intraflow.RegisteredPolicyName) (framework.IntraFlowDispatchPolicy, error) { return nil, errors.New("injected factory failure") } key := types.FlowKey{ID: "test-flow", Priority: highPriority} diff --git a/pkg/epp/flowcontrol/registry/shard_test.go b/pkg/epp/flowcontrol/registry/shard_test.go index 23bf81325..ad98e5c0f 100644 --- a/pkg/epp/flowcontrol/registry/shard_test.go +++ b/pkg/epp/flowcontrol/registry/shard_test.go @@ -28,8 +28,8 @@ import ( "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/intraflow" inter "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch" - intra "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/queue" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types/mocks" @@ -96,7 +96,7 @@ func newShardTestHarness(t *testing.T) *shardTestHarness { func (h *shardTestHarness) synchronizeFlow(key types.FlowKey) { h.t.Helper() spec := types.FlowSpecification{Key: key} - policy, err := intra.NewPolicyFromName(defaultIntraFlowDispatchPolicy) + policy, err := intraflow.NewPolicyFromName(defaultIntraFlowDispatchPolicy) require.NoError(h.t, err, "Helper synchronizeFlow: failed to create real intra-flow policy for synchronization") q, err := queue.NewQueueFromName(defaultQueue, policy.Comparator()) require.NoError(h.t, err, "Helper synchronizeFlow: failed to create real queue for synchronization")