Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ limitations under the License.
// Package dispatch provides the factory and registration mechanism for all `framework.IntraFlowDispatchPolicy`
// implementations.
// It allows new policies to be added to the system and instantiated by name.
package dispatch
package intraflow

import (
"fmt"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,12 @@ limitations under the License.
*/

// Package fcfs provides a First-Come, First-Served implementation of the `framework.IntraFlowDispatchPolicy`.
package fcfs
package intraflow

import (
"errors"

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
)

Expand Down Expand Up @@ -55,7 +54,7 @@ import (
const FCFSPolicyName = "FCFS"

func init() {
dispatch.MustRegisterPolicy(dispatch.RegisteredPolicyName(FCFSPolicyName),
MustRegisterPolicy(RegisteredPolicyName(FCFSPolicyName),
func() (framework.IntraFlowDispatchPolicy, error) {
return newFCFS(), nil
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package fcfs
package intraflow

import (
"testing"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package dispatch_test
package intraflow

import (
"testing"
Expand All @@ -24,8 +24,6 @@ import (

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework"
frameworkmocks "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/mocks"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch"
_ "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/fcfs"
)

// TestIntraFlowDispatchPolicyConformance is the main conformance test suite for `framework.IntraFlowDispatchPolicy`
Expand All @@ -35,7 +33,7 @@ import (
func TestIntraFlowDispatchPolicyConformance(t *testing.T) {
t.Parallel()

for policyName, constructor := range dispatch.RegisteredPolicies {
for policyName, constructor := range RegisteredPolicies {
t.Run(string(policyName), func(t *testing.T) {
t.Parallel()

Expand Down
21 changes: 10 additions & 11 deletions pkg/epp/flowcontrol/registry/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,9 @@ import (

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/intraflow"
inter "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/besthead"
intra "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/fcfs"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/queue"
)

Expand All @@ -39,7 +38,7 @@ const (
// It is set to 1 GB.
defaultPriorityBandMaxBytes uint64 = 1_000_000_000
// defaultIntraFlowDispatchPolicy is the default policy for selecting items within a single flow's queue.
defaultIntraFlowDispatchPolicy intra.RegisteredPolicyName = fcfs.FCFSPolicyName
defaultIntraFlowDispatchPolicy intraflow.RegisteredPolicyName = intraflow.FCFSPolicyName
// defaultInterFlowDispatchPolicy is the default policy for selecting which flow's queue to service next.
defaultInterFlowDispatchPolicy inter.RegisteredPolicyName = besthead.BestHeadPolicyName
// defaultQueue is the default queue implementation for flows.
Expand Down Expand Up @@ -102,7 +101,7 @@ type Config struct {
// Factory functions used for plugin instantiation during configuration validation.
// These enable dependency injection for unit testing the validation logic.
interFlowDispatchPolicyFactory interFlowDispatchPolicyFactory
intraFlowDispatchPolicyFactory intraFlowDispatchPolicyFactory
intraFlowDispatchPolicyFactory intraflowFlowDispatchPolicyFactory
queueFactory queueFactory
}

Expand All @@ -123,7 +122,7 @@ type PriorityBandConfig struct {
// IntraFlowDispatchPolicy specifies the default name of the policy used to select a request from within a single
// flow's queue in this band.
// Optional: Defaults to `defaultIntraFlowDispatchPolicy` ("FCFS").
IntraFlowDispatchPolicy intra.RegisteredPolicyName
IntraFlowDispatchPolicy intraflow.RegisteredPolicyName

// InterFlowDispatchPolicy specifies the name of the policy used to select which flow's queue to service next from
// this band.
Expand Down Expand Up @@ -165,7 +164,7 @@ type ShardPriorityBandConfig struct {
// PriorityName is a unique human-readable name for this priority band.
PriorityName string
// IntraFlowDispatchPolicy is the name of the policy for dispatch within a flow's queue.
IntraFlowDispatchPolicy intra.RegisteredPolicyName
IntraFlowDispatchPolicy intraflow.RegisteredPolicyName
// InterFlowDispatchPolicy is the name of the policy for dispatch between flow queues.
InterFlowDispatchPolicy inter.RegisteredPolicyName
// Queue is the name of the queue implementation to use.
Expand Down Expand Up @@ -212,7 +211,7 @@ func (c *Config) ValidateAndApplyDefaults() (*Config, error) {
cfg.interFlowDispatchPolicyFactory = inter.NewPolicyFromName
}
if cfg.intraFlowDispatchPolicyFactory == nil {
cfg.intraFlowDispatchPolicyFactory = intra.NewPolicyFromName
cfg.intraFlowDispatchPolicyFactory = intraflow.NewPolicyFromName
}
if cfg.queueFactory == nil {
cfg.queueFactory = queue.NewQueueFromName
Expand Down Expand Up @@ -366,11 +365,11 @@ type configOption func(*Config)
// testing validation logic.
type interFlowDispatchPolicyFactory func(name inter.RegisteredPolicyName) (framework.InterFlowDispatchPolicy, error)

// intraFlowDispatchPolicyFactory defines the signature for a function that creates an
// intraflowFlowDispatchPolicyFactory defines the signature for a function that creates an
// `framework.IntraFlowDispatchPolicy` instance from its registered name.
// It serves as an abstraction over the concrete `intra.NewPolicyFromName` factory, enabling dependency injection for
// It serves as an abstraction over the concrete `intraflow.NewPolicyFromName` factory, enabling dependency injection for
// testing validation logic.
type intraFlowDispatchPolicyFactory func(name intra.RegisteredPolicyName) (framework.IntraFlowDispatchPolicy, error)
type intraflowFlowDispatchPolicyFactory func(name intraflow.RegisteredPolicyName) (framework.IntraFlowDispatchPolicy, error)

// queueFactory defines the signature for a function that creates a `framework.SafeQueue` instance from its registered
// name and a given `framework.ItemComparator`.
Expand All @@ -395,7 +394,7 @@ func withInterFlowDispatchPolicyFactory(factory interFlowDispatchPolicyFactory)
// creating `framework.IntraFlowDispatchPolicy` instances.
// This is used exclusively for testing validation logic.
// test-only
func withIntraFlowDispatchPolicyFactory(factory intraFlowDispatchPolicyFactory) configOption {
func withIntraFlowDispatchPolicyFactory(factory intraflowFlowDispatchPolicyFactory) configOption {
return func(c *Config) {
c.intraFlowDispatchPolicyFactory = factory
}
Expand Down
25 changes: 12 additions & 13 deletions pkg/epp/flowcontrol/registry/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,8 @@ import (
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/mocks"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/intraflow"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch/besthead"
intra "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch/fcfs"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/queue"
)

Expand Down Expand Up @@ -81,7 +80,7 @@ func TestConfig_ValidateAndApplyDefaults(t *testing.T) {
PriorityBands: []PriorityBandConfig{{
Priority: 1,
PriorityName: "Critical",
IntraFlowDispatchPolicy: fcfs.FCFSPolicyName,
IntraFlowDispatchPolicy: intraflow.FCFSPolicyName,
InterFlowDispatchPolicy: besthead.BestHeadPolicyName,
Queue: queue.ListQueueName,
MaxBytes: 500,
Expand All @@ -103,12 +102,12 @@ func TestConfig_ValidateAndApplyDefaults(t *testing.T) {
PriorityBands: []PriorityBandConfig{{
Priority: 1,
PriorityName: "High",
IntraFlowDispatchPolicy: intra.RegisteredPolicyName("policy-without-req"),
IntraFlowDispatchPolicy: intraflow.RegisteredPolicyName("policy-without-req"),
}},
},
opts: []configOption{
withIntraFlowDispatchPolicyFactory(
func(_ intra.RegisteredPolicyName) (framework.IntraFlowDispatchPolicy, error) {
func(_ intraflow.RegisteredPolicyName) (framework.IntraFlowDispatchPolicy, error) {
return &mocks.MockIntraFlowDispatchPolicy{
NameV: "policy-without-req",
RequiredQueueCapabilitiesV: []framework.QueueCapability{},
Expand All @@ -123,13 +122,13 @@ func TestConfig_ValidateAndApplyDefaults(t *testing.T) {
PriorityBands: []PriorityBandConfig{{
Priority: 1,
PriorityName: "High",
IntraFlowDispatchPolicy: intra.RegisteredPolicyName("policy-with-reqs"),
IntraFlowDispatchPolicy: intraflow.RegisteredPolicyName("policy-with-reqs"),
Queue: queue.RegisteredQueueName("queue-with-reqs-capability"),
}},
},
opts: []configOption{
withIntraFlowDispatchPolicyFactory(
func(_ intra.RegisteredPolicyName) (framework.IntraFlowDispatchPolicy, error) {
func(_ intraflow.RegisteredPolicyName) (framework.IntraFlowDispatchPolicy, error) {
return &mocks.MockIntraFlowDispatchPolicy{
RequiredQueueCapabilitiesV: []framework.QueueCapability{"capability-A", "capability-B"},
}, nil
Expand Down Expand Up @@ -193,12 +192,12 @@ func TestConfig_ValidateAndApplyDefaults(t *testing.T) {
PriorityBands: []PriorityBandConfig{{
Priority: 1,
PriorityName: "High",
IntraFlowDispatchPolicy: intra.RegisteredPolicyName("policy-with-req"),
IntraFlowDispatchPolicy: intraflow.RegisteredPolicyName("policy-with-req"),
Queue: queue.ListQueueName,
}},
},
opts: []configOption{withIntraFlowDispatchPolicyFactory(
func(_ intra.RegisteredPolicyName) (framework.IntraFlowDispatchPolicy, error) {
func(_ intraflow.RegisteredPolicyName) (framework.IntraFlowDispatchPolicy, error) {
return &mocks.MockIntraFlowDispatchPolicy{
RequiredQueueCapabilitiesV: []framework.QueueCapability{framework.QueueCapability("required-capability")},
}, nil
Expand All @@ -213,13 +212,13 @@ func TestConfig_ValidateAndApplyDefaults(t *testing.T) {
Priority: 1,
PriorityName: "High",
Queue: queue.RegisteredQueueName("failing-queue"),
IntraFlowDispatchPolicy: intra.RegisteredPolicyName("policy-with-req"),
IntraFlowDispatchPolicy: intraflow.RegisteredPolicyName("policy-with-req"),
}},
},
expectErr: true,
opts: []configOption{
withIntraFlowDispatchPolicyFactory( // Forces queue instance creation for validating capabilities.
func(name intra.RegisteredPolicyName) (framework.IntraFlowDispatchPolicy, error) {
func(name intraflow.RegisteredPolicyName) (framework.IntraFlowDispatchPolicy, error) {
return &mocks.MockIntraFlowDispatchPolicy{
NameV: string(name),
RequiredQueueCapabilitiesV: []framework.QueueCapability{"required-capability"},
Expand All @@ -237,11 +236,11 @@ func TestConfig_ValidateAndApplyDefaults(t *testing.T) {
PriorityBands: []PriorityBandConfig{{
Priority: 1,
PriorityName: "High",
IntraFlowDispatchPolicy: intra.RegisteredPolicyName("failing-policy"),
IntraFlowDispatchPolicy: intraflow.RegisteredPolicyName("failing-policy"),
}},
},
opts: []configOption{withIntraFlowDispatchPolicyFactory(
func(_ intra.RegisteredPolicyName) (framework.IntraFlowDispatchPolicy, error) {
func(_ intraflow.RegisteredPolicyName) (framework.IntraFlowDispatchPolicy, error) {
return nil, errors.New("policy creation failed")
})},
expectErr: true,
Expand Down
4 changes: 2 additions & 2 deletions pkg/epp/flowcontrol/registry/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ import (

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/intraflow"
inter "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch"
intra "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types/mocks"
)
Expand Down Expand Up @@ -190,7 +190,7 @@ func TestFlowRegistry_WithConnection_AndHandle(t *testing.T) {
t.Run("ShouldFail_WhenJITFails", func(t *testing.T) {
t.Parallel()
h := newRegistryTestHarness(t, harnessOptions{})
h.fr.config.intraFlowDispatchPolicyFactory = func(intra.RegisteredPolicyName) (framework.IntraFlowDispatchPolicy, error) {
h.fr.config.intraFlowDispatchPolicyFactory = func(intraflow.RegisteredPolicyName) (framework.IntraFlowDispatchPolicy, error) {
return nil, errors.New("injected factory failure")
}
key := types.FlowKey{ID: "test-flow", Priority: highPriority}
Expand Down
4 changes: 2 additions & 2 deletions pkg/epp/flowcontrol/registry/shard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ import (

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/intraflow"
inter "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/interflow/dispatch"
intra "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/policies/intraflow/dispatch"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/queue"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types/mocks"
Expand Down Expand Up @@ -96,7 +96,7 @@ func newShardTestHarness(t *testing.T) *shardTestHarness {
func (h *shardTestHarness) synchronizeFlow(key types.FlowKey) {
h.t.Helper()
spec := types.FlowSpecification{Key: key}
policy, err := intra.NewPolicyFromName(defaultIntraFlowDispatchPolicy)
policy, err := intraflow.NewPolicyFromName(defaultIntraFlowDispatchPolicy)
require.NoError(h.t, err, "Helper synchronizeFlow: failed to create real intra-flow policy for synchronization")
q, err := queue.NewQueueFromName(defaultQueue, policy.Comparator())
require.NoError(h.t, err, "Helper synchronizeFlow: failed to create real queue for synchronization")
Expand Down