Skip to content

Commit 16ed2e2

Browse files
committed
more flow control
1 parent 2387722 commit 16ed2e2

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

50 files changed

+7252
-5704
lines changed

pkg/epp/flowcontroller/config.go

Lines changed: 135 additions & 149 deletions
Large diffs are not rendered by default.

pkg/epp/flowcontroller/config_test.go

Lines changed: 286 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -1,109 +1,317 @@
1-
/*
2-
Copyright 2025 The Kubernetes Authors.
3-
4-
Licensed under the Apache License, Version 2.0 (the "License");
5-
you may not use this file except in compliance with the License.
6-
You may obtain a copy of the License at
7-
8-
http://www.apache.org/licenses/LICENSE-2.0
9-
10-
Unless required by applicable law or agreed to in writing, software
11-
distributed under the License is distributed on an "AS IS" BASIS,
12-
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13-
See the License for the specific language governing permissions and
14-
limitations under the License.
15-
*/
16-
171
package flowcontroller
182

193
import (
204
"testing"
215
"time"
226

23-
"github.com/google/go-cmp/cmp"
24-
v1alpha2 "sigs.k8s.io/gateway-api-inference-extension/api/v1alpha2"
7+
"github.com/go-logr/logr"
8+
"github.com/stretchr/testify/assert"
9+
"github.com/stretchr/testify/require"
10+
interd "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontroller/plugins/dispatch/interflow"
11+
intrad "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontroller/plugins/dispatch/intraflow"
12+
interp "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontroller/plugins/preemption/interflow"
13+
intrap "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontroller/plugins/preemption/intraflow"
14+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontroller/plugins/queue"
2515
)
2616

27-
func TestFlowControllerConfig_validateAndApplyDefaults(t *testing.T) {
17+
func TestFlowControllerConfig_ValidateAndApplyDefaults(t *testing.T) {
18+
logger := logr.Discard()
2819
tests := []struct {
2920
name string
30-
input *FlowControllerConfig
31-
expected *FlowControllerConfig
21+
input FlowControllerConfig
22+
expected FlowControllerConfig
23+
wantErr bool
3224
}{
3325
{
34-
name: "Empty config",
35-
input: &FlowControllerConfig{},
36-
expected: &FlowControllerConfig{
37-
MaxBytesPerCriticality: map[v1alpha2.Criticality]uint64{
38-
v1alpha2.Critical: DefaultCriticalCapacityBytes,
39-
v1alpha2.Standard: DefaultStandardCapacityBytes,
40-
v1alpha2.Sheddable: DefaultSheddableCapacityBytes,
41-
},
42-
MaxGlobalBytes: 0, // Default is 0
43-
DefaultQueueTTL: DefaultQueueTTL,
44-
ExpiryCleanupInterval: DefaultExpiryCleanupInterval,
45-
InterModelFairnessPolicy: PolicyFCFS,
46-
PreemptionStrategy: PolicyNone,
26+
name: "all values provided and valid",
27+
input: FlowControllerConfig{
28+
DefaultQueueTTL: 10 * time.Second,
29+
ExpiryCleanupInterval: 500 * time.Millisecond,
30+
MaxGlobalBytes: 1024,
31+
},
32+
expected: FlowControllerConfig{
33+
DefaultQueueTTL: 10 * time.Second,
34+
ExpiryCleanupInterval: 500 * time.Millisecond,
35+
MaxGlobalBytes: 1024,
36+
},
37+
wantErr: false,
38+
},
39+
{
40+
name: "DefaultQueueTTL zero, should default",
41+
input: FlowControllerConfig{
42+
DefaultQueueTTL: 0,
43+
ExpiryCleanupInterval: 500 * time.Millisecond,
44+
},
45+
expected: FlowControllerConfig{
46+
DefaultQueueTTL: DefaultFCQueueTTL,
47+
ExpiryCleanupInterval: 500 * time.Millisecond,
4748
},
49+
wantErr: false,
4850
},
4951
{
50-
name: "Partially filled config",
51-
input: &FlowControllerConfig{
52-
MaxBytesPerCriticality: map[v1alpha2.Criticality]uint64{
53-
v1alpha2.Critical: 100, // Custom critical
52+
name: "ExpiryCleanupInterval negative, should default",
53+
input: FlowControllerConfig{
54+
DefaultQueueTTL: 10 * time.Second,
55+
ExpiryCleanupInterval: -1 * time.Second,
56+
},
57+
expected: FlowControllerConfig{
58+
DefaultQueueTTL: 10 * time.Second,
59+
ExpiryCleanupInterval: DefaultFCExpiryCleanupInterval,
60+
},
61+
wantErr: false,
62+
},
63+
{
64+
name: "empty config, all should default",
65+
input: FlowControllerConfig{},
66+
expected: FlowControllerConfig{
67+
DefaultQueueTTL: DefaultFCQueueTTL,
68+
ExpiryCleanupInterval: DefaultFCExpiryCleanupInterval,
69+
MaxGlobalBytes: 0, // Default is 0
70+
},
71+
wantErr: false,
72+
},
73+
{
74+
name: "MaxGlobalBytes zero, remains zero",
75+
input: FlowControllerConfig{
76+
MaxGlobalBytes: 0,
77+
},
78+
expected: FlowControllerConfig{
79+
DefaultQueueTTL: DefaultFCQueueTTL,
80+
ExpiryCleanupInterval: DefaultFCExpiryCleanupInterval,
81+
MaxGlobalBytes: 0,
82+
},
83+
wantErr: false,
84+
},
85+
}
86+
87+
for _, tt := range tests {
88+
t.Run(tt.name, func(t *testing.T) {
89+
cfg := tt.input // Make a copy
90+
err := cfg.validateAndApplyDefaults(logger)
91+
if tt.wantErr {
92+
assert.Error(t, err)
93+
} else {
94+
assert.NoError(t, err)
95+
assert.Equal(t, tt.expected, cfg)
96+
}
97+
})
98+
}
99+
}
100+
101+
func TestFlowRegistryConfig_ValidateAndApplyDefaults(t *testing.T) {
102+
logger := logr.Discard()
103+
tests := []struct {
104+
name string
105+
input FlowRegistryConfig
106+
expected FlowRegistryConfig // For checking defaults on sub-configs
107+
wantErr bool
108+
errText string // Substring to check in error message
109+
}{
110+
{
111+
name: "empty PriorityBands, valid",
112+
input: FlowRegistryConfig{PriorityBands: []PriorityBandConfig{}},
113+
wantErr: false,
114+
},
115+
{
116+
name: "one valid PriorityBandConfig",
117+
input: FlowRegistryConfig{
118+
PriorityBands: []PriorityBandConfig{
119+
{Priority: 0, PriorityName: "Critical", MaxBytes: 500},
54120
},
55-
MaxGlobalBytes: 5000,
56-
DefaultQueueTTL: 5 * time.Second,
57-
InterModelFairnessPolicy: PolicyRoundRobin,
58-
},
59-
expected: &FlowControllerConfig{
60-
MaxBytesPerCriticality: map[v1alpha2.Criticality]uint64{
61-
v1alpha2.Critical: 100, // Keep custom
62-
v1alpha2.Standard: DefaultStandardCapacityBytes,
63-
v1alpha2.Sheddable: DefaultSheddableCapacityBytes,
121+
},
122+
expected: FlowRegistryConfig{ // Expected after sub-config defaults
123+
PriorityBands: []PriorityBandConfig{
124+
{
125+
Priority: 0,
126+
PriorityName: "Critical",
127+
InterFlowDispatchPolicy: interd.BestHeadPriorityScoreDispatchPolicyName,
128+
InterFlowPreemptionPolicy: interp.RoundRobinPreemptionPolicyName,
129+
IntraFlowDispatchPolicy: intrad.FCFSDispatchPolicyName,
130+
IntraFlowPreemptionPolicy: intrap.TailPreemptionPolicyName,
131+
QueueType: queue.ListQueueName,
132+
MaxBytes: 500,
133+
},
64134
},
65-
MaxGlobalBytes: 5000, // Keep custom
66-
DefaultQueueTTL: 5 * time.Second, // Keep custom
67-
ExpiryCleanupInterval: DefaultExpiryCleanupInterval,
68-
InterModelFairnessPolicy: PolicyRoundRobin, // Keep custom
69-
PreemptionStrategy: PolicyNone,
70135
},
136+
wantErr: false,
71137
},
72138
{
73-
name: "All values specified",
74-
input: &FlowControllerConfig{
75-
MaxBytesPerCriticality: map[v1alpha2.Criticality]uint64{
76-
v1alpha2.Critical: 10,
77-
v1alpha2.Standard: 20,
78-
v1alpha2.Sheddable: 30,
139+
name: "multiple valid PriorityBandConfigs",
140+
input: FlowRegistryConfig{
141+
PriorityBands: []PriorityBandConfig{
142+
{Priority: 0, PriorityName: "Critical", MaxBytes: 500},
143+
{Priority: 1, PriorityName: "Standard", MaxBytes: 0}, // MaxBytes will default
79144
},
80-
MaxGlobalBytes: 60,
81-
DefaultQueueTTL: 1 * time.Minute,
82-
ExpiryCleanupInterval: 10 * time.Second,
83-
InterModelFairnessPolicy: PolicyRoundRobin,
84-
PreemptionStrategy: PolicyEvictOldest,
85-
},
86-
expected: &FlowControllerConfig{ // Expect no changes
87-
MaxBytesPerCriticality: map[v1alpha2.Criticality]uint64{
88-
v1alpha2.Critical: 10,
89-
v1alpha2.Standard: 20,
90-
v1alpha2.Sheddable: 30,
145+
},
146+
expected: FlowRegistryConfig{
147+
PriorityBands: []PriorityBandConfig{
148+
{
149+
Priority: 0,
150+
PriorityName: "Critical",
151+
InterFlowDispatchPolicy: interd.BestHeadPriorityScoreDispatchPolicyName,
152+
InterFlowPreemptionPolicy: interp.RoundRobinPreemptionPolicyName,
153+
IntraFlowDispatchPolicy: intrad.FCFSDispatchPolicyName,
154+
IntraFlowPreemptionPolicy: intrap.TailPreemptionPolicyName,
155+
QueueType: queue.ListQueueName,
156+
MaxBytes: 500,
157+
},
158+
{
159+
Priority: 1,
160+
PriorityName: "Standard",
161+
InterFlowDispatchPolicy: interd.BestHeadPriorityScoreDispatchPolicyName,
162+
InterFlowPreemptionPolicy: interp.RoundRobinPreemptionPolicyName,
163+
IntraFlowDispatchPolicy: intrad.FCFSDispatchPolicyName,
164+
IntraFlowPreemptionPolicy: intrap.TailPreemptionPolicyName,
165+
QueueType: queue.ListQueueName,
166+
MaxBytes: DefaultPriorityBandMaxBytes, // Defaulted
167+
},
91168
},
92-
MaxGlobalBytes: 60,
93-
DefaultQueueTTL: 1 * time.Minute,
94-
ExpiryCleanupInterval: 10 * time.Second,
95-
InterModelFairnessPolicy: PolicyRoundRobin,
96-
PreemptionStrategy: PolicyEvictOldest,
97169
},
170+
wantErr: false,
171+
},
172+
{
173+
name: "one invalid PriorityBandConfig (missing PriorityName)",
174+
input: FlowRegistryConfig{
175+
PriorityBands: []PriorityBandConfig{
176+
{Priority: 0, PriorityName: ""}, // Invalid
177+
},
178+
},
179+
wantErr: true,
180+
errText: "PriorityName cannot be empty",
181+
},
182+
}
183+
184+
for _, tt := range tests {
185+
t.Run(tt.name, func(t *testing.T) {
186+
cfg := tt.input // Make a copy
187+
err := cfg.validateAndApplyDefaults(logger)
188+
if tt.wantErr {
189+
require.Error(t, err)
190+
if tt.errText != "" {
191+
assert.Contains(t, err.Error(), tt.errText)
192+
}
193+
} else {
194+
assert.NoError(t, err)
195+
assert.Equal(t, tt.expected, cfg)
196+
}
197+
})
198+
}
199+
}
200+
201+
func TestPriorityBandConfig_ValidateAndApplyDefaults(t *testing.T) {
202+
logger := logr.Discard()
203+
204+
tests := []struct {
205+
name string
206+
input PriorityBandConfig
207+
expected PriorityBandConfig
208+
wantErr bool
209+
errText string
210+
}{
211+
{
212+
name: "all values provided and valid",
213+
input: PriorityBandConfig{
214+
Priority: 0,
215+
PriorityName: "Critical",
216+
InterFlowDispatchPolicy: "CustomInterDispatch",
217+
InterFlowPreemptionPolicy: "CustomInterPreempt",
218+
IntraFlowDispatchPolicy: "CustomIntraDispatch",
219+
IntraFlowPreemptionPolicy: "CustomIntraPreempt",
220+
QueueType: "CustomQueue",
221+
MaxBytes: 1024,
222+
},
223+
expected: PriorityBandConfig{
224+
Priority: 0,
225+
PriorityName: "Critical",
226+
InterFlowDispatchPolicy: "CustomInterDispatch",
227+
InterFlowPreemptionPolicy: "CustomInterPreempt",
228+
IntraFlowDispatchPolicy: "CustomIntraDispatch",
229+
IntraFlowPreemptionPolicy: "CustomIntraPreempt",
230+
QueueType: "CustomQueue",
231+
MaxBytes: 1024,
232+
},
233+
wantErr: false,
234+
},
235+
{
236+
name: "empty policy/queue names, should default",
237+
input: PriorityBandConfig{
238+
Priority: 1,
239+
PriorityName: "Standard",
240+
MaxBytes: 512, // Valid MaxBytes
241+
},
242+
expected: PriorityBandConfig{
243+
Priority: 1,
244+
PriorityName: "Standard",
245+
InterFlowDispatchPolicy: interd.BestHeadPriorityScoreDispatchPolicyName,
246+
InterFlowPreemptionPolicy: interp.RoundRobinPreemptionPolicyName,
247+
IntraFlowDispatchPolicy: intrad.FCFSDispatchPolicyName,
248+
IntraFlowPreemptionPolicy: intrap.TailPreemptionPolicyName,
249+
QueueType: queue.ListQueueName,
250+
MaxBytes: 512,
251+
},
252+
wantErr: false,
253+
},
254+
{
255+
name: "PriorityName empty, should return error",
256+
input: PriorityBandConfig{
257+
Priority: 0,
258+
PriorityName: "",
259+
},
260+
wantErr: true,
261+
errText: "PriorityName cannot be empty",
262+
},
263+
{
264+
name: "MaxBytes zero, should default",
265+
input: PriorityBandConfig{
266+
Priority: 2,
267+
PriorityName: "Sheddable",
268+
MaxBytes: 0,
269+
},
270+
expected: PriorityBandConfig{
271+
Priority: 2,
272+
PriorityName: "Sheddable",
273+
InterFlowDispatchPolicy: interd.BestHeadPriorityScoreDispatchPolicyName,
274+
InterFlowPreemptionPolicy: interp.RoundRobinPreemptionPolicyName,
275+
IntraFlowDispatchPolicy: intrad.FCFSDispatchPolicyName,
276+
IntraFlowPreemptionPolicy: intrap.TailPreemptionPolicyName,
277+
QueueType: queue.ListQueueName,
278+
MaxBytes: DefaultPriorityBandMaxBytes,
279+
},
280+
wantErr: false,
281+
},
282+
{
283+
name: "MaxBytes negative (effectively zero for uint), should default",
284+
input: PriorityBandConfig{
285+
Priority: 2,
286+
PriorityName: "Sheddable",
287+
MaxBytes: uint64(^uint(0)), // MaxUint, but logic is <=0
288+
},
289+
expected: PriorityBandConfig{
290+
Priority: 2,
291+
PriorityName: "Sheddable",
292+
InterFlowDispatchPolicy: interd.BestHeadPriorityScoreDispatchPolicyName,
293+
InterFlowPreemptionPolicy: interp.RoundRobinPreemptionPolicyName,
294+
IntraFlowDispatchPolicy: intrad.FCFSDispatchPolicyName,
295+
IntraFlowPreemptionPolicy: intrap.TailPreemptionPolicyName,
296+
QueueType: queue.ListQueueName,
297+
MaxBytes: DefaultPriorityBandMaxBytes,
298+
},
299+
wantErr: false,
98300
},
99301
}
100302

101303
for _, tt := range tests {
102304
t.Run(tt.name, func(t *testing.T) {
103-
cfg := tt.input
104-
cfg.validateAndApplyDefaults()
105-
if diff := cmp.Diff(tt.expected, cfg); diff != "" {
106-
t.Errorf("validateAndApplyDefaults() mismatch (-want +got):\n%s", diff)
305+
cfg := tt.input // Make a copy
306+
err := cfg.validateAndApplyDefaults(logger)
307+
if tt.wantErr {
308+
require.Error(t, err)
309+
if tt.errText != "" {
310+
assert.Contains(t, err.Error(), tt.errText)
311+
}
312+
} else {
313+
assert.NoError(t, err)
314+
assert.Equal(t, tt.expected, cfg)
107315
}
108316
})
109317
}

0 commit comments

Comments
 (0)