Skip to content

Commit 82605c7

Browse files
committed
refactor: Abstract away admission control logic
This commit refactors the admission control logic introduced in the previous commit:
- Introduces an AdmissionController interface.
- Implements LegacyAdmissionController for existing behavior.
- Implements FlowControlAdmissionController for the new flow control path.
- Moves flow control request adaptation into FlowControlAdmissionController.
- Updates Director to use the AdmissionController interface.
- Runner now injects the appropriate controller based on the feature flag.
- Splits and refactors tests for better focus and isolation.
This improves modularity, testability, and separation of concerns.
1 parent 0b28f2d commit 82605c7

File tree

7 files changed

+567
-349
lines changed

7 files changed

+567
-349
lines changed

cmd/epp/runner/runner.go

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -288,46 +288,43 @@ func (r *Runner) Run(ctx context.Context) error {
288288

289289
saturationDetector := saturationdetector.NewDetector(sdConfig, setupLog)
290290

291-
// --- Flow Control Initialization (Experimental) ---
292-
291+
// --- Admission Control Initialization ---
293292
enableFlowControl := env.GetEnvBool(enableExperimentalFlowControlLayer, false, setupLog)
294-
var flowController *fccontroller.FlowController
293+
var admissionController requestcontrol.AdmissionController
295294
if enableFlowControl {
296295
setupLog.Info("Initializing experimental Flow Control layer")
297-
cfg, err := flowControlConfig.ValidateAndApplyDefaults()
296+
fcCfg, err := flowControlConfig.ValidateAndApplyDefaults()
298297
if err != nil {
299298
setupLog.Error(err, "failed to initialize Flow Control layer")
300299
return fmt.Errorf("invalid Flow Control config: %w", err)
301300
}
302301

303-
registry, err := fcregistry.NewFlowRegistry(cfg.Registry, setupLog)
302+
registry, err := fcregistry.NewFlowRegistry(fcCfg.Registry, setupLog)
304303
if err != nil {
305304
return fmt.Errorf("failed to initialize Flow Registry: %w", err)
306305
}
307306
fc, err := fccontroller.NewFlowController(
308307
ctx,
309-
cfg.Controller,
308+
fcCfg.Controller,
310309
registry,
311310
saturationDetector,
312311
setupLog,
313312
)
314313
if err != nil {
315314
return fmt.Errorf("failed to initialize Flow Controller: %w", err)
316315
}
317-
flowController = fc
318-
319316
go registry.Run(ctx)
317+
admissionController = requestcontrol.NewFlowControlAdmissionController(saturationDetector, fc)
320318
} else {
321-
setupLog.Info("Experimental Flow Control layer is disabled")
319+
setupLog.Info("Experimental Flow Control layer is disabled, using legacy admission control")
320+
admissionController = requestcontrol.NewLegacyAdmissionController(saturationDetector)
322321
}
323322

324323
director := requestcontrol.NewDirectorWithConfig(
325324
datastore,
326325
scheduler,
327-
saturationDetector,
328-
flowController,
329-
r.requestControlConfig,
330-
enableFlowControl)
326+
admissionController,
327+
r.requestControlConfig)
331328

332329
// --- Setup ExtProc Server Runner ---
333330
serverRunner := &runserver.ExtProcServerRunner{
Lines changed: 215 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,215 @@
1+
package requestcontrol
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
"sigs.k8s.io/controller-runtime/pkg/log"
8+
9+
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
10+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
11+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/handlers"
12+
errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/error"
13+
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
14+
)
15+
16+
// AdmissionController defines the interface for making admission control decisions.
17+
// Implementations of this interface determine whether an incoming inference request should be accepted or rejected
18+
// based on various criteria such as system load, fairness, priority, and available capacity.
19+
type AdmissionController interface {
20+
// Admit determines if a request should be admitted.
21+
// It is called by the Director for each incoming request.
22+
//
23+
// Args:
24+
// ctx: The request context, carrying deadlines, cancellation signals, and logger.
25+
// reqCtx: The handlers.RequestContext containing details about the incoming request.
26+
// candidatePods: A list of potential backend pods that can serve the request.
27+
// priority: The priority level of the request, as determined by the InferenceObjective.
28+
//
29+
// Returns:
30+
// - nil: If the request is admitted and should proceed to scheduling.
31+
// - errutil.Error: If the request is rejected.
32+
Admit(
33+
ctx context.Context,
34+
reqCtx *handlers.RequestContext,
35+
candidatePods []backendmetrics.PodMetrics,
36+
priority int,
37+
) error
38+
}
39+
40+
// saturationDetector defines the minimal interface required for checking if the backend pool is saturated.
41+
type saturationDetector interface {
42+
IsSaturated(ctx context.Context, candidatePods []backendmetrics.PodMetrics) bool
43+
}
44+
45+
// flowController defines the minimal interface required by FlowControlAdmissionController for enqueuing requests and
46+
// waiting for an admission outcome.
47+
type flowController interface {
48+
EnqueueAndWait(req types.FlowControlRequest) (types.QueueOutcome, error)
49+
}
50+
51+
// rejectIfSheddableAndSaturated checks if a request should be immediately rejected because it's sheddable
52+
// (priority < 0) and the system is saturated.
53+
func rejectIfSheddableAndSaturated(
54+
ctx context.Context,
55+
sd saturationDetector,
56+
reqCtx *handlers.RequestContext,
57+
candidatePods []backendmetrics.PodMetrics,
58+
priority int,
59+
) error {
60+
if priority < 0 {
61+
logger := log.FromContext(ctx)
62+
if sd.IsSaturated(ctx, candidatePods) {
63+
logger.V(logutil.TRACE).Info("Request rejected: system saturated and request is sheddable",
64+
"requestID", reqCtx.SchedulingRequest.RequestId)
65+
return errutil.Error{
66+
Code: errutil.InferencePoolResourceExhausted,
67+
Msg: "system saturated, sheddable request dropped",
68+
}
69+
}
70+
}
71+
return nil
72+
}
73+
74+
// --- LegacyAdmissionController ---
75+
76+
// LegacyAdmissionController implements saturation-based admission control.
77+
// It rejects sheddable requests (priority < 0) if the saturationDetector indicates that the system is currently
78+
// saturated. Non-sheddable requests always bypass the saturation check.
79+
type LegacyAdmissionController struct {
80+
saturationDetector saturationDetector
81+
}
82+
83+
// NewLegacyAdmissionController creates a new LegacyAdmissionController.
84+
func NewLegacyAdmissionController(sd saturationDetector) *LegacyAdmissionController {
85+
return &LegacyAdmissionController{saturationDetector: sd}
86+
}
87+
88+
// Admit implements the AdmissionController interface for the legacy strategy.
89+
// It checks for saturation only for requests with priority < 0.
90+
func (lac *LegacyAdmissionController) Admit(
91+
ctx context.Context,
92+
reqCtx *handlers.RequestContext,
93+
candidatePods []backendmetrics.PodMetrics,
94+
priority int,
95+
) error {
96+
logger := log.FromContext(ctx)
97+
logger.V(logutil.TRACE).Info("Executing LegacyAdmissionController",
98+
"priority", priority, "fairnessID", reqCtx.FairnessID)
99+
if err := rejectIfSheddableAndSaturated(ctx, lac.saturationDetector, reqCtx, candidatePods, priority); err != nil {
100+
return err
101+
}
102+
logger.V(logutil.TRACE).Info("Request admitted", "requestID", reqCtx.SchedulingRequest.RequestId)
103+
return nil
104+
}
105+
106+
// --- FlowControlAdmissionController ---
107+
108+
// defaultFairnessID is the fairness ID substituted when a request carries none, so that such requests are still
// grouped and managed by the Flow Control system.
const defaultFairnessID = "default-flow"
114+
115+
// FlowControlAdmissionController delegates admission decisions to the Flow Control layer.
116+
// It first checks if the request is sheddable and the system is saturated, rejecting immediately if both conditions are
117+
// true. Otherwise, it uses the provided flowController to enqueue the request and await an outcome.
118+
type FlowControlAdmissionController struct {
119+
saturationDetector saturationDetector
120+
flowController flowController
121+
}
122+
123+
// NewFlowControlAdmissionController creates a new FlowControlAdmissionController.
124+
// It requires a SaturationDetector and a flowController instance.
125+
func NewFlowControlAdmissionController(sd saturationDetector, fc flowController) *FlowControlAdmissionController {
126+
return &FlowControlAdmissionController{
127+
saturationDetector: sd,
128+
flowController: fc,
129+
}
130+
}
131+
132+
// Admit implements the AdmissionController interface by checking for saturation on sheddable requests first, then
133+
// deferring to the Flow Control system.
134+
func (fcac *FlowControlAdmissionController) Admit(
135+
ctx context.Context,
136+
reqCtx *handlers.RequestContext,
137+
candidatePods []backendmetrics.PodMetrics,
138+
priority int,
139+
) error {
140+
logger := log.FromContext(ctx)
141+
logger.V(logutil.TRACE).Info("Executing FlowControlAdmissionController",
142+
"requestID", reqCtx.SchedulingRequest.RequestId, "priority", priority, "fairnessID", reqCtx.FairnessID)
143+
if err := rejectIfSheddableAndSaturated(ctx, fcac.saturationDetector, reqCtx, candidatePods, priority); err != nil {
144+
return err
145+
}
146+
147+
logger.V(logutil.TRACE).Info("Request proceeding to flow control", "requestID", reqCtx.SchedulingRequest.RequestId)
148+
fairnessID := reqCtx.FairnessID
149+
if fairnessID == "" {
150+
logger.V(logutil.TRACE).Info("No fairnessID provided, using default",
151+
"requestID", reqCtx.SchedulingRequest.RequestId, "defaultFairnessID", defaultFairnessID)
152+
fairnessID = defaultFairnessID
153+
}
154+
155+
fcReq := &flowControlRequest{
156+
ctx: ctx,
157+
requestID: reqCtx.SchedulingRequest.RequestId,
158+
fairnessID: fairnessID,
159+
priority: priority,
160+
requestByteSize: uint64(reqCtx.RequestSize),
161+
candidatePods: candidatePods,
162+
}
163+
164+
outcome, err := fcac.flowController.EnqueueAndWait(fcReq)
165+
logger.V(logutil.DEBUG).Info("Flow control outcome",
166+
"requestID", reqCtx.SchedulingRequest.RequestId, "outcome", outcome, "error", err)
167+
return translateFlowControlOutcome(outcome, err)
168+
}
169+
170+
// flowControlRequest is an adapter that implements the types.FlowControlRequest interface.
171+
type flowControlRequest struct {
172+
ctx context.Context
173+
requestID string
174+
fairnessID string
175+
priority int
176+
requestByteSize uint64
177+
candidatePods []backendmetrics.PodMetrics
178+
}
179+
180+
var _ types.FlowControlRequest = &flowControlRequest{}
181+
182+
func (r *flowControlRequest) Context() context.Context { return r.ctx }
183+
func (r *flowControlRequest) ID() string { return r.requestID }
184+
func (r *flowControlRequest) InitialEffectiveTTL() time.Duration { return 0 } // Use controller default.
185+
func (r *flowControlRequest) ByteSize() uint64 { return r.requestByteSize }
186+
func (r *flowControlRequest) CandidatePodsForScheduling() []backendmetrics.PodMetrics {
187+
return r.candidatePods
188+
}
189+
func (r *flowControlRequest) FlowKey() types.FlowKey {
190+
return types.FlowKey{ID: r.fairnessID, Priority: r.priority}
191+
}
192+
193+
// translateFlowControlOutcome maps the context-rich outcome of the Flow Control layer to the public errutil.Error
194+
// contract used by the Director.
195+
func translateFlowControlOutcome(outcome types.QueueOutcome, err error) error {
196+
msg := "request rejected by flow control"
197+
if err != nil {
198+
msg = err.Error()
199+
}
200+
201+
switch outcome {
202+
case types.QueueOutcomeDispatched:
203+
return nil
204+
case types.QueueOutcomeRejectedCapacity:
205+
return errutil.Error{Code: errutil.InferencePoolResourceExhausted, Msg: msg}
206+
case types.QueueOutcomeEvictedTTL:
207+
return errutil.Error{Code: errutil.ServiceUnavailable, Msg: "request timed out in queue: " + msg}
208+
case types.QueueOutcomeEvictedContextCancelled:
209+
return errutil.Error{Code: errutil.ServiceUnavailable, Msg: "client disconnected: " + msg}
210+
case types.QueueOutcomeRejectedOther, types.QueueOutcomeEvictedOther:
211+
return errutil.Error{Code: errutil.Internal, Msg: "internal flow control error: " + msg}
212+
default:
213+
return errutil.Error{Code: errutil.Internal, Msg: "unhandled flow control outcome: " + msg}
214+
}
215+
}

0 commit comments

Comments
 (0)