Skip to content

Commit e611ad2

Browse files
LukeAVanDrieBenjaminBraunDev
authored andcommitted
feat(fc): Initial wiring of the flow control layer (kubernetes-sigs#1701)
* feat(fc): Initial wiring of the flow control layer This commit introduces the initial integration of the new Flow Control layer into the Endpoint Picker (EPP). The primary goal is to provide a mechanism for more sophisticated request admission control, queuing, and multitenancy management. This new layer is gated by the ENABLE_EXPERIMENTAL_FLOW_CONTROL_LAYER feature flag. When enabled, the Director delegates admission control decisions to the new flowController component. The legacy saturation-based shedding logic is preserved for when the feature is disabled. Comprehensive unit tests for the admitRequest function have been added to validate the behavior of both the legacy and new flow control paths. * refactor: Abstract away admission control logic This commit refactors the admission control logic introduced in the previous commit. - Introduces an AdmissionController interface. - Implements LegacyAdmissionController for existing behavior. - Implements FlowControlAdmissionController for the new flow control path. - Moves flow control request adaptation into FlowControlAdmissionController. - Updates Director to use the AdmissionController interface. - Runner now injects the appropriate controller based on the feature flag. - Splits and refactors tests for better focus and isolation. This improves modularity, testability, and separation of concerns. * feat: Address feedback on admission control - Introduce requtil.IsSheddable() for clarity - Move FairnessID defaulting to handler package - Add test for default FairnessID * fix: admission.go boilerplate header
1 parent 2366f0c commit e611ad2

File tree

10 files changed

+766
-100
lines changed

10 files changed

+766
-100
lines changed

cmd/epp/runner/runner.go

Lines changed: 56 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,9 @@ import (
5151
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
5252
dlmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer/metrics"
5353
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
54+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol"
55+
fccontroller "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/controller"
56+
fcregistry "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/registry"
5457
latencypredictor "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/latencypredictorasync"
5558
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
5659
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics/collectors"
@@ -71,11 +74,25 @@ import (
7174
)
7275

7376
const (
74-
// enableExperimentalDatalayerV2 defines the environment variable
75-
// used as feature flag for the pluggable data layer.
77+
// enableExperimentalDatalayerV2 defines the environment variable used as feature flag for the pluggable data layer.
7678
enableExperimentalDatalayerV2 = "ENABLE_EXPERIMENTAL_DATALAYER_V2"
79+
// enableExperimentalFlowControlLayer defines the environment variable used as a feature flag for the pluggable flow
80+
// control layer.
81+
enableExperimentalFlowControlLayer = "ENABLE_EXPERIMENTAL_FLOW_CONTROL_LAYER"
7782
)
7883

84+
// TODO: this is hardcoded for POC only. This needs to be hooked up to our text-based config story.
85+
var flowControlConfig = flowcontrol.Config{
86+
Controller: fccontroller.Config{}, // Use all defaults.
87+
Registry: fcregistry.Config{
88+
// Define domain of accepted priority levels as this field is required. Use defaults for all optional fields.
89+
// TODO: this should not be hardcoded.
90+
PriorityBands: []fcregistry.PriorityBandConfig{
91+
{Priority: 0, PriorityName: "Default"},
92+
},
93+
},
94+
}
95+
7996
var (
8097
grpcPort = flag.Int("grpc-port", runserver.DefaultGrpcPort, "The gRPC port used for communicating with Envoy proxy")
8198
grpcHealthPort = flag.Int("grpc-health-port", runserver.DefaultGrpcHealthPort, "The port used for gRPC liveness and readiness probes")
@@ -298,7 +315,43 @@ func (r *Runner) Run(ctx context.Context) error {
298315

299316
saturationDetector := saturationdetector.NewDetector(sdConfig, setupLog)
300317

301-
director := requestcontrol.NewDirectorWithConfig(datastore, scheduler, saturationDetector, r.requestControlConfig)
318+
// --- Admission Control Initialization ---
319+
enableFlowControl := env.GetEnvBool(enableExperimentalFlowControlLayer, false, setupLog)
320+
var admissionController requestcontrol.AdmissionController
321+
if enableFlowControl {
322+
setupLog.Info("Initializing experimental Flow Control layer")
323+
fcCfg, err := flowControlConfig.ValidateAndApplyDefaults()
324+
if err != nil {
325+
setupLog.Error(err, "failed to initialize Flow Control layer")
326+
return fmt.Errorf("invalid Flow Control config: %w", err)
327+
}
328+
329+
registry, err := fcregistry.NewFlowRegistry(fcCfg.Registry, setupLog)
330+
if err != nil {
331+
return fmt.Errorf("failed to initialize Flow Registry: %w", err)
332+
}
333+
fc, err := fccontroller.NewFlowController(
334+
ctx,
335+
fcCfg.Controller,
336+
registry,
337+
saturationDetector,
338+
setupLog,
339+
)
340+
if err != nil {
341+
return fmt.Errorf("failed to initialize Flow Controller: %w", err)
342+
}
343+
go registry.Run(ctx)
344+
admissionController = requestcontrol.NewFlowControlAdmissionController(saturationDetector, fc)
345+
} else {
346+
setupLog.Info("Experimental Flow Control layer is disabled, using legacy admission control")
347+
admissionController = requestcontrol.NewLegacyAdmissionController(saturationDetector)
348+
}
349+
350+
director := requestcontrol.NewDirectorWithConfig(
351+
datastore,
352+
scheduler,
353+
admissionController,
354+
r.requestControlConfig)
302355

303356
// --- Setup ExtProc Server Runner ---
304357
serverRunner := &runserver.ExtProcServerRunner{

pkg/epp/handlers/request.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,13 @@ import (
2929
errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/error"
3030
)
3131

32+
const (
33+
// defaultFairnessID is the default fairness ID used when no ID is provided in the request.
34+
// This ensures that requests without explicit fairness identifiers are still grouped and managed by the Flow Control
35+
// system.
36+
defaultFairnessID = "default-flow"
37+
)
38+
3239
func (s *StreamingServer) HandleRequestHeaders(reqCtx *RequestContext, req *extProcPb.ProcessingRequest_RequestHeaders) error {
3340
reqCtx.RequestReceivedTimestamp = time.Now()
3441

@@ -80,6 +87,11 @@ func (s *StreamingServer) HandleRequestHeaders(reqCtx *RequestContext, req *extP
8087
delete(reqCtx.Request.Headers, header.Key)
8188
}
8289
}
90+
91+
if reqCtx.FairnessID == "" {
92+
reqCtx.FairnessID = defaultFairnessID
93+
}
94+
8395
return nil
8496
}
8597

pkg/epp/handlers/request_test.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121

2222
configPb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
2323
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
24+
"github.com/stretchr/testify/assert"
2425
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metadata"
2526
)
2627

@@ -66,3 +67,32 @@ func TestHandleRequestHeaders(t *testing.T) {
6667
t.Errorf("expected fairness ID header to be removed from request headers, but it was not")
6768
}
6869
}
70+
71+
func TestHandleRequestHeaders_DefaultFairnessID(t *testing.T) {
72+
t.Parallel()
73+
74+
server := &StreamingServer{}
75+
reqCtx := &RequestContext{
76+
Request: &Request{
77+
Headers: make(map[string]string),
78+
},
79+
}
80+
81+
req := &extProcPb.ProcessingRequest_RequestHeaders{
82+
RequestHeaders: &extProcPb.HttpHeaders{
83+
Headers: &configPb.HeaderMap{
84+
Headers: []*configPb.HeaderValue{
85+
{
86+
Key: "x-test-header",
87+
Value: "test-value",
88+
},
89+
},
90+
},
91+
EndOfStream: false,
92+
},
93+
}
94+
95+
err := server.HandleRequestHeaders(reqCtx, req)
96+
assert.NoError(t, err, "expected no error")
97+
assert.Equal(t, defaultFairnessID, reqCtx.FairnessID, "expected fairness ID to be defaulted")
98+
}
Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package requestcontrol
18+
19+
import (
20+
"context"
21+
"time"
22+
23+
"sigs.k8s.io/controller-runtime/pkg/log"
24+
25+
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
26+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
27+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/handlers"
28+
errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/error"
29+
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
30+
requtil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/request"
31+
)
32+
33+
// AdmissionController defines the interface for making admission control decisions.
34+
// Implementations of this interface determine whether an incoming inference request should be accepted or rejected
35+
// based on various criteria such as system load, fairness, priority, and available capacity.
36+
type AdmissionController interface {
37+
// Admit determines if a request should be admitted.
38+
// It is called by the Director for each incoming request.
39+
//
40+
// Args:
41+
// ctx: The request context, carrying deadlines, cancellation signals, and logger.
42+
// reqCtx: The handlers.RequestContext containing details about the incoming request.
43+
// candidatePods: A list of potential backend pods that can serve the request.
44+
// priority: The priority level of the request, as determined by the InferenceObjective.
45+
//
46+
// Returns:
47+
// - nil: If the request is admitted and should proceed to scheduling.
48+
// - errutil.Error: If the request is rejected.
49+
Admit(
50+
ctx context.Context,
51+
reqCtx *handlers.RequestContext,
52+
candidatePods []backendmetrics.PodMetrics,
53+
priority int,
54+
) error
55+
}
56+
57+
// saturationDetector defines the minimal interface required for checking if the backend pool is saturated.
58+
type saturationDetector interface {
59+
IsSaturated(ctx context.Context, candidatePods []backendmetrics.PodMetrics) bool
60+
}
61+
62+
// flowController defines the minimal interface required by FlowControlAdmissionController for enqueuing requests and
63+
// waiting for an admission outcome.
64+
type flowController interface {
65+
EnqueueAndWait(req types.FlowControlRequest) (types.QueueOutcome, error)
66+
}
67+
68+
// rejectIfSheddableAndSaturated checks if a request should be immediately rejected because it's sheddable
69+
// (priority < 0) and the system is saturated.
70+
func rejectIfSheddableAndSaturated(
71+
ctx context.Context,
72+
sd saturationDetector,
73+
reqCtx *handlers.RequestContext,
74+
candidatePods []backendmetrics.PodMetrics,
75+
priority int,
76+
) error {
77+
if requtil.IsSheddable(priority) {
78+
logger := log.FromContext(ctx)
79+
if sd.IsSaturated(ctx, candidatePods) {
80+
logger.V(logutil.TRACE).Info("Request rejected: system saturated and request is sheddable",
81+
"requestID", reqCtx.SchedulingRequest.RequestId)
82+
return errutil.Error{
83+
Code: errutil.InferencePoolResourceExhausted,
84+
Msg: "system saturated, sheddable request dropped",
85+
}
86+
}
87+
}
88+
return nil
89+
}
90+
91+
// --- LegacyAdmissionController ---
92+
93+
// LegacyAdmissionController implements saturation-based admission control.
94+
// It rejects sheddable requests (priority < 0) if the saturationDetector indicates that the system is currently
95+
// saturated. Non-sheddable requests always bypass the saturation check.
96+
type LegacyAdmissionController struct {
97+
saturationDetector saturationDetector
98+
}
99+
100+
// NewLegacyAdmissionController creates a new LegacyAdmissionController.
101+
func NewLegacyAdmissionController(sd saturationDetector) *LegacyAdmissionController {
102+
return &LegacyAdmissionController{saturationDetector: sd}
103+
}
104+
105+
// Admit implements the AdmissionController interface for the legacy strategy.
106+
// It checks for saturation only for requests with priority < 0.
107+
func (lac *LegacyAdmissionController) Admit(
108+
ctx context.Context,
109+
reqCtx *handlers.RequestContext,
110+
candidatePods []backendmetrics.PodMetrics,
111+
priority int,
112+
) error {
113+
logger := log.FromContext(ctx)
114+
logger.V(logutil.TRACE).Info("Executing LegacyAdmissionController",
115+
"priority", priority, "fairnessID", reqCtx.FairnessID)
116+
if err := rejectIfSheddableAndSaturated(ctx, lac.saturationDetector, reqCtx, candidatePods, priority); err != nil {
117+
return err
118+
}
119+
logger.V(logutil.TRACE).Info("Request admitted", "requestID", reqCtx.SchedulingRequest.RequestId)
120+
return nil
121+
}
122+
123+
// --- FlowControlAdmissionController ---
124+
125+
// FlowControlAdmissionController delegates admission decisions to the Flow Control layer.
126+
// It first checks if the request is sheddable and the system is saturated, rejecting immediately if both conditions are
127+
// true. Otherwise, it uses the provided flowController to enqueue the request and await an outcome.
128+
type FlowControlAdmissionController struct {
129+
saturationDetector saturationDetector
130+
flowController flowController
131+
}
132+
133+
// NewFlowControlAdmissionController creates a new FlowControlAdmissionController.
134+
// It requires a SaturationDetector and a flowController instance.
135+
func NewFlowControlAdmissionController(sd saturationDetector, fc flowController) *FlowControlAdmissionController {
136+
return &FlowControlAdmissionController{
137+
saturationDetector: sd,
138+
flowController: fc,
139+
}
140+
}
141+
142+
// Admit implements the AdmissionController interface by checking for saturation on sheddable requests first, then
143+
// deferring to the Flow Control system.
144+
func (fcac *FlowControlAdmissionController) Admit(
145+
ctx context.Context,
146+
reqCtx *handlers.RequestContext,
147+
candidatePods []backendmetrics.PodMetrics,
148+
priority int,
149+
) error {
150+
logger := log.FromContext(ctx)
151+
logger.V(logutil.TRACE).Info("Executing FlowControlAdmissionController",
152+
"requestID", reqCtx.SchedulingRequest.RequestId, "priority", priority, "fairnessID", reqCtx.FairnessID)
153+
if err := rejectIfSheddableAndSaturated(ctx, fcac.saturationDetector, reqCtx, candidatePods, priority); err != nil {
154+
return err
155+
}
156+
157+
logger.V(logutil.TRACE).Info("Request proceeding to flow control", "requestID", reqCtx.SchedulingRequest.RequestId)
158+
159+
fcReq := &flowControlRequest{
160+
ctx: ctx,
161+
requestID: reqCtx.SchedulingRequest.RequestId,
162+
fairnessID: reqCtx.FairnessID,
163+
priority: priority,
164+
requestByteSize: uint64(reqCtx.RequestSize),
165+
candidatePods: candidatePods,
166+
}
167+
168+
outcome, err := fcac.flowController.EnqueueAndWait(fcReq)
169+
logger.V(logutil.DEBUG).Info("Flow control outcome",
170+
"requestID", reqCtx.SchedulingRequest.RequestId, "outcome", outcome, "error", err)
171+
return translateFlowControlOutcome(outcome, err)
172+
}
173+
174+
// flowControlRequest is an adapter that implements the types.FlowControlRequest interface.
175+
type flowControlRequest struct {
176+
ctx context.Context
177+
requestID string
178+
fairnessID string
179+
priority int
180+
requestByteSize uint64
181+
candidatePods []backendmetrics.PodMetrics
182+
}
183+
184+
var _ types.FlowControlRequest = &flowControlRequest{}
185+
186+
func (r *flowControlRequest) Context() context.Context { return r.ctx }
187+
func (r *flowControlRequest) ID() string { return r.requestID }
188+
func (r *flowControlRequest) InitialEffectiveTTL() time.Duration { return 0 } // Use controller default.
189+
func (r *flowControlRequest) ByteSize() uint64 { return r.requestByteSize }
190+
func (r *flowControlRequest) CandidatePodsForScheduling() []backendmetrics.PodMetrics {
191+
return r.candidatePods
192+
}
193+
func (r *flowControlRequest) FlowKey() types.FlowKey {
194+
return types.FlowKey{ID: r.fairnessID, Priority: r.priority}
195+
}
196+
197+
// translateFlowControlOutcome maps the context-rich outcome of the Flow Control layer to the public errutil.Error
198+
// contract used by the Director.
199+
func translateFlowControlOutcome(outcome types.QueueOutcome, err error) error {
200+
msg := "request rejected by flow control"
201+
if err != nil {
202+
msg = err.Error()
203+
}
204+
205+
switch outcome {
206+
case types.QueueOutcomeDispatched:
207+
return nil
208+
case types.QueueOutcomeRejectedCapacity:
209+
return errutil.Error{Code: errutil.InferencePoolResourceExhausted, Msg: msg}
210+
case types.QueueOutcomeEvictedTTL:
211+
return errutil.Error{Code: errutil.ServiceUnavailable, Msg: "request timed out in queue: " + msg}
212+
case types.QueueOutcomeEvictedContextCancelled:
213+
return errutil.Error{Code: errutil.ServiceUnavailable, Msg: "client disconnected: " + msg}
214+
case types.QueueOutcomeRejectedOther, types.QueueOutcomeEvictedOther:
215+
return errutil.Error{Code: errutil.Internal, Msg: "internal flow control error: " + msg}
216+
default:
217+
return errutil.Error{Code: errutil.Internal, Msg: "unhandled flow control outcome: " + msg}
218+
}
219+
}

0 commit comments

Comments
 (0)