Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,20 @@ limitations under the License.

// Package besthead provides a `framework.InterFlowDispatchPolicy` that selects the queue containing the single "best"
// item from across all queues in a priority band.
package besthead
package interflow

import (
"fmt"

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
)

// BestHeadPolicyName is the name of the Best Head policy implementation.
const BestHeadPolicyName = "BestHead"

func init() {
dispatch.MustRegisterPolicy(dispatch.RegisteredPolicyName(BestHeadPolicyName),
MustRegisterPolicy(RegisteredPolicyName(BestHeadPolicyName),
func() (framework.InterFlowDispatchPolicy, error) {
return newBestHead(), nil
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package besthead
package interflow

import (
"errors"
Expand All @@ -36,11 +36,6 @@ const (
commonScoreType = "enqueue_time_ns_asc"
)

var (
flow1Key = types.FlowKey{ID: flow1ID, Priority: 0}
flow2Key = types.FlowKey{ID: flow2ID, Priority: 0}
)

// enqueueTimeComparatorFunc is a test utility. Lower enqueue time is better.
func enqueueTimeComparatorFunc(a, b types.QueueItemAccessor) bool {
return a.EnqueueTime().Before(b.EnqueueTime())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ limitations under the License.
// Package dispatch provides the factory and registration mechanism for all `framework.InterFlowDispatchPolicy`
// implementations.
// It allows new policies to be added to the system and instantiated by name.
package dispatch
package interflow

import (
"fmt"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package dispatch_test
package interflow

import (
"testing"
Expand All @@ -24,9 +24,6 @@ import (

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework"
frameworkmocks "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/mocks"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch"
_ "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/besthead"
_ "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/roundrobin"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
)

Expand All @@ -37,7 +34,7 @@ import (
func TestInterFlowDispatchPolicyConformance(t *testing.T) {
t.Parallel()

for policyName, constructor := range dispatch.RegisteredPolicies {
for policyName, constructor := range RegisteredPolicies {
t.Run(string(policyName), func(t *testing.T) {
t.Parallel()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,21 @@ limitations under the License.

// Package roundrobin provides a `framework.InterFlowDispatchPolicy` that selects a queue from a priority band using a
// simple round-robin strategy.
package roundrobin
package interflow

import (
"slices"
"sync"

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
)

// RoundRobinPolicyName is the name of the Round Robin policy implementation.
const RoundRobinPolicyName = "RoundRobin"

func init() {
dispatch.MustRegisterPolicy(dispatch.RegisteredPolicyName(RoundRobinPolicyName),
MustRegisterPolicy(RegisteredPolicyName(RoundRobinPolicyName),
func() (framework.InterFlowDispatchPolicy, error) {
return newRoundRobin(), nil
})
Expand Down Expand Up @@ -104,7 +103,7 @@ func (r *iterator) selectNextQueue(band framework.PriorityBandAccessor) framewor
}

numFlows := len(keys)
for i := 0; i < numFlows; i++ {
for i := range numFlows {
currentIdx := (startIndex + i) % numFlows
currentKey := keys[currentIdx]
queue := band.Queue(currentKey.ID)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package roundrobin
package interflow

import (
"fmt"
Expand Down
15 changes: 7 additions & 8 deletions pkg/epp/flowcontrol/registry/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ import (

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework"
inter "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/besthead"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/interflow"
intra "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/fcfs"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/queue"
Expand All @@ -41,7 +40,7 @@ const (
// defaultIntraFlowDispatchPolicy is the default policy for selecting items within a single flow's queue.
defaultIntraFlowDispatchPolicy intra.RegisteredPolicyName = fcfs.FCFSPolicyName
// defaultInterFlowDispatchPolicy is the default policy for selecting which flow's queue to service next.
defaultInterFlowDispatchPolicy inter.RegisteredPolicyName = besthead.BestHeadPolicyName
defaultInterFlowDispatchPolicy interflow.RegisteredPolicyName = interflow.BestHeadPolicyName
// defaultQueue is the default queue implementation for flows.
defaultQueue queue.RegisteredQueueName = queue.ListQueueName
// defaultInitialShardCount is the default number of parallel shards to create when the registry is initialized.
Expand Down Expand Up @@ -128,7 +127,7 @@ type PriorityBandConfig struct {
// InterFlowDispatchPolicy specifies the name of the policy used to select which flow's queue to service next from
// this band.
// Optional: Defaults to `defaultInterFlowDispatchPolicy` ("BestHead").
InterFlowDispatchPolicy inter.RegisteredPolicyName
InterFlowDispatchPolicy interflow.RegisteredPolicyName

// Queue specifies the default name of the `framework.SafeQueue` implementation for flow queues in this band.
// Optional: Defaults to `defaultQueue` ("ListQueue").
Expand Down Expand Up @@ -167,7 +166,7 @@ type ShardPriorityBandConfig struct {
// IntraFlowDispatchPolicy is the name of the policy for dispatch within a flow's queue.
IntraFlowDispatchPolicy intra.RegisteredPolicyName
// InterFlowDispatchPolicy is the name of the policy for dispatch between flow queues.
InterFlowDispatchPolicy inter.RegisteredPolicyName
InterFlowDispatchPolicy interflow.RegisteredPolicyName
// Queue is the name of the queue implementation to use.
Queue queue.RegisteredQueueName
// MaxBytes is this shard's partitioned portion of this band's global capacity limit.
Expand Down Expand Up @@ -209,7 +208,7 @@ func (c *Config) ValidateAndApplyDefaults() (*Config, error) {

// Ensure the DI factories are initialized for production use if `NewConfig` was called without options.
if cfg.interFlowDispatchPolicyFactory == nil {
cfg.interFlowDispatchPolicyFactory = inter.NewPolicyFromName
cfg.interFlowDispatchPolicyFactory = interflow.NewPolicyFromName
}
if cfg.intraFlowDispatchPolicyFactory == nil {
cfg.intraFlowDispatchPolicyFactory = intra.NewPolicyFromName
Expand Down Expand Up @@ -362,9 +361,9 @@ type configOption func(*Config)

// interFlowDispatchPolicyFactory defines the signature for a function that creates an
// `framework.InterFlowDispatchPolicy` instance from its registered name.
// It serves as an abstraction over the concrete `inter.NewPolicyFromName` factory, enabling dependency injection for
// It serves as an abstraction over the concrete `interflow.NewPolicyFromName` factory, enabling dependency injection for
// testing validation logic.
type interFlowDispatchPolicyFactory func(name inter.RegisteredPolicyName) (framework.InterFlowDispatchPolicy, error)
type interFlowDispatchPolicyFactory func(name interflow.RegisteredPolicyName) (framework.InterFlowDispatchPolicy, error)

// intraFlowDispatchPolicyFactory defines the signature for a function that creates an
// `framework.IntraFlowDispatchPolicy` instance from its registered name.
Expand Down
4 changes: 2 additions & 2 deletions pkg/epp/flowcontrol/registry/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/mocks"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/besthead"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/interflow"
intra "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/fcfs"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/queue"
Expand Down Expand Up @@ -82,7 +82,7 @@ func TestConfig_ValidateAndApplyDefaults(t *testing.T) {
Priority: 1,
PriorityName: "Critical",
IntraFlowDispatchPolicy: fcfs.FCFSPolicyName,
InterFlowDispatchPolicy: besthead.BestHeadPolicyName,
InterFlowDispatchPolicy: interflow.BestHeadPolicyName,
Queue: queue.ListQueueName,
MaxBytes: 500,
}},
Expand Down
4 changes: 2 additions & 2 deletions pkg/epp/flowcontrol/registry/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework"
inter "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/interflow"
intra "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types/mocks"
Expand Down Expand Up @@ -139,7 +139,7 @@ func TestFlowRegistry_New(t *testing.T) {
t.Parallel()
config, err := newConfig(
Config{PriorityBands: []PriorityBandConfig{{Priority: highPriority, PriorityName: "A"}}},
withInterFlowDispatchPolicyFactory(func(inter.RegisteredPolicyName) (framework.InterFlowDispatchPolicy, error) {
withInterFlowDispatchPolicyFactory(func(interflow.RegisteredPolicyName) (framework.InterFlowDispatchPolicy, error) {
return nil, errors.New("injected factory failure")
}),
)
Expand Down
6 changes: 3 additions & 3 deletions pkg/epp/flowcontrol/registry/shard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework"
inter "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/interflow"
intra "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/queue"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
Expand Down Expand Up @@ -73,7 +73,7 @@ func newShardTestHarness(t *testing.T) *shardTestHarness {
"test-shard-1",
shardConfig, logr.Discard(),
statsPropagator.propagate,
inter.NewPolicyFromName,
interflow.NewPolicyFromName,
)
require.NoError(t, err, "Test setup: newShard should not return an error with valid configuration")

Expand Down Expand Up @@ -149,7 +149,7 @@ func TestShard_New(t *testing.T) {
shardConfig, _ := newConfig(Config{PriorityBands: []PriorityBandConfig{
{Priority: highPriority, PriorityName: "High"},
}})
failingFactory := func(inter.RegisteredPolicyName) (framework.InterFlowDispatchPolicy, error) {
failingFactory := func(interflow.RegisteredPolicyName) (framework.InterFlowDispatchPolicy, error) {
return nil, errors.New("policy not found")
}
_, err := newShard("test-shard-1", shardConfig.partition(0, 1), logr.Discard(), nil, failingFactory)
Expand Down