From 846cbcfa12c60dd15dbca299da60b08eb2ccac68 Mon Sep 17 00:00:00 2001 From: Luke Van Drie Date: Sat, 8 Nov 2025 00:04:47 +0000 Subject: [PATCH] refactor: Flatten plugins/interflow structure Moves inter-flow policy implementations directly into the `plugins/interflow` package, removing the unnecessary nested directories. This simplifies the import paths and project structure. This is a no-op refactoring. --- .../interflow/dispatch => interflow}/README.md | 0 .../dispatch/besthead => interflow}/besthead.go | 5 ++--- .../besthead => interflow}/besthead_test.go | 7 +------ .../interflow/dispatch => interflow}/factory.go | 2 +- .../dispatch => interflow}/functional_test.go | 7 ++----- .../roundrobin => interflow}/roundrobin.go | 7 +++---- .../roundrobin => interflow}/roundrobin_test.go | 2 +- pkg/epp/flowcontrol/registry/config.go | 15 +++++++-------- pkg/epp/flowcontrol/registry/config_test.go | 4 ++-- pkg/epp/flowcontrol/registry/registry_test.go | 4 ++-- pkg/epp/flowcontrol/registry/shard_test.go | 6 +++--- 11 files changed, 24 insertions(+), 35 deletions(-) rename pkg/epp/flowcontrol/framework/plugins/{policies/interflow/dispatch => interflow}/README.md (100%) rename pkg/epp/flowcontrol/framework/plugins/{policies/interflow/dispatch/besthead => interflow}/besthead.go (93%) rename pkg/epp/flowcontrol/framework/plugins/{policies/interflow/dispatch/besthead => interflow}/besthead_test.go (98%) rename pkg/epp/flowcontrol/framework/plugins/{policies/interflow/dispatch => interflow}/factory.go (99%) rename pkg/epp/flowcontrol/framework/plugins/{policies/interflow/dispatch => interflow}/functional_test.go (90%) rename pkg/epp/flowcontrol/framework/plugins/{policies/interflow/dispatch/roundrobin => interflow}/roundrobin.go (94%) rename pkg/epp/flowcontrol/framework/plugins/{policies/interflow/dispatch/roundrobin => interflow}/roundrobin_test.go (99%) diff --git a/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/README.md b/pkg/epp/flowcontrol/framework/plugins/interflow/README.md similarity index 100% rename from pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/README.md rename to pkg/epp/flowcontrol/framework/plugins/interflow/README.md diff --git a/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/besthead/besthead.go b/pkg/epp/flowcontrol/framework/plugins/interflow/besthead.go similarity index 93% rename from pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/besthead/besthead.go rename to pkg/epp/flowcontrol/framework/plugins/interflow/besthead.go index 41033e755..ceab4adec 100644 --- a/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/besthead/besthead.go +++ b/pkg/epp/flowcontrol/framework/plugins/interflow/besthead.go @@ -16,13 +16,12 @@ limitations under the License. // Package besthead provides a `framework.InterFlowDispatchPolicy` that selects the queue containing the single "best" // item from across all queues in a priority band. -package besthead +package interflow import ( "fmt" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework" - "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types" ) @@ -30,7 +29,7 @@ import ( const BestHeadPolicyName = "BestHead" func init() { - dispatch.MustRegisterPolicy(dispatch.RegisteredPolicyName(BestHeadPolicyName), + MustRegisterPolicy(RegisteredPolicyName(BestHeadPolicyName), func() (framework.InterFlowDispatchPolicy, error) { return newBestHead(), nil }) diff --git a/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/besthead/besthead_test.go b/pkg/epp/flowcontrol/framework/plugins/interflow/besthead_test.go similarity index 98% rename from pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/besthead/besthead_test.go rename to pkg/epp/flowcontrol/framework/plugins/interflow/besthead_test.go index 5971d3f0c..7750bfbf6 100644 --- a/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/besthead/besthead_test.go +++ b/pkg/epp/flowcontrol/framework/plugins/interflow/besthead_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package besthead +package interflow import ( "errors" @@ -36,11 +36,6 @@ const ( commonScoreType = "enqueue_time_ns_asc" ) -var ( - flow1Key = types.FlowKey{ID: flow1ID, Priority: 0} - flow2Key = types.FlowKey{ID: flow2ID, Priority: 0} -) - // enqueueTimeComparatorFunc is a test utility. Lower enqueue time is better. func enqueueTimeComparatorFunc(a, b types.QueueItemAccessor) bool { return a.EnqueueTime().Before(b.EnqueueTime()) diff --git a/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/factory.go b/pkg/epp/flowcontrol/framework/plugins/interflow/factory.go similarity index 99% rename from pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/factory.go rename to pkg/epp/flowcontrol/framework/plugins/interflow/factory.go index 4cebd6c75..0f5bf159a 100644 --- a/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/factory.go +++ b/pkg/epp/flowcontrol/framework/plugins/interflow/factory.go @@ -17,7 +17,7 @@ limitations under the License. // Package dispatch provides the factory and registration mechanism for all `framework.InterFlowDispatchPolicy` // implementations. // It allows new policies to be added to the system and instantiated by name. -package dispatch +package interflow import ( "fmt" diff --git a/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/functional_test.go b/pkg/epp/flowcontrol/framework/plugins/interflow/functional_test.go similarity index 90% rename from pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/functional_test.go rename to pkg/epp/flowcontrol/framework/plugins/interflow/functional_test.go index 7649b324e..2dab10932 100644 --- a/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/functional_test.go +++ b/pkg/epp/flowcontrol/framework/plugins/interflow/functional_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package dispatch_test +package interflow import ( "testing" @@ -24,9 +24,6 @@ import ( "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework" frameworkmocks "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/mocks" - "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch" - _ "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/besthead" - _ "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/roundrobin" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types" ) @@ -37,7 +34,7 @@ import ( func TestInterFlowDispatchPolicyConformance(t *testing.T) { t.Parallel() - for policyName, constructor := range dispatch.RegisteredPolicies { + for policyName, constructor := range RegisteredPolicies { t.Run(string(policyName), func(t *testing.T) { t.Parallel() diff --git a/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/roundrobin/roundrobin.go b/pkg/epp/flowcontrol/framework/plugins/interflow/roundrobin.go similarity index 94% rename from pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/roundrobin/roundrobin.go rename to pkg/epp/flowcontrol/framework/plugins/interflow/roundrobin.go index c71b801dc..b4d6d541c 100644 --- a/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/roundrobin/roundrobin.go +++ b/pkg/epp/flowcontrol/framework/plugins/interflow/roundrobin.go @@ -16,14 +16,13 @@ limitations under the License. // Package roundrobin provides a `framework.InterFlowDispatchPolicy` that selects a queue from a priority band using a // simple round-robin strategy. -package roundrobin +package interflow import ( "slices" "sync" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework" - "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types" ) @@ -31,7 +30,7 @@ import ( const RoundRobinPolicyName = "RoundRobin" func init() { - dispatch.MustRegisterPolicy(dispatch.RegisteredPolicyName(RoundRobinPolicyName), + MustRegisterPolicy(RegisteredPolicyName(RoundRobinPolicyName), func() (framework.InterFlowDispatchPolicy, error) { return newRoundRobin(), nil }) @@ -104,7 +103,7 @@ func (r *iterator) selectNextQueue(band framework.PriorityBandAccessor) framewor } numFlows := len(keys) - for i := 0; i < numFlows; i++ { + for i := range numFlows { currentIdx := (startIndex + i) % numFlows currentKey := keys[currentIdx] queue := band.Queue(currentKey.ID) diff --git a/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/roundrobin/roundrobin_test.go b/pkg/epp/flowcontrol/framework/plugins/interflow/roundrobin_test.go similarity index 99% rename from pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/roundrobin/roundrobin_test.go rename to pkg/epp/flowcontrol/framework/plugins/interflow/roundrobin_test.go index dd4a02602..9f6c9e30a 100644 --- a/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/roundrobin/roundrobin_test.go +++ b/pkg/epp/flowcontrol/framework/plugins/interflow/roundrobin_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package roundrobin +package interflow import ( "fmt" diff --git a/pkg/epp/flowcontrol/registry/config.go b/pkg/epp/flowcontrol/registry/config.go index 21757827f..6ded14db6 100644 --- a/pkg/epp/flowcontrol/registry/config.go +++ b/pkg/epp/flowcontrol/registry/config.go @@ -23,8 +23,7 @@ import ( "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework" - inter "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch" - "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/besthead" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/interflow" intra "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/fcfs" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/queue" @@ -41,7 +40,7 @@ const ( // defaultIntraFlowDispatchPolicy is the default policy for selecting items within a single flow's queue. defaultIntraFlowDispatchPolicy intra.RegisteredPolicyName = fcfs.FCFSPolicyName // defaultInterFlowDispatchPolicy is the default policy for selecting which flow's queue to service next. - defaultInterFlowDispatchPolicy inter.RegisteredPolicyName = besthead.BestHeadPolicyName + defaultInterFlowDispatchPolicy interflow.RegisteredPolicyName = interflow.BestHeadPolicyName // defaultQueue is the default queue implementation for flows. defaultQueue queue.RegisteredQueueName = queue.ListQueueName // defaultInitialShardCount is the default number of parallel shards to create when the registry is initialized. @@ -128,7 +127,7 @@ type PriorityBandConfig struct { // InterFlowDispatchPolicy specifies the name of the policy used to select which flow's queue to service next from // this band. // Optional: Defaults to `defaultInterFlowDispatchPolicy` ("BestHead"). - InterFlowDispatchPolicy inter.RegisteredPolicyName + InterFlowDispatchPolicy interflow.RegisteredPolicyName // Queue specifies the default name of the `framework.SafeQueue` implementation for flow queues in this band. // Optional: Defaults to `defaultQueue` ("ListQueue"). @@ -167,7 +166,7 @@ type ShardPriorityBandConfig struct { // IntraFlowDispatchPolicy is the name of the policy for dispatch within a flow's queue. IntraFlowDispatchPolicy intra.RegisteredPolicyName // InterFlowDispatchPolicy is the name of the policy for dispatch between flow queues. - InterFlowDispatchPolicy inter.RegisteredPolicyName + InterFlowDispatchPolicy interflow.RegisteredPolicyName // Queue is the name of the queue implementation to use. Queue queue.RegisteredQueueName // MaxBytes is this shard's partitioned portion of this band's global capacity limit. @@ -209,7 +208,7 @@ func (c *Config) ValidateAndApplyDefaults() (*Config, error) { // Ensure the DI factories are initialized for production use if `NewConfig` was called without options. if cfg.interFlowDispatchPolicyFactory == nil { - cfg.interFlowDispatchPolicyFactory = inter.NewPolicyFromName + cfg.interFlowDispatchPolicyFactory = interflow.NewPolicyFromName } if cfg.intraFlowDispatchPolicyFactory == nil { cfg.intraFlowDispatchPolicyFactory = intra.NewPolicyFromName @@ -362,9 +361,9 @@ type configOption func(*Config) // interFlowDispatchPolicyFactory defines the signature for a function that creates an // `framework.InterFlowDispatchPolicy` instance from its registered name. -// It serves as an abstraction over the concrete `inter.NewPolicyFromName` factory, enabling dependency injection for +// It serves as an abstraction over the concrete `interflow.NewPolicyFromName` factory, enabling dependency injection for // testing validation logic. -type interFlowDispatchPolicyFactory func(name inter.RegisteredPolicyName) (framework.InterFlowDispatchPolicy, error) +type interFlowDispatchPolicyFactory func(name interflow.RegisteredPolicyName) (framework.InterFlowDispatchPolicy, error) // intraFlowDispatchPolicyFactory defines the signature for a function that creates an // `framework.IntraFlowDispatchPolicy` instance from its registered name. diff --git a/pkg/epp/flowcontrol/registry/config_test.go b/pkg/epp/flowcontrol/registry/config_test.go index b95f013ac..05d8e5701 100644 --- a/pkg/epp/flowcontrol/registry/config_test.go +++ b/pkg/epp/flowcontrol/registry/config_test.go @@ -28,7 +28,7 @@ import ( "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/mocks" - "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/besthead" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/interflow" intra "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/fcfs" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/queue" @@ -82,7 +82,7 @@ func TestConfig_ValidateAndApplyDefaults(t *testing.T) { Priority: 1, PriorityName: "Critical", IntraFlowDispatchPolicy: fcfs.FCFSPolicyName, - InterFlowDispatchPolicy: besthead.BestHeadPolicyName, + InterFlowDispatchPolicy: interflow.BestHeadPolicyName, Queue: queue.ListQueueName, MaxBytes: 500, }}, diff --git a/pkg/epp/flowcontrol/registry/registry_test.go b/pkg/epp/flowcontrol/registry/registry_test.go index b5bc322cb..494bce0dc 100644 --- a/pkg/epp/flowcontrol/registry/registry_test.go +++ b/pkg/epp/flowcontrol/registry/registry_test.go @@ -31,7 +31,7 @@ import ( "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework" - inter "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/interflow" intra "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types/mocks" @@ -139,7 +139,7 @@ func TestFlowRegistry_New(t *testing.T) { t.Parallel() config, err := newConfig( Config{PriorityBands: []PriorityBandConfig{{Priority: highPriority, PriorityName: "A"}}}, - withInterFlowDispatchPolicyFactory(func(inter.RegisteredPolicyName) (framework.InterFlowDispatchPolicy, error) { + withInterFlowDispatchPolicyFactory(func(interflow.RegisteredPolicyName) (framework.InterFlowDispatchPolicy, error) { return nil, errors.New("injected factory failure") }), ) diff --git a/pkg/epp/flowcontrol/registry/shard_test.go b/pkg/epp/flowcontrol/registry/shard_test.go index 23bf81325..c58b71c66 100644 --- a/pkg/epp/flowcontrol/registry/shard_test.go +++ b/pkg/epp/flowcontrol/registry/shard_test.go @@ -28,7 +28,7 @@ import ( "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework" - inter "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/interflow" intra "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/queue" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types" @@ -73,7 +73,7 @@ func newShardTestHarness(t *testing.T) *shardTestHarness { "test-shard-1", shardConfig, logr.Discard(), statsPropagator.propagate, - inter.NewPolicyFromName, + interflow.NewPolicyFromName, ) require.NoError(t, err, "Test setup: newShard should not return an error with valid configuration") @@ -149,7 +149,7 @@ func TestShard_New(t *testing.T) { shardConfig, _ := newConfig(Config{PriorityBands: []PriorityBandConfig{ {Priority: highPriority, PriorityName: "High"}, }}) - failingFactory := func(inter.RegisteredPolicyName) (framework.InterFlowDispatchPolicy, error) { + failingFactory := func(interflow.RegisteredPolicyName) (framework.InterFlowDispatchPolicy, error) { return nil, errors.New("policy not found") } _, err := newShard("test-shard-1", shardConfig.partition(0, 1), logr.Discard(), nil, failingFactory)