Skip to content

Commit 6114ca5

Browse files
committed
refactor: Abstract away admission control logic
This commit refactors the admission control logic introduced in the previous commit. - Introduces an AdmissionController interface. - Implements LegacyAdmissionController for existing behavior. - Implements FlowControlAdmissionController for the new flow control path. - Moves flow control request adaptation into FlowControlAdmissionController. - Updates Director to use the AdmissionController interface. - Runner now injects the appropriate controller based on the feature flag. - Splits and refactors tests for better focus and isolation. This improves modularity, testability, and separation of concerns.
1 parent 0b28f2d commit 6114ca5

File tree

7 files changed

+583
-349
lines changed

7 files changed

+583
-349
lines changed

cmd/epp/runner/runner.go

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -288,46 +288,43 @@ func (r *Runner) Run(ctx context.Context) error {
288288

289289
saturationDetector := saturationdetector.NewDetector(sdConfig, setupLog)
290290

291-
// --- Flow Control Initialization (Experimental) ---
292-
291+
// --- Admission Control Initialization ---
293292
enableFlowControl := env.GetEnvBool(enableExperimentalFlowControlLayer, false, setupLog)
294-
var flowController *fccontroller.FlowController
293+
var admissionController requestcontrol.AdmissionController
295294
if enableFlowControl {
296295
setupLog.Info("Initializing experimental Flow Control layer")
297-
cfg, err := flowControlConfig.ValidateAndApplyDefaults()
296+
fcCfg, err := flowControlConfig.ValidateAndApplyDefaults()
298297
if err != nil {
299298
setupLog.Error(err, "failed to initialize Flow Control layer")
300299
return fmt.Errorf("invalid Flow Control config: %w", err)
301300
}
302301

303-
registry, err := fcregistry.NewFlowRegistry(cfg.Registry, setupLog)
302+
registry, err := fcregistry.NewFlowRegistry(fcCfg.Registry, setupLog)
304303
if err != nil {
305304
return fmt.Errorf("failed to initialize Flow Registry: %w", err)
306305
}
307306
fc, err := fccontroller.NewFlowController(
308307
ctx,
309-
cfg.Controller,
308+
fcCfg.Controller,
310309
registry,
311310
saturationDetector,
312311
setupLog,
313312
)
314313
if err != nil {
315314
return fmt.Errorf("failed to initialize Flow Controller: %w", err)
316315
}
317-
flowController = fc
318-
319316
go registry.Run(ctx)
317+
admissionController = requestcontrol.NewFlowControlAdmissionController(saturationDetector, fc)
320318
} else {
321-
setupLog.Info("Experimental Flow Control layer is disabled")
319+
setupLog.Info("Experimental Flow Control layer is disabled, using legacy admission control")
320+
admissionController = requestcontrol.NewLegacyAdmissionController(saturationDetector)
322321
}
323322

324323
director := requestcontrol.NewDirectorWithConfig(
325324
datastore,
326325
scheduler,
327-
saturationDetector,
328-
flowController,
329-
r.requestControlConfig,
330-
enableFlowControl)
326+
admissionController,
327+
r.requestControlConfig)
331328

332329
// --- Setup ExtProc Server Runner ---
333330
serverRunner := &runserver.ExtProcServerRunner{
Lines changed: 231 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,231 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package requestcontrol
18+
19+
import (
20+
"context"
21+
"time"
22+
23+
"sigs.k8s.io/controller-runtime/pkg/log"
24+
25+
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
26+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
27+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/handlers"
28+
errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/error"
29+
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
30+
)
31+
32+
// AdmissionController defines the interface for making admission control decisions.
33+
// Implementations of this interface determine whether an incoming inference request should be accepted or rejected
34+
// based on various criteria such as system load, fairness, priority, and available capacity.
35+
type AdmissionController interface {
36+
// Admit determines if a request should be admitted.
37+
// It is called by the Director for each incoming request.
38+
//
39+
// Args:
40+
// ctx: The request context, carrying deadlines, cancellation signals, and logger.
41+
// reqCtx: The handlers.RequestContext containing details about the incoming request.
42+
// candidatePods: A list of potential backend pods that can serve the request.
43+
// priority: The priority level of the request, as determined by the InferenceObjective.
44+
//
45+
// Returns:
46+
// - nil: If the request is admitted and should proceed to scheduling.
47+
// - errutil.Error: If the request is rejected.
48+
Admit(
49+
ctx context.Context,
50+
reqCtx *handlers.RequestContext,
51+
candidatePods []backendmetrics.PodMetrics,
52+
priority int,
53+
) error
54+
}
55+
56+
// saturationDetector defines the minimal interface required for checking if the backend pool is saturated.
57+
type saturationDetector interface {
58+
IsSaturated(ctx context.Context, candidatePods []backendmetrics.PodMetrics) bool
59+
}
60+
61+
// flowController defines the minimal interface required by FlowControlAdmissionController for enqueuing requests and
62+
// waiting for an admission outcome.
63+
type flowController interface {
64+
EnqueueAndWait(req types.FlowControlRequest) (types.QueueOutcome, error)
65+
}
66+
67+
// rejectIfSheddableAndSaturated checks if a request should be immediately rejected because it's sheddable
68+
// (priority < 0) and the system is saturated.
69+
func rejectIfSheddableAndSaturated(
70+
ctx context.Context,
71+
sd saturationDetector,
72+
reqCtx *handlers.RequestContext,
73+
candidatePods []backendmetrics.PodMetrics,
74+
priority int,
75+
) error {
76+
if priority < 0 {
77+
logger := log.FromContext(ctx)
78+
if sd.IsSaturated(ctx, candidatePods) {
79+
logger.V(logutil.TRACE).Info("Request rejected: system saturated and request is sheddable",
80+
"requestID", reqCtx.SchedulingRequest.RequestId)
81+
return errutil.Error{
82+
Code: errutil.InferencePoolResourceExhausted,
83+
Msg: "system saturated, sheddable request dropped",
84+
}
85+
}
86+
}
87+
return nil
88+
}
89+
90+
// --- LegacyAdmissionController ---
91+
92+
// LegacyAdmissionController implements saturation-based admission control.
93+
// It rejects sheddable requests (priority < 0) if the saturationDetector indicates that the system is currently
94+
// saturated. Non-sheddable requests always bypass the saturation check.
95+
type LegacyAdmissionController struct {
96+
saturationDetector saturationDetector
97+
}
98+
99+
// NewLegacyAdmissionController creates a new LegacyAdmissionController.
100+
func NewLegacyAdmissionController(sd saturationDetector) *LegacyAdmissionController {
101+
return &LegacyAdmissionController{saturationDetector: sd}
102+
}
103+
104+
// Admit implements the AdmissionController interface for the legacy strategy.
105+
// It checks for saturation only for requests with priority < 0.
106+
func (lac *LegacyAdmissionController) Admit(
107+
ctx context.Context,
108+
reqCtx *handlers.RequestContext,
109+
candidatePods []backendmetrics.PodMetrics,
110+
priority int,
111+
) error {
112+
logger := log.FromContext(ctx)
113+
logger.V(logutil.TRACE).Info("Executing LegacyAdmissionController",
114+
"priority", priority, "fairnessID", reqCtx.FairnessID)
115+
if err := rejectIfSheddableAndSaturated(ctx, lac.saturationDetector, reqCtx, candidatePods, priority); err != nil {
116+
return err
117+
}
118+
logger.V(logutil.TRACE).Info("Request admitted", "requestID", reqCtx.SchedulingRequest.RequestId)
119+
return nil
120+
}
121+
122+
// --- FlowControlAdmissionController ---
123+
124+
const (
125+
// defaultFairnessID is the default fairness ID used when no ID is provided in the request.
126+
// This ensures that requests without explicit fairness identifiers are still grouped and managed by the Flow Control
127+
// system.
128+
defaultFairnessID = "default-flow"
129+
)
130+
131+
// FlowControlAdmissionController delegates admission decisions to the Flow Control layer.
132+
// It first checks if the request is sheddable and the system is saturated, rejecting immediately if both conditions are
133+
// true. Otherwise, it uses the provided flowController to enqueue the request and await an outcome.
134+
type FlowControlAdmissionController struct {
135+
saturationDetector saturationDetector
136+
flowController flowController
137+
}
138+
139+
// NewFlowControlAdmissionController creates a new FlowControlAdmissionController.
140+
// It requires a SaturationDetector and a flowController instance.
141+
func NewFlowControlAdmissionController(sd saturationDetector, fc flowController) *FlowControlAdmissionController {
142+
return &FlowControlAdmissionController{
143+
saturationDetector: sd,
144+
flowController: fc,
145+
}
146+
}
147+
148+
// Admit implements the AdmissionController interface by checking for saturation on sheddable requests first, then
149+
// deferring to the Flow Control system.
150+
func (fcac *FlowControlAdmissionController) Admit(
151+
ctx context.Context,
152+
reqCtx *handlers.RequestContext,
153+
candidatePods []backendmetrics.PodMetrics,
154+
priority int,
155+
) error {
156+
logger := log.FromContext(ctx)
157+
logger.V(logutil.TRACE).Info("Executing FlowControlAdmissionController",
158+
"requestID", reqCtx.SchedulingRequest.RequestId, "priority", priority, "fairnessID", reqCtx.FairnessID)
159+
if err := rejectIfSheddableAndSaturated(ctx, fcac.saturationDetector, reqCtx, candidatePods, priority); err != nil {
160+
return err
161+
}
162+
163+
logger.V(logutil.TRACE).Info("Request proceeding to flow control", "requestID", reqCtx.SchedulingRequest.RequestId)
164+
fairnessID := reqCtx.FairnessID
165+
if fairnessID == "" {
166+
logger.V(logutil.TRACE).Info("No fairnessID provided, using default",
167+
"requestID", reqCtx.SchedulingRequest.RequestId, "defaultFairnessID", defaultFairnessID)
168+
fairnessID = defaultFairnessID
169+
}
170+
171+
fcReq := &flowControlRequest{
172+
ctx: ctx,
173+
requestID: reqCtx.SchedulingRequest.RequestId,
174+
fairnessID: fairnessID,
175+
priority: priority,
176+
requestByteSize: uint64(reqCtx.RequestSize),
177+
candidatePods: candidatePods,
178+
}
179+
180+
outcome, err := fcac.flowController.EnqueueAndWait(fcReq)
181+
logger.V(logutil.DEBUG).Info("Flow control outcome",
182+
"requestID", reqCtx.SchedulingRequest.RequestId, "outcome", outcome, "error", err)
183+
return translateFlowControlOutcome(outcome, err)
184+
}
185+
186+
// flowControlRequest is an adapter that implements the types.FlowControlRequest interface.
187+
type flowControlRequest struct {
188+
ctx context.Context
189+
requestID string
190+
fairnessID string
191+
priority int
192+
requestByteSize uint64
193+
candidatePods []backendmetrics.PodMetrics
194+
}
195+
196+
var _ types.FlowControlRequest = &flowControlRequest{}
197+
198+
func (r *flowControlRequest) Context() context.Context { return r.ctx }
199+
func (r *flowControlRequest) ID() string { return r.requestID }
200+
func (r *flowControlRequest) InitialEffectiveTTL() time.Duration { return 0 } // Use controller default.
201+
func (r *flowControlRequest) ByteSize() uint64 { return r.requestByteSize }
202+
func (r *flowControlRequest) CandidatePodsForScheduling() []backendmetrics.PodMetrics {
203+
return r.candidatePods
204+
}
205+
func (r *flowControlRequest) FlowKey() types.FlowKey {
206+
return types.FlowKey{ID: r.fairnessID, Priority: r.priority}
207+
}
208+
209+
// translateFlowControlOutcome maps the context-rich outcome of the Flow Control layer to the public errutil.Error
210+
// contract used by the Director.
211+
func translateFlowControlOutcome(outcome types.QueueOutcome, err error) error {
212+
msg := "request rejected by flow control"
213+
if err != nil {
214+
msg = err.Error()
215+
}
216+
217+
switch outcome {
218+
case types.QueueOutcomeDispatched:
219+
return nil
220+
case types.QueueOutcomeRejectedCapacity:
221+
return errutil.Error{Code: errutil.InferencePoolResourceExhausted, Msg: msg}
222+
case types.QueueOutcomeEvictedTTL:
223+
return errutil.Error{Code: errutil.ServiceUnavailable, Msg: "request timed out in queue: " + msg}
224+
case types.QueueOutcomeEvictedContextCancelled:
225+
return errutil.Error{Code: errutil.ServiceUnavailable, Msg: "client disconnected: " + msg}
226+
case types.QueueOutcomeRejectedOther, types.QueueOutcomeEvictedOther:
227+
return errutil.Error{Code: errutil.Internal, Msg: "internal flow control error: " + msg}
228+
default:
229+
return errutil.Error{Code: errutil.Internal, Msg: "unhandled flow control outcome: " + msg}
230+
}
231+
}

0 commit comments

Comments
 (0)