From 0b28f2dae39644e7d818afa9e1df2ebba931eee2 Mon Sep 17 00:00:00 2001 From: Luke Van Drie Date: Fri, 10 Oct 2025 23:33:45 +0000 Subject: [PATCH 1/4] feat(fc): Initial wiring of the flow control layer This commit introduces the initial integration of the new Flow Control layer into the Endpoint Picker (EPP). The primary goal is to provide a mechanism for more sophisticated request admission control, queuing, and multitenancy management. This new layer is gated by the ENABLE_EXPERIMENTAL_FLOW_CONTROL_LAYER feature flag. When enabled, the Director first applies the legacy saturation-based shedding check to sheddable (negative-priority) requests and then delegates the remaining admission control decision to the new flowController component. When the feature is disabled, the legacy saturation-based shedding check remains the only admission control. Comprehensive unit tests for the admitRequest function have been added to validate the behavior of both the legacy and new flow control paths. --- cmd/epp/runner/runner.go | 62 ++++++++- pkg/epp/requestcontrol/director.go | 107 ++++++++++++-- pkg/epp/requestcontrol/director_test.go | 178 +++++++++++++++++++++++- test/integration/epp/hermetic_test.go | 2 +- 4 files changed, 327 insertions(+), 22 deletions(-) diff --git a/cmd/epp/runner/runner.go b/cmd/epp/runner/runner.go index 5ee785c3c..a5a034eb4 100644 --- a/cmd/epp/runner/runner.go +++ b/cmd/epp/runner/runner.go @@ -50,6 +50,9 @@ import ( "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer" dlmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer/metrics" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol" + fccontroller "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/controller" + fcregistry "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/registry" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics/collectors" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins" @@ -68,11 +71,25 @@ import ( 
) const ( - // enableExperimentalDatalayerV2 defines the environment variable - // used as feature flag for the pluggable data layer. + // enableExperimentalDatalayerV2 defines the environment variable used as a feature flag for the pluggable data layer. enableExperimentalDatalayerV2 = "ENABLE_EXPERIMENTAL_DATALAYER_V2" + // enableExperimentalFlowControlLayer defines the environment variable used as a feature flag for the pluggable flow + // control layer. + enableExperimentalFlowControlLayer = "ENABLE_EXPERIMENTAL_FLOW_CONTROL_LAYER" ) +// TODO: this is hardcoded for POC only. This needs to be hooked up to our text-based config story. +var flowControlConfig = flowcontrol.Config{ + Controller: fccontroller.Config{}, // Use all defaults. + Registry: fcregistry.Config{ + // Define domain of accepted priority levels as this field is required. Use defaults for all optional fields. + // TODO: this should not be hardcoded. + PriorityBands: []fcregistry.PriorityBandConfig{ + {Priority: 0, PriorityName: "Default"}, + }, + }, +} + var ( grpcPort = flag.Int("grpc-port", runserver.DefaultGrpcPort, "The gRPC port used for communicating with Envoy proxy") grpcHealthPort = flag.Int("grpc-health-port", runserver.DefaultGrpcHealthPort, "The port used for gRPC liveness and readiness probes") @@ -271,7 +288,46 @@ func (r *Runner) Run(ctx context.Context) error { saturationDetector := saturationdetector.NewDetector(sdConfig, setupLog) - director := requestcontrol.NewDirectorWithConfig(datastore, scheduler, saturationDetector, r.requestControlConfig) + // --- Flow Control Initialization (Experimental) --- + + enableFlowControl := env.GetEnvBool(enableExperimentalFlowControlLayer, false, setupLog) + var flowController *fccontroller.FlowController + if enableFlowControl { + setupLog.Info("Initializing experimental Flow Control layer") + cfg, err := flowControlConfig.ValidateAndApplyDefaults() + if err != nil { + setupLog.Error(err, "failed to initialize Flow Control layer") + return 
fmt.Errorf("invalid Flow Control config: %w", err) + } + + registry, err := fcregistry.NewFlowRegistry(cfg.Registry, setupLog) + if err != nil { + return fmt.Errorf("failed to initialize Flow Registry: %w", err) + } + fc, err := fccontroller.NewFlowController( + ctx, + cfg.Controller, + registry, + saturationDetector, + setupLog, + ) + if err != nil { + return fmt.Errorf("failed to initialize Flow Controller: %w", err) + } + flowController = fc + + go registry.Run(ctx) + } else { + setupLog.Info("Experimental Flow Control layer is disabled") + } + + director := requestcontrol.NewDirectorWithConfig( + datastore, + scheduler, + saturationDetector, + flowController, + r.requestControlConfig, + enableFlowControl) // --- Setup ExtProc Server Runner --- serverRunner := &runserver.ExtProcServerRunner{ diff --git a/pkg/epp/requestcontrol/director.go b/pkg/epp/requestcontrol/director.go index a3e2d6d13..3cabdb429 100644 --- a/pkg/epp/requestcontrol/director.go +++ b/pkg/epp/requestcontrol/director.go @@ -33,6 +33,7 @@ import ( "sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend" backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/handlers" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metadata" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics" @@ -59,15 +60,29 @@ type SaturationDetector interface { IsSaturated(ctx context.Context, candidatePods []backendmetrics.PodMetrics) bool } +// flowController defines the minimal interface required by the Director for flow control. +type flowController interface { + EnqueueAndWait(req types.FlowControlRequest) (types.QueueOutcome, error) +} + // NewDirectorWithConfig creates a new Director instance with all dependencies. 
-func NewDirectorWithConfig(datastore Datastore, scheduler Scheduler, saturationDetector SaturationDetector, config *Config) *Director { +func NewDirectorWithConfig( + datastore Datastore, + scheduler Scheduler, + saturationDetector SaturationDetector, + fc flowController, + config *Config, + enableFlowControl bool, +) *Director { return &Director{ datastore: datastore, scheduler: scheduler, saturationDetector: saturationDetector, + flowController: fc, preRequestPlugins: config.preRequestPlugins, postResponsePlugins: config.postResponsePlugins, defaultPriority: 0, // define default priority explicitly + enableFlowControl: enableFlowControl, } } @@ -76,12 +91,14 @@ type Director struct { datastore Datastore scheduler Scheduler saturationDetector SaturationDetector + flowController flowController preRequestPlugins []PreRequest postResponsePlugins []PostResponse // we just need a pointer to an int variable since priority is a pointer in InferenceObjective // no need to set this in the constructor, since the value we want is the default int val // and value types cannot be nil - defaultPriority int + defaultPriority int + enableFlowControl bool } // HandleRequest orchestrates the request lifecycle. @@ -141,7 +158,7 @@ func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestCo } // Admission Control check - if err := d.admitRequest(ctx, candidatePods, *infObjective.Spec.Priority, reqCtx.FairnessID); err != nil { + if err := d.admitRequest(ctx, reqCtx, candidatePods, *infObjective.Spec.Priority); err != nil { return reqCtx, err } @@ -209,27 +226,43 @@ func (d *Director) getCandidatePodsForScheduling(ctx context.Context, requestMet // admitRequest handles admission control to decide whether or not to accept the request // based on the request priority and saturation state. 
-func (d *Director) admitRequest(ctx context.Context, candidatePods []backendmetrics.PodMetrics, requestPriority int, fairnessID string) error { +func (d *Director) admitRequest(ctx context.Context, reqCtx *handlers.RequestContext, candidatePods []backendmetrics.PodMetrics, requestPriority int) error { loggerTrace := log.FromContext(ctx).V(logutil.TRACE) - loggerTrace.Info("Entering Flow Control", "priority", requestPriority, "fairnessID", fairnessID) + loggerTrace.Info("Entering admission control", "priority", requestPriority, "fairnessID", reqCtx.FairnessID, "flowControlEnabled", d.flowController != nil) // This will be removed in favor of a more robust implementation (Flow Control) in the very near future. // TODO: Make this a configurable value. // Tracking issue https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/1347 - if requestPriority >= 0 { - loggerTrace.Info("Non-sheddable request bypassing saturation check.") - return nil - } - - if d.saturationDetector.IsSaturated(ctx, candidatePods) { + isSheddable := requestPriority < 0 + if isSheddable && d.saturationDetector.IsSaturated(ctx, candidatePods) { return errutil.Error{ Code: errutil.InferencePoolResourceExhausted, Msg: "system saturated, sheddable request dropped", } } - return nil + if !d.enableFlowControl { + loggerTrace.Info("Non-sheddable request bypassing saturation check.") + return nil + } + + fairnessID := reqCtx.FairnessID + if fairnessID == "" { + fairnessID = "default-flow" + } + + fcReq := &flowControlRequest{ + ctx: ctx, + requestID: reqCtx.SchedulingRequest.RequestId, + fairnessID: fairnessID, + priority: requestPriority, + requestByteSize: uint64(reqCtx.RequestSize), + candidatePods: candidatePods, + } + + outcome, err := d.flowController.EnqueueAndWait(fcReq) + return translateFlowControlOutcome(outcome, err) } // prepareRequest populates the RequestContext and calls the registered PreRequest plugins @@ -323,3 +356,53 @@ func (d *Director) runPostResponsePlugins(ctx 
context.Context, request *scheduli loggerDebug.Info("Completed running post-response plugin successfully", "plugin", plugin.TypedName()) } } + +// --- Flow Control Integration --- + +// flowControlRequest is an adapter that implements the types.FlowControlRequest interface, wrapping the director's +// internal request context. +type flowControlRequest struct { + ctx context.Context + requestID string + fairnessID string + priority int + requestByteSize uint64 + candidatePods []backendmetrics.PodMetrics +} + +var _ types.FlowControlRequest = &flowControlRequest{} + +func (r *flowControlRequest) Context() context.Context { return r.ctx } +func (r *flowControlRequest) ID() string { return r.requestID } +func (r *flowControlRequest) InitialEffectiveTTL() time.Duration { return 0 } // Use controller default. +func (r *flowControlRequest) ByteSize() uint64 { return r.requestByteSize } +func (r *flowControlRequest) CandidatePodsForScheduling() []backendmetrics.PodMetrics { + return r.candidatePods +} +func (r *flowControlRequest) FlowKey() types.FlowKey { + return types.FlowKey{ID: r.fairnessID, Priority: r.priority} +} + +// translateFlowControlOutcome maps the context-rich outcome of the Flow Control layer to the public errutil.Error +// contract. 
+func translateFlowControlOutcome(outcome types.QueueOutcome, err error) error { + msg := "request rejected by flow control" + if err != nil { + msg = err.Error() + } + + switch outcome { + case types.QueueOutcomeDispatched: + return nil + case types.QueueOutcomeRejectedCapacity: + return errutil.Error{Code: errutil.InferencePoolResourceExhausted, Msg: msg} + case types.QueueOutcomeEvictedTTL: + return errutil.Error{Code: errutil.ServiceUnavailable, Msg: "request timed out in queue: " + msg} + case types.QueueOutcomeEvictedContextCancelled: + return errutil.Error{Code: errutil.ServiceUnavailable, Msg: "client disconnected: " + msg} + case types.QueueOutcomeRejectedOther, types.QueueOutcomeEvictedOther: + return errutil.Error{Code: errutil.Internal, Msg: msg} + default: + return errutil.Error{Code: errutil.Internal, Msg: "unhandled flow control outcome: " + msg} + } +} diff --git a/pkg/epp/requestcontrol/director_test.go b/pkg/epp/requestcontrol/director_test.go index a0cb7c325..125872144 100644 --- a/pkg/epp/requestcontrol/director_test.go +++ b/pkg/epp/requestcontrol/director_test.go @@ -26,6 +26,7 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -38,6 +39,7 @@ import ( "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend" backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore" + fctypes "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/handlers" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metadata" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins" @@ -71,8 +73,12 @@ type mockDatastore struct { pods []backendmetrics.PodMetrics } -func (ds *mockDatastore) 
PoolGet() (*v1.InferencePool, error) { return nil, nil } -func (ds *mockDatastore) ObjectiveGet(_ string) *v1alpha2.InferenceObjective { return nil } +func (ds *mockDatastore) PoolGet() (*v1.InferencePool, error) { + return nil, nil +} +func (ds *mockDatastore) ObjectiveGet(_ string) *v1alpha2.InferenceObjective { + return nil +} func (ds *mockDatastore) PodList(predicate func(backendmetrics.PodMetrics) bool) []backendmetrics.PodMetrics { res := []backendmetrics.PodMetrics{} for _, pod := range ds.pods { @@ -84,6 +90,15 @@ func (ds *mockDatastore) PodList(predicate func(backendmetrics.PodMetrics) bool) return res } +type mockFlowController struct { + outcome fctypes.QueueOutcome + err error +} + +func (m *mockFlowController) EnqueueAndWait(_ fctypes.FlowControlRequest) (fctypes.QueueOutcome, error) { + return m.outcome, m.err +} + func TestDirector_HandleRequest(t *testing.T) { ctx := logutil.NewTestLoggerIntoContext(context.Background()) @@ -363,7 +378,6 @@ func TestDirector_HandleRequest(t *testing.T) { mockSaturationDetector: &mockSaturationDetector{isSaturated: false}, wantErrCode: errutil.BadRequest, }, - { name: "prompt or messages not found, expect err", reqBodyMap: map[string]any{"model": model}, @@ -410,7 +424,7 @@ func TestDirector_HandleRequest(t *testing.T) { if test.schedulerMockSetup != nil { test.schedulerMockSetup(mockSched) } - director := NewDirectorWithConfig(ds, mockSched, test.mockSaturationDetector, NewConfig()) + director := NewDirectorWithConfig(ds, mockSched, test.mockSaturationDetector, nil, NewConfig(), false) reqCtx := &handlers.RequestContext{ Request: &handlers.Request{ @@ -458,6 +472,158 @@ func TestDirector_HandleRequest(t *testing.T) { } } +func TestAdmitRequest(t *testing.T) { + t.Parallel() + ctx := logutil.NewTestLoggerIntoContext(context.Background()) + candidatePods := []backendmetrics.PodMetrics{} + reqCtx := &handlers.RequestContext{ + SchedulingRequest: &schedulingtypes.LLMRequest{}, + } + + t.Run("flow control disabled", 
func(t *testing.T) { + t.Parallel() + testCases := []struct { + name string + priority int + isSaturated bool + expectErr bool + expectErrCode string + expectErrSubstr string + }{ + { + name: "should admit non-sheddable request when saturated", + priority: 0, + isSaturated: true, + expectErr: false, + }, + { + name: "should admit sheddable request when not saturated", + priority: -1, + isSaturated: false, + expectErr: false, + }, + { + name: "should reject sheddable request when saturated", + priority: -1, + isSaturated: true, + expectErr: true, + expectErrCode: errutil.InferencePoolResourceExhausted, + expectErrSubstr: "system saturated, sheddable request dropped", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + // --- ARRANGE --- + saturationDetector := &mockSaturationDetector{isSaturated: tc.isSaturated} + director := NewDirectorWithConfig(nil, nil, saturationDetector, nil, NewConfig(), false) // FC disabled + + // --- ACT --- + err := director.admitRequest(ctx, reqCtx, candidatePods, tc.priority) + + // --- ASSERT --- + if !tc.expectErr { + assert.NoError(t, err, "expected no error for this scenario") + } else { + require.Error(t, err, "expected an error for this scenario") + var e errutil.Error + if assert.ErrorAs(t, err, &e, "error should be of type errutil.Error") { + assert.Equal(t, tc.expectErrCode, e.Code, "error code should match expected") + assert.Contains(t, e.Msg, tc.expectErrSubstr, "error message should contain expected substring") + } + } + }) + } + }) + + t.Run("flow control enabled", func(t *testing.T) { + t.Parallel() + testCases := []struct { + name string + fcOutcome fctypes.QueueOutcome + fcErr error + expectErr bool + expectErrCode string + expectErrSubstr string + }{ + { + name: "should admit when flow controller dispatches", + fcOutcome: fctypes.QueueOutcomeDispatched, + expectErr: false, + }, + { + name: "should reject on capacity rejection", + fcOutcome: 
fctypes.QueueOutcomeRejectedCapacity, + expectErr: true, + expectErrCode: errutil.InferencePoolResourceExhausted, + expectErrSubstr: "request rejected by flow control", + }, + { + name: "should reject on TTL eviction", + fcOutcome: fctypes.QueueOutcomeEvictedTTL, + fcErr: errors.New("timeout"), + expectErr: true, + expectErrCode: errutil.ServiceUnavailable, + expectErrSubstr: "request timed out in queue: timeout", + }, + { + name: "should reject on context cancelled eviction", + fcOutcome: fctypes.QueueOutcomeEvictedContextCancelled, + expectErr: true, + expectErrCode: errutil.ServiceUnavailable, + expectErrSubstr: "client disconnected", + }, + { + name: "should return internal error on other rejection", + fcOutcome: fctypes.QueueOutcomeRejectedOther, + expectErr: true, + expectErrCode: errutil.Internal, + expectErrSubstr: "request rejected by flow control", + }, + { + name: "should return internal error on other eviction", + fcOutcome: fctypes.QueueOutcomeEvictedOther, + fcErr: errors.New("internal error"), + expectErr: true, + expectErrCode: errutil.Internal, + expectErrSubstr: "internal error", + }, + { + name: "should return internal error on unhandled outcome", + fcOutcome: fctypes.QueueOutcomeNotYetFinalized, + expectErr: true, + expectErrCode: errutil.Internal, + expectErrSubstr: "unhandled flow control outcome", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + // --- ARRANGE --- + fc := &mockFlowController{outcome: tc.fcOutcome, err: tc.fcErr} + director := NewDirectorWithConfig(nil, nil, nil, fc, NewConfig(), true) // FC enabled + + // --- ACT --- + err := director.admitRequest(ctx, reqCtx, candidatePods, 0) + + // --- ASSERT --- + if !tc.expectErr { + assert.NoError(t, err, "expected no error for this scenario") + } else { + require.Error(t, err, "expected an error for this scenario") + var e errutil.Error + if assert.ErrorAs(t, err, &e, "error should be of type errutil.Error") { + assert.Equal(t, 
tc.expectErrCode, e.Code, "error code should match expected") + assert.Contains(t, e.Msg, tc.expectErrSubstr, "error message should contain expected substring") + } + } + }) + } + }) +} + // TestGetCandidatePodsForScheduling is testing getCandidatePodsForScheduling and more specifically the functionality of SubsetFilter. func TestGetCandidatePodsForScheduling(t *testing.T) { var makeFilterMetadata = func(data []any) map[string]any { @@ -529,7 +695,7 @@ func TestGetCandidatePodsForScheduling(t *testing.T) { ds := &mockDatastore{pods: testInput} for _, test := range tests { t.Run(test.name, func(t *testing.T) { - director := NewDirectorWithConfig(ds, &mockScheduler{}, &mockSaturationDetector{}, NewConfig()) + director := NewDirectorWithConfig(ds, &mockScheduler{}, &mockSaturationDetector{}, nil, NewConfig(), false) got := director.getCandidatePodsForScheduling(context.Background(), test.metadata) @@ -598,7 +764,7 @@ func TestDirector_HandleResponse(t *testing.T) { ctx := logutil.NewTestLoggerIntoContext(context.Background()) ds := datastore.NewDatastore(t.Context(), nil) mockSched := &mockScheduler{} - director := NewDirectorWithConfig(ds, mockSched, nil, NewConfig().WithPostResponsePlugins(pr1)) + director := NewDirectorWithConfig(ds, mockSched, nil, nil, NewConfig().WithPostResponsePlugins(pr1), false) reqCtx := &handlers.RequestContext{ Request: &handlers.Request{ diff --git a/test/integration/epp/hermetic_test.go b/test/integration/epp/hermetic_test.go index 3dc42f8ba..0abcf2462 100644 --- a/test/integration/epp/hermetic_test.go +++ b/test/integration/epp/hermetic_test.go @@ -1180,7 +1180,7 @@ func BeforeSuite() func() { } detector := saturationdetector.NewDetector(sdConfig, logger.WithName("saturation-detector")) serverRunner.SaturationDetector = detector - serverRunner.Director = requestcontrol.NewDirectorWithConfig(serverRunner.Datastore, scheduler, detector, requestcontrol.NewConfig()) + serverRunner.Director = 
requestcontrol.NewDirectorWithConfig(serverRunner.Datastore, scheduler, detector, nil, requestcontrol.NewConfig(), false) serverRunner.SecureServing = false if err := serverRunner.SetupWithManager(context.Background(), mgr); err != nil { From 6114ca509a5ff1609cb7e1652d3b26b28c7f09fa Mon Sep 17 00:00:00 2001 From: Luke Van Drie Date: Mon, 13 Oct 2025 04:00:23 +0000 Subject: [PATCH 2/4] refactor: Abstract away admission control logic This commit refactors the admission control logic introduced in the previous commit. - Introduces an AdmissionController interface. - Implements LegacyAdmissionController for existing behavior. - Implements FlowControlAdmissionController for the new flow control path. - Moves flow control request adaptation into FlowControlAdmissionController. - Updates Director to use the AdmissionController interface. - Runner now injects the appropriate controller based on the feature flag. - Splits and refactors tests for better focus and isolation. This improves modularity, testability, and separation of concerns. 
--- cmd/epp/runner/runner.go | 23 +- pkg/epp/requestcontrol/admission.go | 231 +++++++++++++++++++ pkg/epp/requestcontrol/admission_test.go | 282 +++++++++++++++++++++++ pkg/epp/requestcontrol/director.go | 130 ++--------- pkg/epp/requestcontrol/director_test.go | 260 ++++----------------- pkg/epp/server/runserver.go | 3 +- test/integration/epp/hermetic_test.go | 3 +- 7 files changed, 583 insertions(+), 349 deletions(-) create mode 100644 pkg/epp/requestcontrol/admission.go create mode 100644 pkg/epp/requestcontrol/admission_test.go diff --git a/cmd/epp/runner/runner.go b/cmd/epp/runner/runner.go index a5a034eb4..06293c70c 100644 --- a/cmd/epp/runner/runner.go +++ b/cmd/epp/runner/runner.go @@ -288,25 +288,24 @@ func (r *Runner) Run(ctx context.Context) error { saturationDetector := saturationdetector.NewDetector(sdConfig, setupLog) - // --- Flow Control Initialization (Experimental) --- - + // --- Admission Control Initialization --- enableFlowControl := env.GetEnvBool(enableExperimentalFlowControlLayer, false, setupLog) - var flowController *fccontroller.FlowController + var admissionController requestcontrol.AdmissionController if enableFlowControl { setupLog.Info("Initializing experimental Flow Control layer") - cfg, err := flowControlConfig.ValidateAndApplyDefaults() + fcCfg, err := flowControlConfig.ValidateAndApplyDefaults() if err != nil { setupLog.Error(err, "failed to initialize Flow Control layer") return fmt.Errorf("invalid Flow Control config: %w", err) } - registry, err := fcregistry.NewFlowRegistry(cfg.Registry, setupLog) + registry, err := fcregistry.NewFlowRegistry(fcCfg.Registry, setupLog) if err != nil { return fmt.Errorf("failed to initialize Flow Registry: %w", err) } fc, err := fccontroller.NewFlowController( ctx, - cfg.Controller, + fcCfg.Controller, registry, saturationDetector, setupLog, @@ -314,20 +313,18 @@ func (r *Runner) Run(ctx context.Context) error { if err != nil { return fmt.Errorf("failed to initialize Flow Controller: %w", err) 
} - flowController = fc - go registry.Run(ctx) + admissionController = requestcontrol.NewFlowControlAdmissionController(saturationDetector, fc) } else { - setupLog.Info("Experimental Flow Control layer is disabled") + setupLog.Info("Experimental Flow Control layer is disabled, using legacy admission control") + admissionController = requestcontrol.NewLegacyAdmissionController(saturationDetector) } director := requestcontrol.NewDirectorWithConfig( datastore, scheduler, - saturationDetector, - flowController, - r.requestControlConfig, - enableFlowControl) + admissionController, + r.requestControlConfig) // --- Setup ExtProc Server Runner --- serverRunner := &runserver.ExtProcServerRunner{ diff --git a/pkg/epp/requestcontrol/admission.go b/pkg/epp/requestcontrol/admission.go new file mode 100644 index 000000000..85faab5bc --- /dev/null +++ b/pkg/epp/requestcontrol/admission.go @@ -0,0 +1,231 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package requestcontrol + +import ( + "context" + "time" + + "sigs.k8s.io/controller-runtime/pkg/log" + + backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/handlers" + errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/error" + logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" +) + +// AdmissionController defines the interface for making admission control decisions. +// Implementations of this interface determine whether an incoming inference request should be accepted or rejected +// based on various criteria such as system load, fairness, priority, and available capacity. +type AdmissionController interface { + // Admit determines if a request should be admitted. + // It is called by the Director for each incoming request. + // + // Args: + // ctx: The request context, carrying deadlines, cancellation signals, and logger. + // reqCtx: The handlers.RequestContext containing details about the incoming request. + // candidatePods: A list of potential backend pods that can serve the request. + // priority: The priority level of the request, as determined by the InferenceObjective. + // + // Returns: + // - nil: If the request is admitted and should proceed to scheduling. + // - errutil.Error: If the request is rejected. + Admit( + ctx context.Context, + reqCtx *handlers.RequestContext, + candidatePods []backendmetrics.PodMetrics, + priority int, + ) error +} + +// saturationDetector defines the minimal interface required for checking if the backend pool is saturated. +type saturationDetector interface { + IsSaturated(ctx context.Context, candidatePods []backendmetrics.PodMetrics) bool +} + +// flowController defines the minimal interface required by FlowControlAdmissionController for enqueuing requests and +// waiting for an admission outcome. 
+type flowController interface { + EnqueueAndWait(req types.FlowControlRequest) (types.QueueOutcome, error) +} + +// rejectIfSheddableAndSaturated checks if a request should be immediately rejected because it's sheddable +// (priority < 0) and the system is saturated. +func rejectIfSheddableAndSaturated( + ctx context.Context, + sd saturationDetector, + reqCtx *handlers.RequestContext, + candidatePods []backendmetrics.PodMetrics, + priority int, +) error { + if priority < 0 { + logger := log.FromContext(ctx) + if sd.IsSaturated(ctx, candidatePods) { + logger.V(logutil.TRACE).Info("Request rejected: system saturated and request is sheddable", + "requestID", reqCtx.SchedulingRequest.RequestId) + return errutil.Error{ + Code: errutil.InferencePoolResourceExhausted, + Msg: "system saturated, sheddable request dropped", + } + } + } + return nil +} + +// --- LegacyAdmissionController --- + +// LegacyAdmissionController implements saturation-based admission control. +// It rejects sheddable requests (priority < 0) if the saturationDetector indicates that the system is currently +// saturated. Non-sheddable requests always bypass the saturation check. +type LegacyAdmissionController struct { + saturationDetector saturationDetector +} + +// NewLegacyAdmissionController creates a new LegacyAdmissionController. +func NewLegacyAdmissionController(sd saturationDetector) *LegacyAdmissionController { + return &LegacyAdmissionController{saturationDetector: sd} +} + +// Admit implements the AdmissionController interface for the legacy strategy. +// It checks for saturation only for requests with priority < 0. 
+func (lac *LegacyAdmissionController) Admit( + ctx context.Context, + reqCtx *handlers.RequestContext, + candidatePods []backendmetrics.PodMetrics, + priority int, +) error { + logger := log.FromContext(ctx) + logger.V(logutil.TRACE).Info("Executing LegacyAdmissionController", + "priority", priority, "fairnessID", reqCtx.FairnessID) + if err := rejectIfSheddableAndSaturated(ctx, lac.saturationDetector, reqCtx, candidatePods, priority); err != nil { + return err + } + logger.V(logutil.TRACE).Info("Request admitted", "requestID", reqCtx.SchedulingRequest.RequestId) + return nil +} + +// --- FlowControlAdmissionController --- + +const ( + // defaultFairnessID is the default fairness ID used when no ID is provided in the request. + // This ensures that requests without explicit fairness identifiers are still grouped and managed by the Flow Control + // system. + defaultFairnessID = "default-flow" +) + +// FlowControlAdmissionController delegates admission decisions to the Flow Control layer. +// It first checks if the request is sheddable and the system is saturated, rejecting immediately if both conditions are +// true. Otherwise, it uses the provided flowController to enqueue the request and await an outcome. +type FlowControlAdmissionController struct { + saturationDetector saturationDetector + flowController flowController +} + +// NewFlowControlAdmissionController creates a new FlowControlAdmissionController. +// It requires a SaturationDetector and a flowController instance. +func NewFlowControlAdmissionController(sd saturationDetector, fc flowController) *FlowControlAdmissionController { + return &FlowControlAdmissionController{ + saturationDetector: sd, + flowController: fc, + } +} + +// Admit implements the AdmissionController interface by checking for saturation on sheddable requests first, then +// deferring to the Flow Control system. 
+func (fcac *FlowControlAdmissionController) Admit( + ctx context.Context, + reqCtx *handlers.RequestContext, + candidatePods []backendmetrics.PodMetrics, + priority int, +) error { + logger := log.FromContext(ctx) + logger.V(logutil.TRACE).Info("Executing FlowControlAdmissionController", + "requestID", reqCtx.SchedulingRequest.RequestId, "priority", priority, "fairnessID", reqCtx.FairnessID) + if err := rejectIfSheddableAndSaturated(ctx, fcac.saturationDetector, reqCtx, candidatePods, priority); err != nil { + return err + } + + logger.V(logutil.TRACE).Info("Request proceeding to flow control", "requestID", reqCtx.SchedulingRequest.RequestId) + fairnessID := reqCtx.FairnessID + if fairnessID == "" { + logger.V(logutil.TRACE).Info("No fairnessID provided, using default", + "requestID", reqCtx.SchedulingRequest.RequestId, "defaultFairnessID", defaultFairnessID) + fairnessID = defaultFairnessID + } + + fcReq := &flowControlRequest{ + ctx: ctx, + requestID: reqCtx.SchedulingRequest.RequestId, + fairnessID: fairnessID, + priority: priority, + requestByteSize: uint64(reqCtx.RequestSize), + candidatePods: candidatePods, + } + + outcome, err := fcac.flowController.EnqueueAndWait(fcReq) + logger.V(logutil.DEBUG).Info("Flow control outcome", + "requestID", reqCtx.SchedulingRequest.RequestId, "outcome", outcome, "error", err) + return translateFlowControlOutcome(outcome, err) +} + +// flowControlRequest is an adapter that implements the types.FlowControlRequest interface. +type flowControlRequest struct { + ctx context.Context + requestID string + fairnessID string + priority int + requestByteSize uint64 + candidatePods []backendmetrics.PodMetrics +} + +var _ types.FlowControlRequest = &flowControlRequest{} + +func (r *flowControlRequest) Context() context.Context { return r.ctx } +func (r *flowControlRequest) ID() string { return r.requestID } +func (r *flowControlRequest) InitialEffectiveTTL() time.Duration { return 0 } // Use controller default. 
+func (r *flowControlRequest) ByteSize() uint64 { return r.requestByteSize } +func (r *flowControlRequest) CandidatePodsForScheduling() []backendmetrics.PodMetrics { + return r.candidatePods +} +func (r *flowControlRequest) FlowKey() types.FlowKey { + return types.FlowKey{ID: r.fairnessID, Priority: r.priority} +} + +// translateFlowControlOutcome maps the context-rich outcome of the Flow Control layer to the public errutil.Error +// contract used by the Director. +func translateFlowControlOutcome(outcome types.QueueOutcome, err error) error { + msg := "request rejected by flow control" + if err != nil { + msg = err.Error() + } + + switch outcome { + case types.QueueOutcomeDispatched: + return nil + case types.QueueOutcomeRejectedCapacity: + return errutil.Error{Code: errutil.InferencePoolResourceExhausted, Msg: msg} + case types.QueueOutcomeEvictedTTL: + return errutil.Error{Code: errutil.ServiceUnavailable, Msg: "request timed out in queue: " + msg} + case types.QueueOutcomeEvictedContextCancelled: + return errutil.Error{Code: errutil.ServiceUnavailable, Msg: "client disconnected: " + msg} + case types.QueueOutcomeRejectedOther, types.QueueOutcomeEvictedOther: + return errutil.Error{Code: errutil.Internal, Msg: "internal flow control error: " + msg} + default: + return errutil.Error{Code: errutil.Internal, Msg: "unhandled flow control outcome: " + msg} + } +} diff --git a/pkg/epp/requestcontrol/admission_test.go b/pkg/epp/requestcontrol/admission_test.go new file mode 100644 index 000000000..002c50f06 --- /dev/null +++ b/pkg/epp/requestcontrol/admission_test.go @@ -0,0 +1,282 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package requestcontrol + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" + fctypes "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/handlers" + schedulingtypes "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types" + errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/error" + logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" +) + +// --- Mocks --- + +type mockSaturationDetector struct { + isSaturated bool +} + +func (m *mockSaturationDetector) IsSaturated(_ context.Context, _ []backendmetrics.PodMetrics) bool { + return m.isSaturated +} + +type mockFlowController struct { + outcome fctypes.QueueOutcome + err error + called bool +} + +func (m *mockFlowController) EnqueueAndWait(_ fctypes.FlowControlRequest) (fctypes.QueueOutcome, error) { + m.called = true + return m.outcome, m.err +} + +func TestLegacyAdmissionController_Admit(t *testing.T) { + t.Parallel() + ctx := logutil.NewTestLoggerIntoContext(context.Background()) + candidatePods := []backendmetrics.PodMetrics{} + reqCtx := &handlers.RequestContext{ + SchedulingRequest: &schedulingtypes.LLMRequest{RequestId: "test-req"}, + } + + testCases := []struct { + name string + priority int + isSaturated bool + expectErr bool + expectErrCode string + expectErrSubstr string + }{ + { + name: "non_sheddable_saturated_admit", + 
priority: 0, + isSaturated: true, + expectErr: false, + }, + { + name: "sheddable_not_saturated_admit", + priority: -1, + isSaturated: false, + expectErr: false, + }, + { + name: "sheddable_saturated_reject", + priority: -1, + isSaturated: true, + expectErr: true, + expectErrCode: errutil.InferencePoolResourceExhausted, + expectErrSubstr: "system saturated, sheddable request dropped", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + saturationDetector := &mockSaturationDetector{isSaturated: tc.isSaturated} + ac := NewLegacyAdmissionController(saturationDetector) + + err := ac.Admit(ctx, reqCtx, candidatePods, tc.priority) + + if !tc.expectErr { + assert.NoError(t, err, "Admit() should not have returned an error for scenario: %s", tc.name) + } else { + require.Error(t, err, "Admit() should have returned an error for scenario: %s", tc.name) + var e errutil.Error + if assert.ErrorAs(t, err, &e, "error should be of type errutil.Error") { + assert.Equal(t, tc.expectErrCode, e.Code, "incorrect error code for scenario: %s", tc.name) + assert.Contains(t, e.Msg, tc.expectErrSubstr, "incorrect error message substring for scenario: %s", tc.name) + } + } + }) + } +} + +func TestFlowControlRequestAdapter(t *testing.T) { + t.Parallel() + ctx := context.Background() + candidatePods := []backendmetrics.PodMetrics{&backendmetrics.FakePodMetrics{}} + + testCases := []struct { + name string + requestID string + fairnessID string + priority int + requestByteSize uint64 + expectFlowKey fctypes.FlowKey + }{ + { + name: "simple", + requestID: "req-1", + fairnessID: "flow-1", + priority: 10, + requestByteSize: 1024, + expectFlowKey: fctypes.FlowKey{ID: "flow-1", Priority: 10}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + fcReq := &flowControlRequest{ + ctx: ctx, + requestID: tc.requestID, + fairnessID: tc.fairnessID, + priority: tc.priority, + requestByteSize: tc.requestByteSize, + 
candidatePods: candidatePods, + } + + assert.Equal(t, ctx, fcReq.Context(), "Context() mismatch") + assert.Equal(t, tc.requestID, fcReq.ID(), "ID() mismatch") + assert.Equal(t, tc.requestByteSize, fcReq.ByteSize(), "ByteSize() mismatch") + assert.Equal(t, candidatePods, fcReq.CandidatePodsForScheduling(), "CandidatePodsForScheduling() mismatch") + assert.Equal(t, tc.expectFlowKey, fcReq.FlowKey(), "FlowKey() mismatch") + assert.Zero(t, fcReq.InitialEffectiveTTL(), "InitialEffectiveTTL() should be zero") + }) + } +} +func TestFlowControlAdmissionController_Admit(t *testing.T) { + t.Parallel() + ctx := logutil.NewTestLoggerIntoContext(context.Background()) + candidatePods := []backendmetrics.PodMetrics{} + + reqCtx := &handlers.RequestContext{ + SchedulingRequest: &schedulingtypes.LLMRequest{RequestId: "test-req"}, + } + + testCases := []struct { + name string + priority int + isSaturated bool + fcOutcome fctypes.QueueOutcome + fcErr error + expectErr bool + expectErrCode string + expectErrSubstr string + expectFCSkipped bool + }{ + { + name: "sheddable_saturated_reject", + priority: -1, + isSaturated: true, + expectErr: true, + expectErrCode: errutil.InferencePoolResourceExhausted, + expectErrSubstr: "system saturated, sheddable request dropped", + expectFCSkipped: true, + }, + { + name: "sheddable_not_saturated_dispatch", + priority: -1, + isSaturated: false, + fcOutcome: fctypes.QueueOutcomeDispatched, + expectErr: false, + }, + { + name: "non_sheddable_saturated_dispatch", + priority: 0, + isSaturated: true, + fcOutcome: fctypes.QueueOutcomeDispatched, + expectErr: false, + }, + { + name: "fc_reject_capacity", + priority: 0, + fcOutcome: fctypes.QueueOutcomeRejectedCapacity, + expectErr: true, + expectErrCode: errutil.InferencePoolResourceExhausted, + expectErrSubstr: "request rejected by flow control", + }, + { + name: "fc_evict_ttl", + priority: 0, + fcOutcome: fctypes.QueueOutcomeEvictedTTL, + fcErr: errors.New("timeout"), + expectErr: true, + expectErrCode: 
errutil.ServiceUnavailable, + expectErrSubstr: "request timed out in queue: timeout", + }, + { + name: "fc_evict_context_cancelled", + priority: 0, + fcOutcome: fctypes.QueueOutcomeEvictedContextCancelled, + expectErr: true, + expectErrCode: errutil.ServiceUnavailable, + expectErrSubstr: "client disconnected", + }, + { + name: "fc_reject_other", + priority: 0, + fcOutcome: fctypes.QueueOutcomeRejectedOther, + expectErr: true, + expectErrCode: errutil.Internal, + expectErrSubstr: "internal flow control error", + }, + { + name: "fc_evict_other", + priority: 0, + fcOutcome: fctypes.QueueOutcomeEvictedOther, + fcErr: errors.New("internal error"), + expectErr: true, + expectErrCode: errutil.Internal, + expectErrSubstr: "internal flow control error: internal error", + }, + { + name: "fc_unhandled_outcome", + priority: 0, + fcOutcome: fctypes.QueueOutcomeNotYetFinalized, + expectErr: true, + expectErrCode: errutil.Internal, + expectErrSubstr: "unhandled flow control outcome", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + sd := &mockSaturationDetector{isSaturated: tc.isSaturated} + fc := &mockFlowController{outcome: tc.fcOutcome, err: tc.fcErr} + ac := NewFlowControlAdmissionController(sd, fc) + + err := ac.Admit(ctx, reqCtx, candidatePods, tc.priority) + + if tc.expectFCSkipped { + assert.False(t, fc.called, "FlowController should not have been called for scenario: %s", tc.name) + } else { + assert.True(t, fc.called, "FlowController should have been called for scenario: %s", tc.name) + } + + if !tc.expectErr { + assert.NoError(t, err, "Admit() returned an unexpected error for scenario: %s", tc.name) + } else { + require.Error(t, err, "Admit() should have returned an error for scenario: %s", tc.name) + var e errutil.Error + if assert.ErrorAs(t, err, &e, "error should be of type errutil.Error") { + assert.Equal(t, tc.expectErrCode, e.Code, "incorrect error code for scenario: %s", tc.name) + assert.Contains(t, e.Msg, 
tc.expectErrSubstr, "incorrect error message substring for scenario: %s", tc.name) + } + } + }) + } +} diff --git a/pkg/epp/requestcontrol/director.go b/pkg/epp/requestcontrol/director.go index 3cabdb429..2f8c7389c 100644 --- a/pkg/epp/requestcontrol/director.go +++ b/pkg/epp/requestcontrol/director.go @@ -33,7 +33,6 @@ import ( "sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend" backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" - "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/handlers" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metadata" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics" @@ -55,50 +54,42 @@ type Scheduler interface { Schedule(ctx context.Context, request *schedulingtypes.LLMRequest, candidatePods []schedulingtypes.Pod) (result *schedulingtypes.SchedulingResult, err error) } -// SaturationDetector provides a signal indicating whether the backends are considered saturated. -type SaturationDetector interface { - IsSaturated(ctx context.Context, candidatePods []backendmetrics.PodMetrics) bool -} - -// flowController defines the minimal interface required by the Director for flow control. -type flowController interface { - EnqueueAndWait(req types.FlowControlRequest) (types.QueueOutcome, error) -} - // NewDirectorWithConfig creates a new Director instance with all dependencies. 
func NewDirectorWithConfig( datastore Datastore, scheduler Scheduler, - saturationDetector SaturationDetector, - fc flowController, + admissionController AdmissionController, config *Config, - enableFlowControl bool, ) *Director { return &Director{ datastore: datastore, scheduler: scheduler, - saturationDetector: saturationDetector, - flowController: fc, + admissionController: admissionController, preRequestPlugins: config.preRequestPlugins, postResponsePlugins: config.postResponsePlugins, defaultPriority: 0, // define default priority explicitly - enableFlowControl: enableFlowControl, } } -// Director orchestrates the request handling flow, including scheduling. +// Director orchestrates the request handling flow after initial parsing by the handler. +// Its responsibilities include: +// - Retrieving request metadata and relevant objectives. +// - Determining candidate pods. +// - Performing admission control via the AdmissionController. +// - Scheduling the request to target pod(s) via the Scheduler. +// - Running PreRequest plugins. +// - Preparing the request context for the Envoy ext_proc filter to route the request. +// - Running PostResponse plugins. type Director struct { datastore Datastore scheduler Scheduler - saturationDetector SaturationDetector - flowController flowController + admissionController AdmissionController preRequestPlugins []PreRequest postResponsePlugins []PostResponse // we just need a pointer to an int variable since priority is a pointer in InferenceObjective // no need to set this in the constructor, since the value we want is the default int val // and value types cannot be nil - defaultPriority int - enableFlowControl bool + defaultPriority int } // HandleRequest orchestrates the request lifecycle. 
@@ -157,8 +148,8 @@ func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestCo return reqCtx, errutil.Error{Code: errutil.ServiceUnavailable, Msg: "failed to find candidate pods for serving the request"} } - // Admission Control check - if err := d.admitRequest(ctx, reqCtx, candidatePods, *infObjective.Spec.Priority); err != nil { + if err := d.admissionController.Admit(ctx, reqCtx, candidatePods, *infObjective.Spec.Priority); err != nil { + logger.V(logutil.DEFAULT).Info("Request rejected by admission control", "error", err) return reqCtx, err } @@ -224,47 +215,6 @@ func (d *Director) getCandidatePodsForScheduling(ctx context.Context, requestMet return podFilteredList } -// admitRequest handles admission control to decide whether or not to accept the request -// based on the request priority and saturation state. -func (d *Director) admitRequest(ctx context.Context, reqCtx *handlers.RequestContext, candidatePods []backendmetrics.PodMetrics, requestPriority int) error { - loggerTrace := log.FromContext(ctx).V(logutil.TRACE) - - loggerTrace.Info("Entering admission control", "priority", requestPriority, "fairnessID", reqCtx.FairnessID, "flowControlEnabled", d.flowController != nil) - - // This will be removed in favor of a more robust implementation (Flow Control) in the very near future. - // TODO: Make this a configurable value. 
- // Tracking issue https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/1347 - isSheddable := requestPriority < 0 - if isSheddable && d.saturationDetector.IsSaturated(ctx, candidatePods) { - return errutil.Error{ - Code: errutil.InferencePoolResourceExhausted, - Msg: "system saturated, sheddable request dropped", - } - } - - if !d.enableFlowControl { - loggerTrace.Info("Non-sheddable request bypassing saturation check.") - return nil - } - - fairnessID := reqCtx.FairnessID - if fairnessID == "" { - fairnessID = "default-flow" - } - - fcReq := &flowControlRequest{ - ctx: ctx, - requestID: reqCtx.SchedulingRequest.RequestId, - fairnessID: fairnessID, - priority: requestPriority, - requestByteSize: uint64(reqCtx.RequestSize), - candidatePods: candidatePods, - } - - outcome, err := d.flowController.EnqueueAndWait(fcReq) - return translateFlowControlOutcome(outcome, err) -} - // prepareRequest populates the RequestContext and calls the registered PreRequest plugins // for allowing plugging customized logic based on the scheduling result. func (d *Director) prepareRequest(ctx context.Context, reqCtx *handlers.RequestContext, result *schedulingtypes.SchedulingResult) (*handlers.RequestContext, error) { @@ -356,53 +306,3 @@ func (d *Director) runPostResponsePlugins(ctx context.Context, request *scheduli loggerDebug.Info("Completed running post-response plugin successfully", "plugin", plugin.TypedName()) } } - -// --- Flow Control Integration --- - -// flowControlRequest is an adapter that implements the types.FlowControlRequest interface, wrapping the director's -// internal request context. 
-type flowControlRequest struct { - ctx context.Context - requestID string - fairnessID string - priority int - requestByteSize uint64 - candidatePods []backendmetrics.PodMetrics -} - -var _ types.FlowControlRequest = &flowControlRequest{} - -func (r *flowControlRequest) Context() context.Context { return r.ctx } -func (r *flowControlRequest) ID() string { return r.requestID } -func (r *flowControlRequest) InitialEffectiveTTL() time.Duration { return 0 } // Use controller default. -func (r *flowControlRequest) ByteSize() uint64 { return r.requestByteSize } -func (r *flowControlRequest) CandidatePodsForScheduling() []backendmetrics.PodMetrics { - return r.candidatePods -} -func (r *flowControlRequest) FlowKey() types.FlowKey { - return types.FlowKey{ID: r.fairnessID, Priority: r.priority} -} - -// translateFlowControlOutcome maps the context-rich outcome of the Flow Control layer to the public errutil.Error -// contract. -func translateFlowControlOutcome(outcome types.QueueOutcome, err error) error { - msg := "request rejected by flow control" - if err != nil { - msg = err.Error() - } - - switch outcome { - case types.QueueOutcomeDispatched: - return nil - case types.QueueOutcomeRejectedCapacity: - return errutil.Error{Code: errutil.InferencePoolResourceExhausted, Msg: msg} - case types.QueueOutcomeEvictedTTL: - return errutil.Error{Code: errutil.ServiceUnavailable, Msg: "request timed out in queue: " + msg} - case types.QueueOutcomeEvictedContextCancelled: - return errutil.Error{Code: errutil.ServiceUnavailable, Msg: "client disconnected: " + msg} - case types.QueueOutcomeRejectedOther, types.QueueOutcomeEvictedOther: - return errutil.Error{Code: errutil.Internal, Msg: msg} - default: - return errutil.Error{Code: errutil.Internal, Msg: "unhandled flow control outcome: " + msg} - } -} diff --git a/pkg/epp/requestcontrol/director_test.go b/pkg/epp/requestcontrol/director_test.go index 125872144..777e28704 100644 --- a/pkg/epp/requestcontrol/director_test.go +++ 
b/pkg/epp/requestcontrol/director_test.go @@ -26,7 +26,6 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -39,7 +38,6 @@ import ( "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend" backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore" - fctypes "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/handlers" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metadata" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins" @@ -52,12 +50,17 @@ import ( // --- Mocks --- -type mockSaturationDetector struct { - isSaturated bool +type mockAdmissionController struct { + admitErr error } -func (m *mockSaturationDetector) IsSaturated(_ context.Context, _ []backendmetrics.PodMetrics) bool { - return m.isSaturated +func (m *mockAdmissionController) Admit( + _ context.Context, + _ *handlers.RequestContext, + _ []backendmetrics.PodMetrics, + _ int, +) error { + return m.admitErr } type mockScheduler struct { @@ -90,15 +93,6 @@ func (ds *mockDatastore) PodList(predicate func(backendmetrics.PodMetrics) bool) return res } -type mockFlowController struct { - outcome fctypes.QueueOutcome - err error -} - -func (m *mockFlowController) EnqueueAndWait(_ fctypes.FlowControlRequest) (fctypes.QueueOutcome, error) { - return m.outcome, m.err -} - func TestDirector_HandleRequest(t *testing.T) { ctx := logutil.NewTestLoggerIntoContext(context.Background()) @@ -202,23 +196,23 @@ func TestDirector_HandleRequest(t *testing.T) { } tests := []struct { - name string - reqBodyMap map[string]any - mockSaturationDetector *mockSaturationDetector - inferenceObjectiveName string - schedulerMockSetup 
func(m *mockScheduler) - wantErrCode string // Expected errutil code string - wantReqCtx *handlers.RequestContext // Fields to check in the returned RequestContext - wantMutatedBodyModel string // Expected model in reqCtx.Request.Body after PostDispatch - targetModelName string // Expected model name after target model resolution + name string + reqBodyMap map[string]any + mockAdmissionController *mockAdmissionController + inferenceObjectiveName string + schedulerMockSetup func(m *mockScheduler) + wantErrCode string // Expected errutil code string + wantReqCtx *handlers.RequestContext // Fields to check in the returned RequestContext + wantMutatedBodyModel string // Expected model in reqCtx.Request.Body after PostDispatch + targetModelName string // Expected model name after target model resolution }{ { - name: "successful completions request (critical, saturation ignored)", + name: "successful completions request", reqBodyMap: map[string]any{ "model": model, "prompt": "critical prompt", }, - mockSaturationDetector: &mockSaturationDetector{isSaturated: true}, + mockAdmissionController: &mockAdmissionController{admitErr: nil}, schedulerMockSetup: func(m *mockScheduler) { m.scheduleResults = defaultSuccessfulScheduleResults }, @@ -236,7 +230,7 @@ func TestDirector_HandleRequest(t *testing.T) { targetModelName: model, }, { - name: "successful chat completions request (default critical, saturation ignored)", + name: "successful chat completions request", reqBodyMap: map[string]any{ "model": model, "messages": []any{ @@ -246,7 +240,7 @@ func TestDirector_HandleRequest(t *testing.T) { }, }, }, - mockSaturationDetector: &mockSaturationDetector{isSaturated: true}, + mockAdmissionController: &mockAdmissionController{admitErr: nil}, schedulerMockSetup: func(m *mockScheduler) { m.scheduleResults = defaultSuccessfulScheduleResults }, @@ -262,7 +256,7 @@ func TestDirector_HandleRequest(t *testing.T) { targetModelName: model, }, { - name: "successful chat completions request 
with multiple messages (critical, saturation ignored)", + name: "successful chat completions request with multiple messages", reqBodyMap: map[string]any{ "model": model, "messages": []any{ @@ -276,6 +270,7 @@ func TestDirector_HandleRequest(t *testing.T) { }, }, }, + mockAdmissionController: &mockAdmissionController{admitErr: nil}, schedulerMockSetup: func(m *mockScheduler) { m.scheduleResults = defaultSuccessfulScheduleResults }, @@ -292,36 +287,13 @@ func TestDirector_HandleRequest(t *testing.T) { inferenceObjectiveName: objectiveName, targetModelName: model, }, - { - name: "successful completions request (sheddable, not saturated)", - reqBodyMap: map[string]any{ - "model": modelSheddable, - "prompt": "sheddable prompt", - }, - mockSaturationDetector: &mockSaturationDetector{isSaturated: false}, - schedulerMockSetup: func(m *mockScheduler) { - m.scheduleResults = defaultSuccessfulScheduleResults - }, - wantReqCtx: &handlers.RequestContext{ - ObjectiveKey: objectiveNameSheddable, - TargetModelName: modelSheddable, - TargetPod: &backend.Pod{ - NamespacedName: types.NamespacedName{Namespace: "default", Name: "pod1"}, - Address: "192.168.1.100", - }, - TargetEndpoint: "192.168.1.100:8000,192.168.2.100:8000,192.168.4.100:8000", - }, - wantMutatedBodyModel: modelSheddable, - inferenceObjectiveName: objectiveNameSheddable, - targetModelName: modelSheddable, - }, { name: "successful request with target model resolution", reqBodyMap: map[string]any{ "model": modelWithResolvedTarget, "prompt": "prompt for target resolution", }, - mockSaturationDetector: &mockSaturationDetector{isSaturated: false}, + mockAdmissionController: &mockAdmissionController{admitErr: nil}, schedulerMockSetup: func(m *mockScheduler) { m.scheduleResults = defaultSuccessfulScheduleResults }, @@ -357,26 +329,26 @@ func TestDirector_HandleRequest(t *testing.T) { "model": "food-review-1", "prompt": "test prompt", }, - mockSaturationDetector: &mockSaturationDetector{isSaturated: false}, - 
inferenceObjectiveName: "food-review-1", - targetModelName: "food-review-1", + mockAdmissionController: &mockAdmissionController{admitErr: nil}, + inferenceObjectiveName: "food-review-1", + targetModelName: "food-review-1", }, { - name: "request dropped (sheddable, saturated)", + name: "request rejected by admission controller", reqBodyMap: map[string]any{ "model": modelSheddable, "prompt": "sheddable prompt", }, - inferenceObjectiveName: objectiveNameSheddable, - mockSaturationDetector: &mockSaturationDetector{isSaturated: true}, - wantErrCode: errutil.InferencePoolResourceExhausted, + inferenceObjectiveName: objectiveNameSheddable, + mockAdmissionController: &mockAdmissionController{admitErr: errutil.Error{Code: errutil.InferencePoolResourceExhausted, Msg: "simulated admission rejection"}}, + wantErrCode: errutil.InferencePoolResourceExhausted, }, { - name: "model not found, expect err", - reqBodyMap: map[string]any{"prompt": "p"}, - mockSaturationDetector: &mockSaturationDetector{isSaturated: false}, - wantErrCode: errutil.BadRequest, + name: "model not found, expect err", + reqBodyMap: map[string]any{"prompt": "p"}, + mockAdmissionController: &mockAdmissionController{admitErr: nil}, + wantErrCode: errutil.BadRequest, }, { name: "prompt or messages not found, expect err", @@ -397,6 +369,7 @@ func TestDirector_HandleRequest(t *testing.T) { "model": model, "prompt": "prompt that causes scheduler error", }, + mockAdmissionController: &mockAdmissionController{admitErr: nil}, schedulerMockSetup: func(m *mockScheduler) { m.scheduleErr = errors.New("simulated scheduler failure") }, @@ -409,6 +382,7 @@ func TestDirector_HandleRequest(t *testing.T) { "model": model, "prompt": "prompt for nil,nil scheduler return", }, + mockAdmissionController: &mockAdmissionController{admitErr: nil}, schedulerMockSetup: func(m *mockScheduler) { m.scheduleResults = nil m.scheduleErr = nil @@ -424,7 +398,7 @@ func TestDirector_HandleRequest(t *testing.T) { if test.schedulerMockSetup != nil 
{ test.schedulerMockSetup(mockSched) } - director := NewDirectorWithConfig(ds, mockSched, test.mockSaturationDetector, nil, NewConfig(), false) + director := NewDirectorWithConfig(ds, mockSched, test.mockAdmissionController, NewConfig()) reqCtx := &handlers.RequestContext{ Request: &handlers.Request{ @@ -472,158 +446,6 @@ func TestDirector_HandleRequest(t *testing.T) { } } -func TestAdmitRequest(t *testing.T) { - t.Parallel() - ctx := logutil.NewTestLoggerIntoContext(context.Background()) - candidatePods := []backendmetrics.PodMetrics{} - reqCtx := &handlers.RequestContext{ - SchedulingRequest: &schedulingtypes.LLMRequest{}, - } - - t.Run("flow control disabled", func(t *testing.T) { - t.Parallel() - testCases := []struct { - name string - priority int - isSaturated bool - expectErr bool - expectErrCode string - expectErrSubstr string - }{ - { - name: "should admit non-sheddable request when saturated", - priority: 0, - isSaturated: true, - expectErr: false, - }, - { - name: "should admit sheddable request when not saturated", - priority: -1, - isSaturated: false, - expectErr: false, - }, - { - name: "should reject sheddable request when saturated", - priority: -1, - isSaturated: true, - expectErr: true, - expectErrCode: errutil.InferencePoolResourceExhausted, - expectErrSubstr: "system saturated, sheddable request dropped", - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - t.Parallel() - // --- ARRANGE --- - saturationDetector := &mockSaturationDetector{isSaturated: tc.isSaturated} - director := NewDirectorWithConfig(nil, nil, saturationDetector, nil, NewConfig(), false) // FC disabled - - // --- ACT --- - err := director.admitRequest(ctx, reqCtx, candidatePods, tc.priority) - - // --- ASSERT --- - if !tc.expectErr { - assert.NoError(t, err, "expected no error for this scenario") - } else { - require.Error(t, err, "expected an error for this scenario") - var e errutil.Error - if assert.ErrorAs(t, err, &e, "error should be of type 
errutil.Error") { - assert.Equal(t, tc.expectErrCode, e.Code, "error code should match expected") - assert.Contains(t, e.Msg, tc.expectErrSubstr, "error message should contain expected substring") - } - } - }) - } - }) - - t.Run("flow control enabled", func(t *testing.T) { - t.Parallel() - testCases := []struct { - name string - fcOutcome fctypes.QueueOutcome - fcErr error - expectErr bool - expectErrCode string - expectErrSubstr string - }{ - { - name: "should admit when flow controller dispatches", - fcOutcome: fctypes.QueueOutcomeDispatched, - expectErr: false, - }, - { - name: "should reject on capacity rejection", - fcOutcome: fctypes.QueueOutcomeRejectedCapacity, - expectErr: true, - expectErrCode: errutil.InferencePoolResourceExhausted, - expectErrSubstr: "request rejected by flow control", - }, - { - name: "should reject on TTL eviction", - fcOutcome: fctypes.QueueOutcomeEvictedTTL, - fcErr: errors.New("timeout"), - expectErr: true, - expectErrCode: errutil.ServiceUnavailable, - expectErrSubstr: "request timed out in queue: timeout", - }, - { - name: "should reject on context cancelled eviction", - fcOutcome: fctypes.QueueOutcomeEvictedContextCancelled, - expectErr: true, - expectErrCode: errutil.ServiceUnavailable, - expectErrSubstr: "client disconnected", - }, - { - name: "should return internal error on other rejection", - fcOutcome: fctypes.QueueOutcomeRejectedOther, - expectErr: true, - expectErrCode: errutil.Internal, - expectErrSubstr: "request rejected by flow control", - }, - { - name: "should return internal error on other eviction", - fcOutcome: fctypes.QueueOutcomeEvictedOther, - fcErr: errors.New("internal error"), - expectErr: true, - expectErrCode: errutil.Internal, - expectErrSubstr: "internal error", - }, - { - name: "should return internal error on unhandled outcome", - fcOutcome: fctypes.QueueOutcomeNotYetFinalized, - expectErr: true, - expectErrCode: errutil.Internal, - expectErrSubstr: "unhandled flow control outcome", - }, - } - - for 
_, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - t.Parallel() - // --- ARRANGE --- - fc := &mockFlowController{outcome: tc.fcOutcome, err: tc.fcErr} - director := NewDirectorWithConfig(nil, nil, nil, fc, NewConfig(), true) // FC enabled - - // --- ACT --- - err := director.admitRequest(ctx, reqCtx, candidatePods, 0) - - // --- ASSERT --- - if !tc.expectErr { - assert.NoError(t, err, "expected no error for this scenario") - } else { - require.Error(t, err, "expected an error for this scenario") - var e errutil.Error - if assert.ErrorAs(t, err, &e, "error should be of type errutil.Error") { - assert.Equal(t, tc.expectErrCode, e.Code, "error code should match expected") - assert.Contains(t, e.Msg, tc.expectErrSubstr, "error message should contain expected substring") - } - } - }) - } - }) -} - // TestGetCandidatePodsForScheduling is testing getCandidatePodsForScheduling and more specifically the functionality of SubsetFilter. func TestGetCandidatePodsForScheduling(t *testing.T) { var makeFilterMetadata = func(data []any) map[string]any { @@ -695,7 +517,7 @@ func TestGetCandidatePodsForScheduling(t *testing.T) { ds := &mockDatastore{pods: testInput} for _, test := range tests { t.Run(test.name, func(t *testing.T) { - director := NewDirectorWithConfig(ds, &mockScheduler{}, &mockSaturationDetector{}, nil, NewConfig(), false) + director := NewDirectorWithConfig(ds, &mockScheduler{}, &mockAdmissionController{}, NewConfig()) got := director.getCandidatePodsForScheduling(context.Background(), test.metadata) @@ -764,7 +586,7 @@ func TestDirector_HandleResponse(t *testing.T) { ctx := logutil.NewTestLoggerIntoContext(context.Background()) ds := datastore.NewDatastore(t.Context(), nil) mockSched := &mockScheduler{} - director := NewDirectorWithConfig(ds, mockSched, nil, nil, NewConfig().WithPostResponsePlugins(pr1), false) + director := NewDirectorWithConfig(ds, mockSched, &mockAdmissionController{}, NewConfig().WithPostResponsePlugins(pr1)) reqCtx := 
&handlers.RequestContext{ Request: &handlers.Request{ diff --git a/pkg/epp/server/runserver.go b/pkg/epp/server/runserver.go index 870acc70b..1d9d1d114 100644 --- a/pkg/epp/server/runserver.go +++ b/pkg/epp/server/runserver.go @@ -42,6 +42,7 @@ import ( "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/handlers" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/requestcontrol" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/saturationdetector" ) // ExtProcServerRunner provides methods to manage an external process server. @@ -56,7 +57,7 @@ type ExtProcServerRunner struct { RefreshPrometheusMetricsInterval time.Duration MetricsStalenessThreshold time.Duration Director *requestcontrol.Director - SaturationDetector requestcontrol.SaturationDetector + SaturationDetector *saturationdetector.Detector UseExperimentalDatalayerV2 bool // Pluggable data layer feature flag // This should only be used in tests. We won't need this once we do not inject metrics in the tests. 
diff --git a/test/integration/epp/hermetic_test.go b/test/integration/epp/hermetic_test.go index 0abcf2462..c2e100f79 100644 --- a/test/integration/epp/hermetic_test.go +++ b/test/integration/epp/hermetic_test.go @@ -1180,7 +1180,8 @@ func BeforeSuite() func() { } detector := saturationdetector.NewDetector(sdConfig, logger.WithName("saturation-detector")) serverRunner.SaturationDetector = detector - serverRunner.Director = requestcontrol.NewDirectorWithConfig(serverRunner.Datastore, scheduler, detector, nil, requestcontrol.NewConfig(), false) + admissionController := requestcontrol.NewLegacyAdmissionController(detector) + serverRunner.Director = requestcontrol.NewDirectorWithConfig(serverRunner.Datastore, scheduler, admissionController, requestcontrol.NewConfig()) serverRunner.SecureServing = false if err := serverRunner.SetupWithManager(context.Background(), mgr); err != nil { From 2f14af215aff800dd168eaceab9be9cf9ad07fd7 Mon Sep 17 00:00:00 2001 From: Luke Van Drie Date: Mon, 13 Oct 2025 22:44:43 +0000 Subject: [PATCH 3/4] feat: Address feedback on admission control - Introduce requtil.IsSheddable() for clarity - Move FairnessID defaulting to handler package - Add test for default FairnessID --- pkg/epp/handlers/request.go | 12 ++++++++++++ pkg/epp/handlers/request_test.go | 30 +++++++++++++++++++++++++++++ pkg/epp/requestcontrol/admission.go | 18 +++-------------- pkg/epp/util/request/sheddable.go | 22 +++++++++++++++++++++ 4 files changed, 67 insertions(+), 15 deletions(-) create mode 100644 pkg/epp/util/request/sheddable.go diff --git a/pkg/epp/handlers/request.go b/pkg/epp/handlers/request.go index 7f8122195..0e04289a7 100644 --- a/pkg/epp/handlers/request.go +++ b/pkg/epp/handlers/request.go @@ -29,6 +29,13 @@ import ( errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/error" ) +const ( + // defaultFairnessID is the default fairness ID used when no ID is provided in the request. 
+ // This ensures that requests without explicit fairness identifiers are still grouped and managed by the Flow Control + // system. + defaultFairnessID = "default-flow" +) + func (s *StreamingServer) HandleRequestHeaders(reqCtx *RequestContext, req *extProcPb.ProcessingRequest_RequestHeaders) error { reqCtx.RequestReceivedTimestamp = time.Now() @@ -80,6 +87,11 @@ func (s *StreamingServer) HandleRequestHeaders(reqCtx *RequestContext, req *extP delete(reqCtx.Request.Headers, header.Key) } } + + if reqCtx.FairnessID == "" { + reqCtx.FairnessID = defaultFairnessID + } + return nil } diff --git a/pkg/epp/handlers/request_test.go b/pkg/epp/handlers/request_test.go index 4ae207803..a3ef90cb4 100644 --- a/pkg/epp/handlers/request_test.go +++ b/pkg/epp/handlers/request_test.go @@ -21,6 +21,7 @@ import ( configPb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" + "github.com/stretchr/testify/assert" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metadata" ) @@ -66,3 +67,32 @@ func TestHandleRequestHeaders(t *testing.T) { t.Errorf("expected fairness ID header to be removed from request headers, but it was not") } } + +func TestHandleRequestHeaders_DefaultFairnessID(t *testing.T) { + t.Parallel() + + server := &StreamingServer{} + reqCtx := &RequestContext{ + Request: &Request{ + Headers: make(map[string]string), + }, + } + + req := &extProcPb.ProcessingRequest_RequestHeaders{ + RequestHeaders: &extProcPb.HttpHeaders{ + Headers: &configPb.HeaderMap{ + Headers: []*configPb.HeaderValue{ + { + Key: "x-test-header", + Value: "test-value", + }, + }, + }, + EndOfStream: false, + }, + } + + err := server.HandleRequestHeaders(reqCtx, req) + assert.NoError(t, err, "expected no error") + assert.Equal(t, defaultFairnessID, reqCtx.FairnessID, "expected fairness ID to be defaulted") +} diff --git a/pkg/epp/requestcontrol/admission.go b/pkg/epp/requestcontrol/admission.go index 
85faab5bc..f3b1919d4 100644 --- a/pkg/epp/requestcontrol/admission.go +++ b/pkg/epp/requestcontrol/admission.go @@ -27,6 +27,7 @@ import ( "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/handlers" errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/error" logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" + requtil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/request" ) // AdmissionController defines the interface for making admission control decisions. @@ -73,7 +74,7 @@ func rejectIfSheddableAndSaturated( candidatePods []backendmetrics.PodMetrics, priority int, ) error { - if priority < 0 { + if requtil.IsSheddable(priority) { logger := log.FromContext(ctx) if sd.IsSaturated(ctx, candidatePods) { logger.V(logutil.TRACE).Info("Request rejected: system saturated and request is sheddable", @@ -121,13 +122,6 @@ func (lac *LegacyAdmissionController) Admit( // --- FlowControlAdmissionController --- -const ( - // defaultFairnessID is the default fairness ID used when no ID is provided in the request. - // This ensures that requests without explicit fairness identifiers are still grouped and managed by the Flow Control - // system. - defaultFairnessID = "default-flow" -) - // FlowControlAdmissionController delegates admission decisions to the Flow Control layer. // It first checks if the request is sheddable and the system is saturated, rejecting immediately if both conditions are // true. Otherwise, it uses the provided flowController to enqueue the request and await an outcome. 
@@ -161,17 +155,11 @@ func (fcac *FlowControlAdmissionController) Admit( } logger.V(logutil.TRACE).Info("Request proceeding to flow control", "requestID", reqCtx.SchedulingRequest.RequestId) - fairnessID := reqCtx.FairnessID - if fairnessID == "" { - logger.V(logutil.TRACE).Info("No fairnessID provided, using default", - "requestID", reqCtx.SchedulingRequest.RequestId, "defaultFairnessID", defaultFairnessID) - fairnessID = defaultFairnessID - } fcReq := &flowControlRequest{ ctx: ctx, requestID: reqCtx.SchedulingRequest.RequestId, - fairnessID: fairnessID, + fairnessID: reqCtx.FairnessID, priority: priority, requestByteSize: uint64(reqCtx.RequestSize), candidatePods: candidatePods, diff --git a/pkg/epp/util/request/sheddable.go b/pkg/epp/util/request/sheddable.go new file mode 100644 index 000000000..c2f32c1f2 --- /dev/null +++ b/pkg/epp/util/request/sheddable.go @@ -0,0 +1,22 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package request + +// IsSheddable determines if a request is considered sheddable based on its priority. 
+func IsSheddable(priority int) bool { + return priority < 0 +} From 0919d3fc35dd205aeaf0838d51753144cb69598d Mon Sep 17 00:00:00 2001 From: Luke Van Drie Date: Mon, 13 Oct 2025 23:19:21 +0000 Subject: [PATCH 4/4] fix: admission.go boilerplate header --- pkg/epp/requestcontrol/admission.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/epp/requestcontrol/admission.go b/pkg/epp/requestcontrol/admission.go index f3b1919d4..383d2844a 100644 --- a/pkg/epp/requestcontrol/admission.go +++ b/pkg/epp/requestcontrol/admission.go @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,