From b43c44d9719fb739a16fc9fdfe7efbd5eb27e4c4 Mon Sep 17 00:00:00 2001 From: Rahul Gurnani Date: Thu, 23 Oct 2025 01:17:38 +0000 Subject: [PATCH 01/12] Refactor director to split into smaller functions --- pkg/epp/requestcontrol/director.go | 56 ++++++++++++++++++++---------- 1 file changed, 37 insertions(+), 19 deletions(-) diff --git a/pkg/epp/requestcontrol/director.go b/pkg/epp/requestcontrol/director.go index f6f7deebe..74f1eb8ce 100644 --- a/pkg/epp/requestcontrol/director.go +++ b/pkg/epp/requestcontrol/director.go @@ -26,6 +26,7 @@ import ( "strings" "time" + "github.com/go-logr/logr" "sigs.k8s.io/controller-runtime/pkg/log" v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" @@ -89,42 +90,59 @@ type Director struct { defaultPriority int } -// HandleRequest orchestrates the request lifecycle. -// It always returns the requestContext even in the error case, as the request context is used in error handling. -func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestContext) (*handlers.RequestContext, error) { - logger := log.FromContext(ctx) +// getInferenceObjective creates inferenceObjective based on reqCtx. +func (d *Director) getInferenceObjective(logger logr.Logger, reqCtx *handlers.RequestContext) *v1alpha2.InferenceObjective { + infObjective := d.datastore.ObjectiveGet(reqCtx.ObjectiveKey) + if infObjective == nil { + logger.V(logutil.VERBOSE).Info("No associated InferenceObjective found, using default", "objectiveKey", reqCtx.ObjectiveKey) + infObjective = &v1alpha2.InferenceObjective{ + Spec: v1alpha2.InferenceObjectiveSpec{ + Priority: &d.defaultPriority, + }, + } + } else if infObjective.Spec.Priority == nil { + // Default to 0 if not specified. + infObjective.Spec.Priority = &d.defaultPriority + } + return infObjective +} - // Parse Request, Resolve Target Models, and Determine Parameters +// resolveTargetModel is a helper that resolves targetModel +// and updates the reqCtx and ctx. +func (d *Director) resolveTargetModel(reqCtx *handlers.RequestContext) error { requestBodyMap := reqCtx.Request.Body var ok bool reqCtx.IncomingModelName, ok = requestBodyMap["model"].(string) - if !ok { - return reqCtx, errutil.Error{Code: errutil.BadRequest, Msg: "model not found in request body"} + return errutil.Error{Code: errutil.BadRequest, Msg: "model not found in request body"} } if reqCtx.TargetModelName == "" { // Default to incoming model name reqCtx.TargetModelName = reqCtx.IncomingModelName } reqCtx.Request.Body["model"] = reqCtx.TargetModelName + return nil +} + +// HandleRequest orchestrates the request lifecycle. +// It always returns the requestContext even in the error case, as the request context is used in error handling. +func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestContext) (*handlers.RequestContext, error) { + logger := log.FromContext(ctx) + + // Resolve target model and update req context. + err := d.resolveTargetModel(reqCtx) + if err != nil { + return reqCtx, err + } + // Parse request body. requestBody, err := requtil.ExtractRequestBody(reqCtx.Request.Body) if err != nil { return reqCtx, errutil.Error{Code: errutil.BadRequest, Msg: fmt.Errorf("failed to extract request data: %w", err).Error()} } - infObjective := d.datastore.ObjectiveGet(reqCtx.ObjectiveKey) - if infObjective == nil { - logger.V(logutil.VERBOSE).Info("No associated InferenceObjective found, using default", "objectiveKey", reqCtx.ObjectiveKey) - infObjective = &v1alpha2.InferenceObjective{ - Spec: v1alpha2.InferenceObjectiveSpec{ - Priority: &d.defaultPriority, - }, - } - } else if infObjective.Spec.Priority == nil { - // Default to 0 if not specified. - infObjective.Spec.Priority = &d.defaultPriority - } + // Parse inference objective. + infObjective := d.getInferenceObjective(logger, reqCtx) // Prepare LLMRequest (needed for both saturation detection and Scheduler) reqCtx.SchedulingRequest = &schedulingtypes.LLMRequest{ From ab723fe148fedd9c748f2a6847385c10c711ad7d Mon Sep 17 00:00:00 2001 From: Rahul Gurnani Date: Thu, 30 Oct 2025 19:16:50 +0000 Subject: [PATCH 02/12] Add AdmitRequest and PrepareData plugins --- pkg/epp/datalayer/attributemap.go | 2 + pkg/epp/requestcontrol/director.go | 36 +++++++- pkg/epp/requestcontrol/plugins.go | 12 +++ .../requestcontrol/request_control_config.go | 4 + pkg/epp/scheduling/types/types.go | 86 +++++++++++++++++++ 5 files changed, 139 insertions(+), 1 deletion(-) diff --git a/pkg/epp/datalayer/attributemap.go b/pkg/epp/datalayer/attributemap.go index 614bf57bb..8b213915e 100644 --- a/pkg/epp/datalayer/attributemap.go +++ b/pkg/epp/datalayer/attributemap.go @@ -20,6 +20,8 @@ import ( "sync" ) +// TODO(rahulgurnani): Deprecate this AttributeMap in favor of AttributeMap added in scheduling layer. + // Cloneable types support cloning of the value. type Cloneable interface { Clone() Cloneable diff --git a/pkg/epp/requestcontrol/director.go b/pkg/epp/requestcontrol/director.go index 74f1eb8ce..9d9dd618e 100644 --- a/pkg/epp/requestcontrol/director.go +++ b/pkg/epp/requestcontrol/director.go @@ -36,6 +36,7 @@ import ( "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/handlers" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metadata" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types" schedulingtypes "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types" errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/error" logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" @@ -167,8 +168,18 @@ func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestCo logger.V(logutil.DEFAULT).Info("Request rejected by admission control", "error", err) return reqCtx, err } + copyOfCandidatePods := d.toSchedulerPodMetrics(candidatePods) - result, err := d.scheduler.Schedule(ctx, reqCtx.SchedulingRequest, d.toSchedulerPodMetrics(candidatePods)) + // Prepare per request data + d.runPrepareDataPlugins(ctx, reqCtx.SchedulingRequest, copyOfCandidatePods) + + // Run admit request plugins + if !d.runAdmitRequestPlugins(ctx, reqCtx.SchedulingRequest, copyOfCandidatePods) { + logger.V(logutil.DEFAULT).Info("Request cannot be admitted") + return reqCtx, errutil.Error{Code: errutil.Internal, Msg: "request cannot be admitted"} + } + + result, err := d.scheduler.Schedule(ctx, reqCtx.SchedulingRequest, copyOfCandidatePods) if err != nil { return reqCtx, errutil.Error{Code: errutil.InferencePoolResourceExhausted, Msg: fmt.Errorf("failed to find target pod: %w", err).Error()} } @@ -333,6 +344,29 @@ func (d *Director) runPreRequestPlugins(ctx context.Context, request *scheduling } } +func (d *Director) runPrepareDataPlugins(ctx context.Context, + request *schedulingtypes.LLMRequest, pods []types.Pod) { + loggerDebug := log.FromContext(ctx).V(logutil.DEBUG) + for _, plugin := range d.requestControlPlugins.prepareDataPlugins { + loggerDebug.Info("Running PrepareData plugin", "plugin", plugin.TypedName()) + plugin.PrepareData(ctx, request, pods) + loggerDebug.Info("Completed running PrepareData plugin successfully", "plugin", plugin.TypedName()) + } +} + +func (d *Director) runAdmitRequestPlugins(ctx context.Context, + request *schedulingtypes.LLMRequest, pods []types.Pod) bool { + loggerDebug := log.FromContext(ctx).V(logutil.DEBUG) + for _, plugin := range d.requestControlPlugins.admitRequestPlugins { + loggerDebug.Info("Running AdmitRequest plugin", "plugin", plugin.TypedName()) + if !plugin.Admit(ctx, request, pods) { + return false + } + loggerDebug.Info("Completed running AdmitRequest plugin successfully", "plugin", plugin.TypedName()) + } + return true +} + func (d *Director) runResponseReceivedPlugins(ctx context.Context, request *schedulingtypes.LLMRequest, response *Response, targetPod *backend.Pod) { loggerDebug := log.FromContext(ctx).V(logutil.DEBUG) for _, plugin := range d.requestControlPlugins.responseReceivedPlugins { diff --git a/pkg/epp/requestcontrol/plugins.go b/pkg/epp/requestcontrol/plugins.go index 30f31f070..e35299d36 100644 --- a/pkg/epp/requestcontrol/plugins.go +++ b/pkg/epp/requestcontrol/plugins.go @@ -57,3 +57,15 @@ type ResponseComplete interface { plugins.Plugin ResponseComplete(ctx context.Context, request *types.LLMRequest, response *Response, targetPod *backend.Pod) } + +// PrepareData is called by the director before scheduling requests. +type PrepareData interface { + plugins.Plugin + PrepareData(ctx context.Context, request *types.LLMRequest, pods []types.Pod) +} + +// AdmitRequest is called by the director after the PrepareData plugins and before scheduling. +type AdmitRequest interface { + plugins.Plugin + Admit(ctx context.Context, request *types.LLMRequest, pods []types.Pod) bool +} diff --git a/pkg/epp/requestcontrol/request_control_config.go b/pkg/epp/requestcontrol/request_control_config.go index ffa6c6609..c492b8355 100644 --- a/pkg/epp/requestcontrol/request_control_config.go +++ b/pkg/epp/requestcontrol/request_control_config.go @@ -23,6 +23,8 @@ import ( // NewConfig creates a new Config object and returns its pointer. func NewConfig() *Config { return &Config{ + admitRequestPlugins: []AdmitRequest{}, + prepareDataPlugins: []PrepareData{}, preRequestPlugins: []PreRequest{}, responseReceivedPlugins: []ResponseReceived{}, responseStreamingPlugins: []ResponseStreaming{}, @@ -32,6 +34,8 @@ func NewConfig() *Config { // Config provides a configuration for the requestcontrol plugins. type Config struct { + admitRequestPlugins []AdmitRequest + prepareDataPlugins []PrepareData preRequestPlugins []PreRequest responseReceivedPlugins []ResponseReceived responseStreamingPlugins []ResponseStreaming diff --git a/pkg/epp/scheduling/types/types.go b/pkg/epp/scheduling/types/types.go index 6f9bec8ad..2983c2f05 100644 --- a/pkg/epp/scheduling/types/types.go +++ b/pkg/epp/scheduling/types/types.go @@ -21,6 +21,7 @@ import ( "errors" "fmt" "strings" + "sync" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend" backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" @@ -191,6 +192,9 @@ type Pod interface { GetPod() *backend.Pod GetMetrics() *backendmetrics.MetricsState String() string + Put(key string, value Cloneable) + Get(key string) (Cloneable, bool) + Keys() []string } type ScoredPod struct { @@ -217,6 +221,19 @@ func (pm *PodMetrics) GetMetrics() *backendmetrics.MetricsState { type PodMetrics struct { *backend.Pod *backendmetrics.MetricsState + AttributeMap +} + +func (pm *PodMetrics) Put(key string, value Cloneable) { + pm.AttributeMap.Put(key, value) +} + +func (pm *PodMetrics) Get(key string) (Cloneable, bool) { + return pm.AttributeMap.Get(key) +} + +func (pm *PodMetrics) Keys() []string { + return pm.AttributeMap.Keys() } // ProfileRunResult captures the profile run result. @@ -229,3 +246,72 @@ type SchedulingResult struct { ProfileResults map[string]*ProfileRunResult PrimaryProfileName string } + +// Cloneable types support cloning of the value. +type Cloneable interface { + Clone() Cloneable +} + +// AttributeMap is used to store flexible metadata or traits +// across different aspects of an inference server. +// Stored values must be Cloneable. +type AttributeMap interface { + Put(string, Cloneable) + Get(string) (Cloneable, bool) + Keys() []string +} + +// Attributes provides a goroutine-safe implementation of AttributeMap. +type Attributes struct { + data sync.Map // key: attribute name (string), value: attribute value (opaque, Cloneable) +} + +// NewAttributes returns a new instance of Attributes. +func NewAttributes() *Attributes { + return &Attributes{} +} + +// Put adds or updates an attribute in the map. +func (a *Attributes) Put(key string, value Cloneable) { + if value != nil { + a.data.Store(key, value) // TODO: Clone into map to ensure isolation + } +} + +// Get retrieves an attribute by key, returning a cloned copy. +func (a *Attributes) Get(key string) (Cloneable, bool) { + val, ok := a.data.Load(key) + if !ok { + return nil, false + } + if cloneable, ok := val.(Cloneable); ok { + return cloneable.Clone(), true + } + return nil, false +} + +// Keys returns all keys in the attribute map. +func (a *Attributes) Keys() []string { + var keys []string + a.data.Range(func(key, _ any) bool { + if sk, ok := key.(string); ok { + keys = append(keys, sk) + } + return true + }) + return keys +} + +// Clone creates a deep copy of the entire attribute map. +func (a *Attributes) Clone() *Attributes { + clone := NewAttributes() + a.data.Range(func(key, value any) bool { + if sk, ok := key.(string); ok { + if v, ok := value.(Cloneable); ok { + clone.Put(sk, v) + } + } + return true + }) + return clone +} From 8cd3b612ad0b05088004b02e2c948392a632fd3f Mon Sep 17 00:00:00 2001 From: Rahul Gurnani Date: Fri, 31 Oct 2025 16:05:24 +0000 Subject: [PATCH 03/12] Add unit tests and comments --- pkg/epp/requestcontrol/director.go | 13 +- pkg/epp/requestcontrol/director_test.go | 118 +++++++++++++++++- .../requestcontrol/request_control_config.go | 12 ++ 3 files changed, 136 insertions(+), 7 deletions(-) diff --git a/pkg/epp/requestcontrol/director.go b/pkg/epp/requestcontrol/director.go index 9d9dd618e..9f312dc0c 100644 --- a/pkg/epp/requestcontrol/director.go +++ b/pkg/epp/requestcontrol/director.go @@ -91,7 +91,7 @@ type Director struct { defaultPriority int } -// getInferenceObjective creates inferenceObjective based on reqCtx. +// getInferenceObjective fetches the inferenceObjective from the datastore otherwise creates a new one based on reqCtx. func (d *Director) getInferenceObjective(logger logr.Logger, reqCtx *handlers.RequestContext) *v1alpha2.InferenceObjective { infObjective := d.datastore.ObjectiveGet(reqCtx.ObjectiveKey) if infObjective == nil { @@ -109,7 +109,7 @@ func (d *Director) getInferenceObjective(logger logr.Logger, reqCtx *handlers.Re } // resolveTargetModel is a helper that resolves targetModel -// and updates the reqCtx and ctx. +// and updates the reqCtx. func (d *Director) resolveTargetModel(reqCtx *handlers.RequestContext) error { requestBodyMap := reqCtx.Request.Body var ok bool @@ -168,18 +168,19 @@ func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestCo logger.V(logutil.DEFAULT).Info("Request rejected by admission control", "error", err) return reqCtx, err } - copyOfCandidatePods := d.toSchedulerPodMetrics(candidatePods) + snapshotOfCandidatePods := d.toSchedulerPodMetrics(candidatePods) // Prepare per request data - d.runPrepareDataPlugins(ctx, reqCtx.SchedulingRequest, copyOfCandidatePods) + // TODO(rahulgurnani): Add retries and timeout in the preparedata step. + d.runPrepareDataPlugins(ctx, reqCtx.SchedulingRequest, snapshotOfCandidatePods) // Run admit request plugins - if !d.runAdmitRequestPlugins(ctx, reqCtx.SchedulingRequest, copyOfCandidatePods) { + if !d.runAdmitRequestPlugins(ctx, reqCtx.SchedulingRequest, snapshotOfCandidatePods) { logger.V(logutil.DEFAULT).Info("Request cannot be admitted") return reqCtx, errutil.Error{Code: errutil.Internal, Msg: "request cannot be admitted"} } - result, err := d.scheduler.Schedule(ctx, reqCtx.SchedulingRequest, copyOfCandidatePods) + result, err := d.scheduler.Schedule(ctx, reqCtx.SchedulingRequest, snapshotOfCandidatePods) if err != nil { return reqCtx, errutil.Error{Code: errutil.InferencePoolResourceExhausted, Msg: fmt.Errorf("failed to find target pod: %w", err).Error()} } diff --git a/pkg/epp/requestcontrol/director_test.go b/pkg/epp/requestcontrol/director_test.go index 8cb9c91a5..89b5d77a0 100644 --- a/pkg/epp/requestcontrol/director_test.go +++ b/pkg/epp/requestcontrol/director_test.go @@ -93,6 +93,45 @@ func (ds *mockDatastore) PodList(predicate func(backendmetrics.PodMetrics) bool) return res } +type mockPrepareRequestDataPlugin struct { + tn plugins.TypedName + prepareDataCalled bool +} + +func newmockPrepareRequestDataPlugin(name string) *mockPrepareRequestDataPlugin { + return &mockPrepareRequestDataPlugin{ + tn: plugins.TypedName{Type: "mock-prepare-request-data", Name: name}, + } +} + +func (m *mockPrepareRequestDataPlugin) TypedName() plugins.TypedName { + return m.tn +} + +func (m *mockPrepareRequestDataPlugin) PrepareData(ctx context.Context, request *schedulingtypes.LLMRequest, pods []schedulingtypes.Pod) { + m.prepareDataCalled = true +} + +type mockAdmitRequestPlugins struct { + tn plugins.TypedName + admitRequestCalled bool +} + +func newmockAdmitRequestPlugins(name string) *mockAdmitRequestPlugins { + return &mockAdmitRequestPlugins{ + tn: plugins.TypedName{Type: "mock-admit-data", Name: name}, + } +} + +func (m *mockAdmitRequestPlugins) TypedName() plugins.TypedName { + return m.tn +} + +func (m *mockAdmitRequestPlugins) Admit(ctx context.Context, request *schedulingtypes.LLMRequest, pods []schedulingtypes.Pod) bool { + m.admitRequestCalled = true + return true +} + func TestDirector_HandleRequest(t *testing.T) { ctx := logutil.NewTestLoggerIntoContext(context.Background()) @@ -211,6 +250,10 @@ func TestDirector_HandleRequest(t *testing.T) { wantReqCtx *handlers.RequestContext // Fields to check in the returned RequestContext wantMutatedBodyModel string // Expected model in reqCtx.Request.Body after PostDispatch targetModelName string // Expected model name after target model resolution + prepareDataCalled bool + admitRequestCalled bool + prepareDataPlugins *mockPrepareRequestDataPlugin + admitRequestPlugins *mockAdmitRequestPlugins }{ { name: "successful completions request", @@ -265,6 +308,66 @@ func TestDirector_HandleRequest(t *testing.T) { wantMutatedBodyModel: model, targetModelName: model, }, + { + name: "successful chat completions request with prepare data plugins", + reqBodyMap: map[string]any{ + "model": model, + "messages": []any{ + map[string]any{ + "role": "user", + "content": "critical prompt", + }, + }, + }, + mockAdmissionController: &mockAdmissionController{admitErr: nil}, + schedulerMockSetup: func(m *mockScheduler) { + m.scheduleResults = defaultSuccessfulScheduleResults + }, + wantReqCtx: &handlers.RequestContext{ + TargetModelName: model, + TargetPod: &backend.Pod{ + NamespacedName: types.NamespacedName{Namespace: "default", Name: "pod1"}, + Address: "192.168.1.100", + Port: "8000", + MetricsHost: "192.168.1.100:8000", + }, + TargetEndpoint: "192.168.1.100:8000,192.168.2.100:8000,192.168.4.100:8000", + }, + wantMutatedBodyModel: model, + targetModelName: model, + prepareDataCalled: true, + prepareDataPlugins: newmockPrepareRequestDataPlugin("test-plugin"), + }, + { + name: "successful chat completions request with admit request plugins", + reqBodyMap: map[string]any{ + "model": model, + "messages": []any{ + map[string]any{ + "role": "user", + "content": "critical prompt", + }, + }, + }, + mockAdmissionController: &mockAdmissionController{admitErr: nil}, + schedulerMockSetup: func(m *mockScheduler) { + m.scheduleResults = defaultSuccessfulScheduleResults + }, + wantReqCtx: &handlers.RequestContext{ + TargetModelName: model, + TargetPod: &backend.Pod{ + NamespacedName: types.NamespacedName{Namespace: "default", Name: "pod1"}, + Address: "192.168.1.100", + Port: "8000", + MetricsHost: "192.168.1.100:8000", + }, + TargetEndpoint: "192.168.1.100:8000,192.168.2.100:8000,192.168.4.100:8000", + }, + wantMutatedBodyModel: model, + targetModelName: model, + admitRequestCalled: true, + admitRequestPlugins: newmockAdmitRequestPlugins("test-plugin"), + }, { name: "successful chat completions request with multiple messages", reqBodyMap: map[string]any{ @@ -414,7 +517,14 @@ func TestDirector_HandleRequest(t *testing.T) { if test.schedulerMockSetup != nil { test.schedulerMockSetup(mockSched) } - director := NewDirectorWithConfig(ds, mockSched, test.mockAdmissionController, NewConfig()) + config := NewConfig() + if test.prepareDataPlugins != nil { + config = config.WithPrepareDataPlugins(test.prepareDataPlugins) + } + if test.admitRequestPlugins != nil { + config = config.WithAdmitRequestPlugins(test.admitRequestPlugins) + } + director := NewDirectorWithConfig(ds, mockSched, test.mockAdmissionController, config) reqCtx := &handlers.RequestContext{ Request: &handlers.Request{ @@ -458,6 +568,12 @@ func TestDirector_HandleRequest(t *testing.T) { assert.Equal(t, test.wantMutatedBodyModel, returnedReqCtx.Request.Body["model"], "Mutated reqCtx.Request.Body model mismatch") } + if test.admitRequestPlugins != nil { + assert.True(t, test.admitRequestPlugins.admitRequestCalled, "AdmitRequestPlugins not called") + } + if test.prepareDataPlugins != nil { + assert.True(t, test.prepareDataPlugins.prepareDataCalled, "PrepareDataPlugins not called") + } }) } } diff --git a/pkg/epp/requestcontrol/request_control_config.go b/pkg/epp/requestcontrol/request_control_config.go index c492b8355..40618c215 100644 --- a/pkg/epp/requestcontrol/request_control_config.go +++ b/pkg/epp/requestcontrol/request_control_config.go @@ -70,6 +70,18 @@ func (c *Config) WithResponseCompletePlugins(plugins ...ResponseComplete) *Confi return c } +// WithPrepareDataPlugins sets the given plugins as the PrepareData plugins. +func (c *Config) WithPrepareDataPlugins(plugins ...PrepareData) *Config { + c.prepareDataPlugins = plugins + return c +} + +// WithAdmitRequestPlugins sets the given plugins as the AdmitRequest plugins. +func (c *Config) WithAdmitRequestPlugins(plugins ...AdmitRequest) *Config { + c.admitRequestPlugins = plugins + return c +} + // AddPlugins adds the given plugins to the Config. // The type of each plugin is checked and added to the corresponding list of plugins in the Config. // If a plugin implements multiple plugin interfaces, it will be added to each corresponding list. From 303e4013e05b3af0a9d3340f62c64d77a1a5cb25 Mon Sep 17 00:00:00 2001 From: Rahul Gurnani Date: Fri, 31 Oct 2025 23:43:17 +0000 Subject: [PATCH 04/12] Add comments --- pkg/epp/requestcontrol/plugins.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/epp/requestcontrol/plugins.go b/pkg/epp/requestcontrol/plugins.go index e35299d36..7bbe08071 100644 --- a/pkg/epp/requestcontrol/plugins.go +++ b/pkg/epp/requestcontrol/plugins.go @@ -59,12 +59,14 @@ type ResponseComplete interface { } // PrepareData is called by the director before scheduling requests. +// PrepareData plugin is implemented by data producers which produce data from different sources. type PrepareData interface { plugins.Plugin PrepareData(ctx context.Context, request *types.LLMRequest, pods []types.Pod) } // AdmitRequest is called by the director after the PrepareData plugins and before scheduling. +// AdmitRequest plugin is implemented by plugins for admission control. These plugins need to implement Admit method. type AdmitRequest interface { plugins.Plugin Admit(ctx context.Context, request *types.LLMRequest, pods []types.Pod) bool From 9c5276bac690af38673c0ad2ecb12349fcb2ce25 Mon Sep 17 00:00:00 2001 From: Rahul Gurnani Date: Mon, 3 Nov 2025 02:38:55 +0000 Subject: [PATCH 05/12] Address review comments --- pkg/epp/requestcontrol/director.go | 15 +++++++-------- pkg/epp/requestcontrol/director_test.go | 4 ++-- pkg/epp/requestcontrol/plugins.go | 3 ++- pkg/epp/scheduling/types/types.go | 14 +------------- 4 files changed, 12 insertions(+), 24 deletions(-) diff --git a/pkg/epp/requestcontrol/director.go b/pkg/epp/requestcontrol/director.go index 9f312dc0c..5630d39c8 100644 --- a/pkg/epp/requestcontrol/director.go +++ b/pkg/epp/requestcontrol/director.go @@ -26,7 +26,6 @@ import ( "strings" "time" - "github.com/go-logr/logr" "sigs.k8s.io/controller-runtime/pkg/log" v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" @@ -92,10 +91,10 @@ type Director struct { } // getInferenceObjective fetches the inferenceObjective from the datastore otherwise creates a new one based on reqCtx. -func (d *Director) getInferenceObjective(logger logr.Logger, reqCtx *handlers.RequestContext) *v1alpha2.InferenceObjective { +func (d *Director) getInferenceObjective(ctx context.Context, reqCtx *handlers.RequestContext) *v1alpha2.InferenceObjective { infObjective := d.datastore.ObjectiveGet(reqCtx.ObjectiveKey) if infObjective == nil { - logger.V(logutil.VERBOSE).Info("No associated InferenceObjective found, using default", "objectiveKey", reqCtx.ObjectiveKey) + log.FromContext(ctx).V(logutil.VERBOSE).Info("No associated InferenceObjective found, using default", "objectiveKey", reqCtx.ObjectiveKey) infObjective = &v1alpha2.InferenceObjective{ Spec: v1alpha2.InferenceObjectiveSpec{ Priority: &d.defaultPriority, @@ -108,8 +107,7 @@ func (d *Director) getInferenceObjective(logger logr.Logger, reqCtx *handlers.Re return infObjective } -// resolveTargetModel is a helper that resolves targetModel -// and updates the reqCtx. +// resolveTargetModel is a helper to update reqCtx with target model based on request. func (d *Director) resolveTargetModel(reqCtx *handlers.RequestContext) error { requestBodyMap := reqCtx.Request.Body var ok bool @@ -143,7 +141,7 @@ func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestCo } // Parse inference objective. - infObjective := d.getInferenceObjective(logger, reqCtx) + infObjective := d.getInferenceObjective(ctx, reqCtx) // Prepare LLMRequest (needed for both saturation detection and Scheduler) reqCtx.SchedulingRequest = &schedulingtypes.LLMRequest{ @@ -163,7 +161,7 @@ func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestCo if len(candidatePods) == 0 { return reqCtx, errutil.Error{Code: errutil.ServiceUnavailable, Msg: "failed to find candidate pods for serving the request"} } - + // TODO(rahulgurnani/lukevandrie): Perhaps, refactor/implement Admit plugin for Admission control. if err := d.admissionController.Admit(ctx, reqCtx, candidatePods, *infObjective.Spec.Priority); err != nil { logger.V(logutil.DEFAULT).Info("Request rejected by admission control", "error", err) return reqCtx, err @@ -360,7 +358,8 @@ func (d *Director) runAdmitRequestPlugins(ctx context.Context, loggerDebug := log.FromContext(ctx).V(logutil.DEBUG) for _, plugin := range d.requestControlPlugins.admitRequestPlugins { loggerDebug.Info("Running AdmitRequest plugin", "plugin", plugin.TypedName()) - if !plugin.Admit(ctx, request, pods) { + if denyReason := plugin.Admit(ctx, request, pods); denyReason != "" { + loggerDebug.Info("AdmitRequest plugin denied the request", "plugin", plugin.TypedName(), "reason", denyReason) return false } loggerDebug.Info("Completed running AdmitRequest plugin successfully", "plugin", plugin.TypedName()) diff --git a/pkg/epp/requestcontrol/director_test.go b/pkg/epp/requestcontrol/director_test.go index 89b5d77a0..559978def 100644 --- a/pkg/epp/requestcontrol/director_test.go +++ b/pkg/epp/requestcontrol/director_test.go @@ -127,9 +127,9 @@ func (m *mockAdmitRequestPlugins) TypedName() plugins.TypedName { return m.tn } -func (m *mockAdmitRequestPlugins) Admit(ctx context.Context, request *schedulingtypes.LLMRequest, pods []schedulingtypes.Pod) bool { +func (m *mockAdmitRequestPlugins) Admit(ctx context.Context, request *schedulingtypes.LLMRequest, pods []schedulingtypes.Pod) string { m.admitRequestCalled = true - return true + return "" } func TestDirector_HandleRequest(t *testing.T) { diff --git a/pkg/epp/requestcontrol/plugins.go b/pkg/epp/requestcontrol/plugins.go index 7bbe08071..8e6b91cdb 100644 --- a/pkg/epp/requestcontrol/plugins.go +++ b/pkg/epp/requestcontrol/plugins.go @@ -69,5 +69,6 @@ type PrepareData interface { // AdmitRequest plugin is implemented by plugins for admission control. These plugins need to implement Admit method. type AdmitRequest interface { plugins.Plugin - Admit(ctx context.Context, request *types.LLMRequest, pods []types.Pod) bool + // Admit returns the denial reason if the request is denied. If the request is allowed, it returns an empty string. + Admit(ctx context.Context, request *types.LLMRequest, pods []types.Pod) string } diff --git a/pkg/epp/scheduling/types/types.go b/pkg/epp/scheduling/types/types.go index 2983c2f05..e87250eb8 100644 --- a/pkg/epp/scheduling/types/types.go +++ b/pkg/epp/scheduling/types/types.go @@ -254,7 +254,7 @@ type Cloneable interface { // AttributeMap is used to store flexible metadata or traits // across different aspects of an inference server. -// Stored values must be Cloneable. +// Stored values must be Cloneable. This is a per-request snapshot of the attributes. type AttributeMap interface { Put(string, Cloneable) Get(string) (Cloneable, bool) @@ -290,18 +290,6 @@ func (a *Attributes) Get(key string) (Cloneable, bool) { return nil, false } -// Keys returns all keys in the attribute map. -func (a *Attributes) Keys() []string { - var keys []string - a.data.Range(func(key, _ any) bool { - if sk, ok := key.(string); ok { - keys = append(keys, sk) - } - return true - }) - return keys -} - // Clone creates a deep copy of the entire attribute map. func (a *Attributes) Clone() *Attributes { clone := NewAttributes() From 75de637d2d30116844cbf80e2312efcd78e4fc9a Mon Sep 17 00:00:00 2001 From: Rahul Gurnani Date: Mon, 3 Nov 2025 21:50:29 +0000 Subject: [PATCH 06/12] Make PrepareData step time bound and execute all preparedata plugins in parallel --- pkg/epp/requestcontrol/director.go | 53 +++++++++++++++++++++++++----- 1 file changed, 44 insertions(+), 9 deletions(-) diff --git a/pkg/epp/requestcontrol/director.go b/pkg/epp/requestcontrol/director.go index 5630d39c8..9e813a0d5 100644 --- a/pkg/epp/requestcontrol/director.go +++ b/pkg/epp/requestcontrol/director.go @@ -24,6 +24,7 @@ import ( "math/rand" "net" "strings" + "sync" "time" "sigs.k8s.io/controller-runtime/pkg/log" @@ -42,6 +43,11 @@ import ( requtil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/request" ) +const ( + prepareDataTimeout = 200 * time.Millisecond + prepareDataMaxRetries = 3 +) + // Datastore defines the interface required by the Director. type Datastore interface { PoolGet() (*v1.InferencePool, error) @@ -108,19 +114,19 @@ func (d *Director) getInferenceObjective(ctx context.Context, reqCtx *handlers.R } // resolveTargetModel is a helper to update reqCtx with target model based on request. -func (d *Director) resolveTargetModel(reqCtx *handlers.RequestContext) error { +func (d *Director) resolveTargetModel(reqCtx *handlers.RequestContext) (*handlers.RequestContext, error) { requestBodyMap := reqCtx.Request.Body var ok bool reqCtx.IncomingModelName, ok = requestBodyMap["model"].(string) if !ok { - return errutil.Error{Code: errutil.BadRequest, Msg: "model not found in request body"} + return nil, errutil.Error{Code: errutil.BadRequest, Msg: "model not found in request body"} } if reqCtx.TargetModelName == "" { // Default to incoming model name reqCtx.TargetModelName = reqCtx.IncomingModelName } reqCtx.Request.Body["model"] = reqCtx.TargetModelName - return nil + return reqCtx, nil } // HandleRequest orchestrates the request lifecycle. @@ -129,7 +135,7 @@ func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestCo logger := log.FromContext(ctx) // Resolve target model and update req context. - err := d.resolveTargetModel(reqCtx) + reqCtx, err := d.resolveTargetModel(reqCtx) if err != nil { return reqCtx, err } @@ -161,15 +167,13 @@ func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestCo if len(candidatePods) == 0 { return reqCtx, errutil.Error{Code: errutil.ServiceUnavailable, Msg: "failed to find candidate pods for serving the request"} } - // TODO(rahulgurnani/lukevandrie): Perhaps, refactor/implement Admit plugin for Admission control. if err := d.admissionController.Admit(ctx, reqCtx, candidatePods, *infObjective.Spec.Priority); err != nil { logger.V(logutil.DEFAULT).Info("Request rejected by admission control", "error", err) return reqCtx, err } snapshotOfCandidatePods := d.toSchedulerPodMetrics(candidatePods) - // Prepare per request data - // TODO(rahulgurnani): Add retries and timeout in the preparedata step. + // Prepare per request data by running PrepareData plugins. d.runPrepareDataPlugins(ctx, reqCtx.SchedulingRequest, snapshotOfCandidatePods) // Run admit request plugins @@ -343,14 +347,45 @@ func (d *Director) runPreRequestPlugins(ctx context.Context, request *scheduling } } +// prepareData runs the PrepareData plugin with retries and timeout. +func prepareData(plugin PrepareData, ctx context.Context, request *schedulingtypes.LLMRequest, pods []types.Pod) { + currentTimeout := prepareDataTimeout + for i := 0; i <= prepareDataMaxRetries; i++ { + done := make(chan struct{}) + go func() { + defer close(done) + plugin.PrepareData(ctx, request, pods) + }() + + select { + case <-done: + // Plugin executed successfully + return + case <-time.After(currentTimeout): + log.FromContext(ctx).V(logutil.DEBUG).Info("PrepareData plugin timed out, retrying...", "plugin", plugin.TypedName(), "retry", i+1, "timeout", currentTimeout) + if i == prepareDataMaxRetries { + log.FromContext(ctx).Error(nil, "PrepareData plugin failed after multiple retries", "plugin", plugin.TypedName()) + return + } + } + } +} + func (d *Director) runPrepareDataPlugins(ctx context.Context, request *schedulingtypes.LLMRequest, pods []types.Pod) { loggerDebug := log.FromContext(ctx).V(logutil.DEBUG) + // Parallely execute PrepareData for all the plugins. Some plugins might take time to prepare data e.g. latency predictor. + // Failure in any prepareData doesn't block the request processing. + var wg sync.WaitGroup for _, plugin := range d.requestControlPlugins.prepareDataPlugins { loggerDebug.Info("Running PrepareData plugin", "plugin", plugin.TypedName()) - plugin.PrepareData(ctx, request, pods) - loggerDebug.Info("Completed running PrepareData plugin successfully", "plugin", plugin.TypedName()) + wg.Add(1) + go func(p PrepareData) { + defer wg.Done() + prepareData(p, ctx, request, pods) + }(plugin) } + wg.Wait() } func (d *Director) runAdmitRequestPlugins(ctx context.Context, From 47fc3a5a4d551d4dbd8a8d0767b328cd984af5d9 Mon Sep 17 00:00:00 2001 From: Rahul Gurnani Date: Tue, 4 Nov 2025 01:51:18 +0000 Subject: [PATCH 07/12] Update interface names based on suggestions --- pkg/epp/requestcontrol/director.go | 16 ++--- pkg/epp/requestcontrol/director_test.go | 68 ++++++++++--------- pkg/epp/requestcontrol/plugins.go | 22 +++--- .../requestcontrol/request_control_config.go | 14 ++-- 4 files changed, 63 insertions(+), 57 deletions(-) diff --git a/pkg/epp/requestcontrol/director.go b/pkg/epp/requestcontrol/director.go index 9e813a0d5..80dbbfec2 100644 --- a/pkg/epp/requestcontrol/director.go +++ b/pkg/epp/requestcontrol/director.go @@ -119,7 +119,7 @@ func (d *Director) resolveTargetModel(reqCtx *handlers.RequestContext) (*handler var ok bool reqCtx.IncomingModelName, ok = requestBodyMap["model"].(string) if !ok { - return nil, errutil.Error{Code: errutil.BadRequest, Msg: "model not found in request body"} + return reqCtx, errutil.Error{Code: errutil.BadRequest, Msg: "model not found in request body"} } if reqCtx.TargetModelName == "" { // Default to incoming model name @@ -347,14 +347,14 @@ func (d *Director) runPreRequestPlugins(ctx context.Context, request *scheduling } } -// prepareData runs the PrepareData plugin with retries and timeout. -func prepareData(plugin PrepareData, ctx context.Context, request *schedulingtypes.LLMRequest, pods []types.Pod) { +// prepareData executes the PrepareRequestData plugins with retries and timeout. +func prepareData(plugin DataProducer, ctx context.Context, request *schedulingtypes.LLMRequest, pods []types.Pod) { currentTimeout := prepareDataTimeout for i := 0; i <= prepareDataMaxRetries; i++ { done := make(chan struct{}) go func() { defer close(done) - plugin.PrepareData(ctx, request, pods) + plugin.PrepareRequestData(ctx, request, pods) }() select { @@ -377,10 +377,10 @@ func (d *Director) runPrepareDataPlugins(ctx context.Context, // Parallely execute PrepareData for all the plugins. Some plugins might take time to prepare data e.g. latency predictor. // Failure in any prepareData doesn't block the request processing. var wg sync.WaitGroup - for _, plugin := range d.requestControlPlugins.prepareDataPlugins { + for _, plugin := range d.requestControlPlugins.dataProducerPlugins { loggerDebug.Info("Running PrepareData plugin", "plugin", plugin.TypedName()) wg.Add(1) - go func(p PrepareData) { + go func(p DataProducer) { defer wg.Done() prepareData(p, ctx, request, pods) }(plugin) @@ -393,8 +393,8 @@ func (d *Director) runAdmitRequestPlugins(ctx context.Context, loggerDebug := log.FromContext(ctx).V(logutil.DEBUG) for _, plugin := range d.requestControlPlugins.admitRequestPlugins { loggerDebug.Info("Running AdmitRequest plugin", "plugin", plugin.TypedName()) - if denyReason := plugin.Admit(ctx, request, pods); denyReason != "" { - loggerDebug.Info("AdmitRequest plugin denied the request", "plugin", plugin.TypedName(), "reason", denyReason) + if denyReason := plugin.AdmitRequest(ctx, request, pods); denyReason != nil { + loggerDebug.Info("AdmitRequest plugin denied the request", "plugin", plugin.TypedName(), "reason", denyReason.Error()) return false } loggerDebug.Info("Completed running AdmitRequest plugin successfully", "plugin", plugin.TypedName()) diff --git a/pkg/epp/requestcontrol/director_test.go b/pkg/epp/requestcontrol/director_test.go index 559978def..15b9b4613 100644 --- a/pkg/epp/requestcontrol/director_test.go +++ b/pkg/epp/requestcontrol/director_test.go @@ -93,23 +93,27 @@ func (ds *mockDatastore) PodList(predicate func(backendmetrics.PodMetrics) bool) return res } -type mockPrepareRequestDataPlugin struct { - tn plugins.TypedName - prepareDataCalled bool +type mockDataProducerPlugin struct { + tn plugins.TypedName + prepareRequestDataCalled bool } -func newmockPrepareRequestDataPlugin(name string) *mockPrepareRequestDataPlugin { - return &mockPrepareRequestDataPlugin{ +func newmockDataProducerPlugin(name string) *mockDataProducerPlugin { + return &mockDataProducerPlugin{ tn: plugins.TypedName{Type: "mock-prepare-request-data", Name: name}, } } -func (m *mockPrepareRequestDataPlugin) TypedName() plugins.TypedName { +func (m *mockDataProducerPlugin) TypedName() plugins.TypedName { return m.tn } -func (m *mockPrepareRequestDataPlugin) PrepareData(ctx context.Context, request *schedulingtypes.LLMRequest, pods []schedulingtypes.Pod) { - m.prepareDataCalled = true +func (m *mockDataProducerPlugin) Produces() map[string]any { + return map[string]any{} +} + +func (m *mockDataProducerPlugin) PrepareRequestData(ctx context.Context, request *schedulingtypes.LLMRequest, pods []schedulingtypes.Pod) { + m.prepareRequestDataCalled = true } type mockAdmitRequestPlugins struct { @@ -127,9 +131,9 @@ func (m *mockAdmitRequestPlugins) TypedName() plugins.TypedName { return m.tn } -func (m *mockAdmitRequestPlugins) Admit(ctx context.Context, request *schedulingtypes.LLMRequest, pods []schedulingtypes.Pod) string { +func (m *mockAdmitRequestPlugins) AdmitRequest(ctx context.Context, request *schedulingtypes.LLMRequest, pods []schedulingtypes.Pod) error { m.admitRequestCalled = true - return "" + return nil } func TestDirector_HandleRequest(t *testing.T) { @@ -241,19 +245,19 @@ func TestDirector_HandleRequest(t *testing.T) { } tests := []struct { - name string - reqBodyMap map[string]any - mockAdmissionController *mockAdmissionController - inferenceObjectiveName string - schedulerMockSetup func(m *mockScheduler) - wantErrCode string // Expected errutil code string - wantReqCtx *handlers.RequestContext // Fields to check in the returned RequestContext - wantMutatedBodyModel string // Expected model in reqCtx.Request.Body after PostDispatch - targetModelName string // Expected model name after target model resolution - prepareDataCalled bool - admitRequestCalled bool - prepareDataPlugins *mockPrepareRequestDataPlugin - admitRequestPlugins *mockAdmitRequestPlugins + name string + reqBodyMap map[string]any + mockAdmissionController *mockAdmissionController + inferenceObjectiveName string + schedulerMockSetup func(m *mockScheduler) + wantErrCode string // Expected errutil code string + wantReqCtx *handlers.RequestContext // Fields to check in the returned RequestContext + wantMutatedBodyModel string // Expected model in reqCtx.Request.Body after PostDispatch + targetModelName string // Expected model name after target model resolution + prepareRequestDataCalled bool + admitRequestCalled bool + dataProducerPlugins *mockDataProducerPlugin + admitRequestPlugins *mockAdmitRequestPlugins }{ { name: "successful completions request", @@ -333,10 +337,10 @@ func TestDirector_HandleRequest(t *testing.T) { }, TargetEndpoint: "192.168.1.100:8000,192.168.2.100:8000,192.168.4.100:8000", }, - wantMutatedBodyModel: model, - targetModelName: model, - prepareDataCalled: true, - prepareDataPlugins: newmockPrepareRequestDataPlugin("test-plugin"), + wantMutatedBodyModel: model, + targetModelName: model, + prepareRequestDataCalled: true, + dataProducerPlugins: newmockDataProducerPlugin("test-plugin"), }, { name: "successful chat completions request with admit request plugins", @@ -518,8 +522,8 @@ func TestDirector_HandleRequest(t *testing.T) { test.schedulerMockSetup(mockSched) } config := NewConfig() - if test.prepareDataPlugins != nil { - config = config.WithPrepareDataPlugins(test.prepareDataPlugins) + if test.dataProducerPlugins != nil { + config = config.WithDataProducers(test.dataProducerPlugins) } if test.admitRequestPlugins != nil { config = config.WithAdmitRequestPlugins(test.admitRequestPlugins) @@ -569,10 +573,10 @@ func TestDirector_HandleRequest(t *testing.T) { "Mutated reqCtx.Request.Body model mismatch") } if test.admitRequestPlugins != nil { - assert.True(t, test.admitRequestPlugins.admitRequestCalled, "AdmitRequestPlugins not called") + assert.True(t, test.admitRequestPlugins.admitRequestCalled, "AdmitRequest not called") } - if test.prepareDataPlugins != nil { - assert.True(t, test.prepareDataPlugins.prepareDataCalled, "PrepareDataPlugins not called") + if test.dataProducerPlugins != nil { + assert.True(t, test.dataProducerPlugins.prepareRequestDataCalled, "PrepareRequestData not called") } }) } diff --git a/pkg/epp/requestcontrol/plugins.go b/pkg/epp/requestcontrol/plugins.go index 8e6b91cdb..4240f78ee 100644 --- a/pkg/epp/requestcontrol/plugins.go +++ b/pkg/epp/requestcontrol/plugins.go @@ -58,17 +58,19 @@ type ResponseComplete interface { ResponseComplete(ctx context.Context, request *types.LLMRequest, response *Response, targetPod *backend.Pod) } -// PrepareData is called by the director before scheduling requests. -// PrepareData plugin is implemented by data producers which produce data from different sources. -type PrepareData interface { - plugins.Plugin - PrepareData(ctx context.Context, request *types.LLMRequest, pods []types.Pod) +// PrepareRequestData is called by the director before scheduling requests. +// DataProducer plugin is implemented by data producers which produce data from different sources. +type DataProducer interface { + plugins.ProducerPlugin + PrepareRequestData(ctx context.Context, request *types.LLMRequest, pods []types.Pod) } -// AdmitRequest is called by the director after the PrepareData plugins and before scheduling. -// AdmitRequest plugin is implemented by plugins for admission control. These plugins need to implement Admit method. -type AdmitRequest interface { +// AdmissionPlugin is called by the director after the prepare data phase and before scheduling. +// When a request has to go through multiple AdmissionPlugin, +// the request is admitted only if all plugins say that the request should be admitted. +type AdmissionPlugin interface { plugins.Plugin - // Admit returns the denial reason if the request is denied. If the request is allowed, it returns an empty string. - Admit(ctx context.Context, request *types.LLMRequest, pods []types.Pod) string + // AdmitRequest returns the denial reason, wrapped as error if the request is denied. + // If the request is allowed, it returns nil. + AdmitRequest(ctx context.Context, request *types.LLMRequest, pods []types.Pod) error } diff --git a/pkg/epp/requestcontrol/request_control_config.go b/pkg/epp/requestcontrol/request_control_config.go index 40618c215..91ff3b345 100644 --- a/pkg/epp/requestcontrol/request_control_config.go +++ b/pkg/epp/requestcontrol/request_control_config.go @@ -23,8 +23,8 @@ import ( // NewConfig creates a new Config object and returns its pointer. func NewConfig() *Config { return &Config{ - admitRequestPlugins: []AdmitRequest{}, - prepareDataPlugins: []PrepareData{}, + admitRequestPlugins: []AdmissionPlugin{}, + dataProducerPlugins: []DataProducer{}, preRequestPlugins: []PreRequest{}, responseReceivedPlugins: []ResponseReceived{}, responseStreamingPlugins: []ResponseStreaming{}, @@ -34,8 +34,8 @@ func NewConfig() *Config { // Config provides a configuration for the requestcontrol plugins. type Config struct { - admitRequestPlugins []AdmitRequest - prepareDataPlugins []PrepareData + admitRequestPlugins []AdmissionPlugin + dataProducerPlugins []DataProducer preRequestPlugins []PreRequest responseReceivedPlugins []ResponseReceived responseStreamingPlugins []ResponseStreaming @@ -71,13 +71,13 @@ func (c *Config) WithResponseCompletePlugins(plugins ...ResponseComplete) *Confi } // WithPrepareDataPlugins sets the given plugins as the PrepareData plugins. -func (c *Config) WithPrepareDataPlugins(plugins ...PrepareData) *Config { - c.prepareDataPlugins = plugins +func (c *Config) WithDataProducers(plugins ...DataProducer) *Config { + c.dataProducerPlugins = plugins return c } // WithAdmitRequestPlugins sets the given plugins as the AdmitRequest plugins. -func (c *Config) WithAdmitRequestPlugins(plugins ...AdmitRequest) *Config { +func (c *Config) WithAdmitRequestPlugins(plugins ...AdmissionPlugin) *Config { c.admitRequestPlugins = plugins return c } From 4bbfcbea937d816aaebdba2d799d406c26c9e93f Mon Sep 17 00:00:00 2001 From: Rahul Gurnani Date: Thu, 6 Nov 2025 03:05:00 +0000 Subject: [PATCH 08/12] Update test and remove duplicate AttributeMap. Address other review comments. --- pkg/epp/backend/metrics/fake.go | 8 +- pkg/epp/backend/metrics/pod_metrics.go | 3 + pkg/epp/backend/metrics/types.go | 4 + pkg/epp/datalayer/attributemap.go | 2 - pkg/epp/datalayer/endpoint.go | 5 + pkg/epp/requestcontrol/director.go | 23 +++-- pkg/epp/requestcontrol/director_test.go | 99 +++++++++++-------- .../requestcontrol/request_control_config.go | 12 +-- pkg/epp/scheduling/types/types.go | 72 +------------- 9 files changed, 102 insertions(+), 126 deletions(-) diff --git a/pkg/epp/backend/metrics/fake.go b/pkg/epp/backend/metrics/fake.go index 613ebf5ec..1c7a90528 100644 --- a/pkg/epp/backend/metrics/fake.go +++ b/pkg/epp/backend/metrics/fake.go @@ -32,8 +32,9 @@ import ( // FakePodMetrics is an implementation of PodMetrics that doesn't run the async refresh loop. type FakePodMetrics struct { - Pod *backend.Pod - Metrics *MetricsState + Pod *backend.Pod + Metrics *MetricsState + Attributes *datalayer.Attributes } func (fpm *FakePodMetrics) String() string { @@ -51,6 +52,9 @@ func (fpm *FakePodMetrics) GetMetrics() *MetricsState { func (fpm *FakePodMetrics) UpdatePod(pod *datalayer.PodInfo) { fpm.Pod = pod } +func (fpm *FakePodMetrics) GetAttributes() *datalayer.Attributes { + return fpm.Attributes +} func (*FakePodMetrics) Put(string, datalayer.Cloneable) {} func (*FakePodMetrics) Get(string) (datalayer.Cloneable, bool) { return nil, false } diff --git a/pkg/epp/backend/metrics/pod_metrics.go b/pkg/epp/backend/metrics/pod_metrics.go index a1114aecf..4d22ef18c 100644 --- a/pkg/epp/backend/metrics/pod_metrics.go +++ b/pkg/epp/backend/metrics/pod_metrics.go @@ -126,6 +126,9 @@ func (pm *podMetrics) stopRefreshLoop() { func (*podMetrics) Put(string, datalayer.Cloneable) {} func (*podMetrics) Get(string) (datalayer.Cloneable, bool) { return nil, false } func (*podMetrics) Keys() []string { return nil } +func (*podMetrics) GetAttributes() *datalayer.Attributes { + return nil +} func (pm *podMetrics) UpdateMetrics(updated *MetricsState) { updated.UpdateTime = time.Now() diff --git a/pkg/epp/backend/metrics/types.go b/pkg/epp/backend/metrics/types.go index 99f15a20f..a0f18a782 100644 --- a/pkg/epp/backend/metrics/types.go +++ b/pkg/epp/backend/metrics/types.go @@ -75,4 +75,8 @@ func (f *PodMetricsFactory) ReleaseEndpoint(ep PodMetrics) { } } +func (f *PodMetricsFactory) GetAttributes() *datalayer.Attributes { + return datalayer.NewAttributes() +} + type PodMetrics = datalayer.Endpoint diff --git a/pkg/epp/datalayer/attributemap.go b/pkg/epp/datalayer/attributemap.go index 8b213915e..614bf57bb 100644 --- a/pkg/epp/datalayer/attributemap.go +++ b/pkg/epp/datalayer/attributemap.go @@ -20,8 +20,6 @@ import ( "sync" ) -// TODO(rahulgurnani): Deprecate this AttributeMap in favor of AttributeMap added in scheduling layer. - // Cloneable types support cloning of the value. type Cloneable interface { Clone() Cloneable diff --git a/pkg/epp/datalayer/endpoint.go b/pkg/epp/datalayer/endpoint.go index 74c11905e..96c5423d5 100644 --- a/pkg/epp/datalayer/endpoint.go +++ b/pkg/epp/datalayer/endpoint.go @@ -25,6 +25,7 @@ import ( type EndpointPodState interface { GetPod() *PodInfo UpdatePod(*PodInfo) + GetAttributes() *Attributes } // EndpointMetricsState allows management of the Metrics related attributes. @@ -89,6 +90,10 @@ func (srv *ModelServer) Keys() []string { return srv.attributes.Keys() } +func (srv *ModelServer) GetAttributes() *Attributes { + return srv.attributes +} + func (srv *ModelServer) Clone() *ModelServer { clone := &ModelServer{ attributes: srv.attributes.Clone(), diff --git a/pkg/epp/requestcontrol/director.go b/pkg/epp/requestcontrol/director.go index 80dbbfec2..020b3bed2 100644 --- a/pkg/epp/requestcontrol/director.go +++ b/pkg/epp/requestcontrol/director.go @@ -33,10 +33,10 @@ import ( "sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend" backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/handlers" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metadata" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics" - "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types" schedulingtypes "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types" errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/error" logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" @@ -174,10 +174,11 @@ func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestCo snapshotOfCandidatePods := d.toSchedulerPodMetrics(candidatePods) // Prepare per request data by running PrepareData plugins. + // NOTE: Failure in prepare data plugins does not block the request processing. d.runPrepareDataPlugins(ctx, reqCtx.SchedulingRequest, snapshotOfCandidatePods) // Run admit request plugins - if !d.runAdmitRequestPlugins(ctx, reqCtx.SchedulingRequest, snapshotOfCandidatePods) { + if !d.withAdmissionPlugins(ctx, reqCtx.SchedulingRequest, snapshotOfCandidatePods) { logger.V(logutil.DEFAULT).Info("Request cannot be admitted") return reqCtx, errutil.Error{Code: errutil.Internal, Msg: "request cannot be admitted"} } @@ -276,7 +277,11 @@ func (d *Director) prepareRequest(ctx context.Context, reqCtx *handlers.RequestC func (d *Director) toSchedulerPodMetrics(pods []backendmetrics.PodMetrics) []schedulingtypes.Pod { pm := make([]schedulingtypes.Pod, len(pods)) for i, pod := range pods { - pm[i] = &schedulingtypes.PodMetrics{Pod: pod.GetPod().Clone(), MetricsState: pod.GetMetrics().Clone()} + if pod.GetAttributes() != nil { + pm[i] = &schedulingtypes.PodMetrics{Pod: pod.GetPod().Clone(), MetricsState: pod.GetMetrics().Clone(), AttributeMap: pod.GetAttributes().Clone()} + } else { + pm[i] = &schedulingtypes.PodMetrics{Pod: pod.GetPod().Clone(), MetricsState: pod.GetMetrics().Clone(), AttributeMap: datalayer.NewAttributes()} + } } return pm @@ -348,7 +353,7 @@ func (d *Director) runPreRequestPlugins(ctx context.Context, request *scheduling } // prepareData executes the PrepareRequestData plugins with retries and timeout. -func prepareData(plugin DataProducer, ctx context.Context, request *schedulingtypes.LLMRequest, pods []types.Pod) { +func prepareData(plugin DataProducer, ctx context.Context, request *schedulingtypes.LLMRequest, pods []schedulingtypes.Pod) { currentTimeout := prepareDataTimeout for i := 0; i <= prepareDataMaxRetries; i++ { done := make(chan struct{}) @@ -372,9 +377,9 @@ func prepareData(plugin DataProducer, ctx context.Context, request *schedulingty } func (d *Director) runPrepareDataPlugins(ctx context.Context, - request *schedulingtypes.LLMRequest, pods []types.Pod) { + request *schedulingtypes.LLMRequest, pods []schedulingtypes.Pod) { loggerDebug := log.FromContext(ctx).V(logutil.DEBUG) - // Parallely execute PrepareData for all the plugins. Some plugins might take time to prepare data e.g. latency predictor. + // Parallelly execute PrepareData for all the plugins. Some plugins might take time to prepare data e.g. latency predictor. // Failure in any prepareData doesn't block the request processing. var wg sync.WaitGroup for _, plugin := range d.requestControlPlugins.dataProducerPlugins { @@ -388,10 +393,10 @@ func (d *Director) runPrepareDataPlugins(ctx context.Context, wg.Wait() } -func (d *Director) runAdmitRequestPlugins(ctx context.Context, - request *schedulingtypes.LLMRequest, pods []types.Pod) bool { +func (d *Director) withAdmissionPlugins(ctx context.Context, + request *schedulingtypes.LLMRequest, pods []schedulingtypes.Pod) bool { loggerDebug := log.FromContext(ctx).V(logutil.DEBUG) - for _, plugin := range d.requestControlPlugins.admitRequestPlugins { + for _, plugin := range d.requestControlPlugins.admissionPlugins { loggerDebug.Info("Running AdmitRequest plugin", "plugin", plugin.TypedName()) if denyReason := plugin.AdmitRequest(ctx, request, pods); denyReason != nil { loggerDebug.Info("AdmitRequest plugin denied the request", "plugin", plugin.TypedName(), "reason", denyReason.Error()) diff --git a/pkg/epp/requestcontrol/director_test.go b/pkg/epp/requestcontrol/director_test.go index 15b9b4613..73c0e2616 100644 --- a/pkg/epp/requestcontrol/director_test.go +++ b/pkg/epp/requestcontrol/director_test.go @@ -37,6 +37,7 @@ import ( "sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend" backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/handlers" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metadata" @@ -48,6 +49,10 @@ import ( testutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/testing" ) +const ( + mockProducedDataKey = "producedDataKey" +) + // --- Mocks --- type mockAdmissionController struct { @@ -66,9 +71,16 @@ func (m *mockAdmissionController) Admit( type mockScheduler struct { scheduleResults *schedulingtypes.SchedulingResult scheduleErr error + dataProduced bool // denotes whether data production is expected. } -func (m *mockScheduler) Schedule(_ context.Context, _ *schedulingtypes.LLMRequest, _ []schedulingtypes.Pod) (*schedulingtypes.SchedulingResult, error) { +func (m *mockScheduler) Schedule(_ context.Context, _ *schedulingtypes.LLMRequest, pods []schedulingtypes.Pod) (*schedulingtypes.SchedulingResult, error) { + if pods != nil && m.dataProduced { + data, ok := pods[0].Get(mockProducedDataKey) + if !ok || data.(mockProducedDataType).value != 42 { + return nil, errors.New("expected produced data not found in pod") + } + } return m.scheduleResults, m.scheduleErr } @@ -94,11 +106,10 @@ func (ds *mockDatastore) PodList(predicate func(backendmetrics.PodMetrics) bool) } type mockDataProducerPlugin struct { - tn plugins.TypedName - prepareRequestDataCalled bool + tn plugins.TypedName } -func newmockDataProducerPlugin(name string) *mockDataProducerPlugin { +func newMockDataProducerPlugin(name string) *mockDataProducerPlugin { return &mockDataProducerPlugin{ tn: plugins.TypedName{Type: "mock-prepare-request-data", Name: name}, } @@ -109,33 +120,44 @@ func (m *mockDataProducerPlugin) TypedName() plugins.TypedName { } func (m *mockDataProducerPlugin) Produces() map[string]any { - return map[string]any{} + // Produces data of type int, 0 denotes it is int. + return map[string]any{mockProducedDataKey: 0} } func (m *mockDataProducerPlugin) PrepareRequestData(ctx context.Context, request *schedulingtypes.LLMRequest, pods []schedulingtypes.Pod) { - m.prepareRequestDataCalled = true + pods[0].Put(mockProducedDataKey, mockProducedDataType{value: 42}) } -type mockAdmitRequestPlugins struct { - tn plugins.TypedName +type mockAdmissionPlugin struct { + tn plugins.TypedName + // TODO: Replace this will admission control. admitRequestCalled bool } -func newmockAdmitRequestPlugins(name string) *mockAdmitRequestPlugins { - return &mockAdmitRequestPlugins{ +func newMockAdmissionPlugin(name string) *mockAdmissionPlugin { + return &mockAdmissionPlugin{ tn: plugins.TypedName{Type: "mock-admit-data", Name: name}, } } -func (m *mockAdmitRequestPlugins) TypedName() plugins.TypedName { +func (m *mockAdmissionPlugin) TypedName() plugins.TypedName { return m.tn } -func (m *mockAdmitRequestPlugins) AdmitRequest(ctx context.Context, request *schedulingtypes.LLMRequest, pods []schedulingtypes.Pod) error { +func (m *mockAdmissionPlugin) AdmitRequest(ctx context.Context, request *schedulingtypes.LLMRequest, pods []schedulingtypes.Pod) error { m.admitRequestCalled = true return nil } +type mockProducedDataType struct { + value int +} + +// Clone implements types.Cloneable. +func (m mockProducedDataType) Clone() datalayer.Cloneable { + return mockProducedDataType{value: m.value} +} + func TestDirector_HandleRequest(t *testing.T) { ctx := logutil.NewTestLoggerIntoContext(context.Background()) @@ -210,6 +232,7 @@ func TestDirector_HandleRequest(t *testing.T) { TargetPods: []schedulingtypes.Pod{ &schedulingtypes.ScoredPod{ Pod: &schedulingtypes.PodMetrics{ + AttributeMap: datalayer.NewAttributes(), Pod: &backend.Pod{ Address: "192.168.1.100", Port: "8000", @@ -220,6 +243,7 @@ func TestDirector_HandleRequest(t *testing.T) { }, &schedulingtypes.ScoredPod{ Pod: &schedulingtypes.PodMetrics{ + AttributeMap: datalayer.NewAttributes(), Pod: &backend.Pod{ Address: "192.168.2.100", Port: "8000", @@ -230,6 +254,7 @@ func TestDirector_HandleRequest(t *testing.T) { }, &schedulingtypes.ScoredPod{ Pod: &schedulingtypes.PodMetrics{ + AttributeMap: datalayer.NewAttributes(), Pod: &backend.Pod{ Address: "192.168.4.100", Port: "8000", @@ -245,19 +270,18 @@ func TestDirector_HandleRequest(t *testing.T) { } tests := []struct { - name string - reqBodyMap map[string]any - mockAdmissionController *mockAdmissionController - inferenceObjectiveName string - schedulerMockSetup func(m *mockScheduler) - wantErrCode string // Expected errutil code string - wantReqCtx *handlers.RequestContext // Fields to check in the returned RequestContext - wantMutatedBodyModel string // Expected model in reqCtx.Request.Body after PostDispatch - targetModelName string // Expected model name after target model resolution - prepareRequestDataCalled bool - admitRequestCalled bool - dataProducerPlugins *mockDataProducerPlugin - admitRequestPlugins *mockAdmitRequestPlugins + name string + reqBodyMap map[string]any + mockAdmissionController *mockAdmissionController + inferenceObjectiveName string + schedulerMockSetup func(m *mockScheduler) + wantErrCode string // Expected errutil code string + wantReqCtx *handlers.RequestContext // Fields to check in the returned RequestContext + wantMutatedBodyModel string // Expected model in reqCtx.Request.Body after PostDispatch + targetModelName string // Expected model name after target model resolution + admitRequestCalled bool + dataProducerPlugin *mockDataProducerPlugin + admissionPlugin *mockAdmissionPlugin }{ { name: "successful completions request", @@ -326,6 +350,7 @@ func TestDirector_HandleRequest(t *testing.T) { mockAdmissionController: &mockAdmissionController{admitErr: nil}, schedulerMockSetup: func(m *mockScheduler) { m.scheduleResults = defaultSuccessfulScheduleResults + m.dataProduced = true }, wantReqCtx: &handlers.RequestContext{ TargetModelName: model, @@ -337,10 +362,9 @@ func TestDirector_HandleRequest(t *testing.T) { }, TargetEndpoint: "192.168.1.100:8000,192.168.2.100:8000,192.168.4.100:8000", }, - wantMutatedBodyModel: model, - targetModelName: model, - prepareRequestDataCalled: true, - dataProducerPlugins: newmockDataProducerPlugin("test-plugin"), + wantMutatedBodyModel: model, + targetModelName: model, + dataProducerPlugin: newMockDataProducerPlugin("test-plugin"), }, { name: "successful chat completions request with admit request plugins", @@ -370,7 +394,7 @@ func TestDirector_HandleRequest(t *testing.T) { wantMutatedBodyModel: model, targetModelName: model, admitRequestCalled: true, - admitRequestPlugins: newmockAdmitRequestPlugins("test-plugin"), + admissionPlugin: newMockAdmissionPlugin("test-plugin"), }, { name: "successful chat completions request with multiple messages", @@ -522,11 +546,11 @@ func TestDirector_HandleRequest(t *testing.T) { test.schedulerMockSetup(mockSched) } config := NewConfig() - if test.dataProducerPlugins != nil { - config = config.WithDataProducers(test.dataProducerPlugins) + if test.dataProducerPlugin != nil { + config = config.WithDataProducers(test.dataProducerPlugin) } - if test.admitRequestPlugins != nil { - config = config.WithAdmitRequestPlugins(test.admitRequestPlugins) + if test.admissionPlugin != nil { + config = config.WithAdmissionPlugins(test.admissionPlugin) } director := NewDirectorWithConfig(ds, mockSched, test.mockAdmissionController, config) @@ -572,11 +596,8 @@ func TestDirector_HandleRequest(t *testing.T) { assert.Equal(t, test.wantMutatedBodyModel, returnedReqCtx.Request.Body["model"], "Mutated reqCtx.Request.Body model mismatch") } - if test.admitRequestPlugins != nil { - assert.True(t, test.admitRequestPlugins.admitRequestCalled, "AdmitRequest not called") - } - if test.dataProducerPlugins != nil { - assert.True(t, test.dataProducerPlugins.prepareRequestDataCalled, "PrepareRequestData not called") + if test.admissionPlugin != nil { + assert.True(t, test.admissionPlugin.admitRequestCalled, "AdmitRequest not called") } }) } diff --git a/pkg/epp/requestcontrol/request_control_config.go b/pkg/epp/requestcontrol/request_control_config.go index 91ff3b345..e5c246894 100644 --- a/pkg/epp/requestcontrol/request_control_config.go +++ b/pkg/epp/requestcontrol/request_control_config.go @@ -23,7 +23,7 @@ import ( // NewConfig creates a new Config object and returns its pointer. func NewConfig() *Config { return &Config{ - admitRequestPlugins: []AdmissionPlugin{}, + admissionPlugins: []AdmissionPlugin{}, dataProducerPlugins: []DataProducer{}, preRequestPlugins: []PreRequest{}, responseReceivedPlugins: []ResponseReceived{}, @@ -34,7 +34,7 @@ func NewConfig() *Config { // Config provides a configuration for the requestcontrol plugins. type Config struct { - admitRequestPlugins []AdmissionPlugin + admissionPlugins []AdmissionPlugin dataProducerPlugins []DataProducer preRequestPlugins []PreRequest responseReceivedPlugins []ResponseReceived @@ -70,15 +70,15 @@ func (c *Config) WithResponseCompletePlugins(plugins ...ResponseComplete) *Confi return c } -// WithPrepareDataPlugins sets the given plugins as the PrepareData plugins. +// WithDataProducers sets the given plugins as the PrepareData plugins. func (c *Config) WithDataProducers(plugins ...DataProducer) *Config { c.dataProducerPlugins = plugins return c } -// WithAdmitRequestPlugins sets the given plugins as the AdmitRequest plugins. -func (c *Config) WithAdmitRequestPlugins(plugins ...AdmissionPlugin) *Config { - c.admitRequestPlugins = plugins +// WithAdmissionPlugins sets the given plugins as the AdmitRequest plugins. +func (c *Config) WithAdmissionPlugins(plugins ...AdmissionPlugin) *Config { + c.admissionPlugins = plugins return c } diff --git a/pkg/epp/scheduling/types/types.go b/pkg/epp/scheduling/types/types.go index e87250eb8..8e0553fae 100644 --- a/pkg/epp/scheduling/types/types.go +++ b/pkg/epp/scheduling/types/types.go @@ -21,10 +21,10 @@ import ( "errors" "fmt" "strings" - "sync" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend" backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer" ) const nilString = "" @@ -192,8 +192,8 @@ type Pod interface { GetPod() *backend.Pod GetMetrics() *backendmetrics.MetricsState String() string - Put(key string, value Cloneable) - Get(key string) (Cloneable, bool) + Get(string) (datalayer.Cloneable, bool) + Put(string, datalayer.Cloneable) Keys() []string } @@ -221,19 +221,7 @@ func (pm *PodMetrics) GetMetrics() *backendmetrics.MetricsState { type PodMetrics struct { *backend.Pod *backendmetrics.MetricsState - AttributeMap -} - -func (pm *PodMetrics) Put(key string, value Cloneable) { - pm.AttributeMap.Put(key, value) -} - -func (pm *PodMetrics) Get(key string) (Cloneable, bool) { - return pm.AttributeMap.Get(key) -} - -func (pm *PodMetrics) Keys() []string { - return pm.AttributeMap.Keys() + datalayer.AttributeMap } // ProfileRunResult captures the profile run result. @@ -251,55 +239,3 @@ type SchedulingResult struct { type Cloneable interface { Clone() Cloneable } - -// AttributeMap is used to store flexible metadata or traits -// across different aspects of an inference server. -// Stored values must be Cloneable. This is a per-request snapshot of the attributes. -type AttributeMap interface { - Put(string, Cloneable) - Get(string) (Cloneable, bool) - Keys() []string -} - -// Attributes provides a goroutine-safe implementation of AttributeMap. -type Attributes struct { - data sync.Map // key: attribute name (string), value: attribute value (opaque, Cloneable) -} - -// NewAttributes returns a new instance of Attributes. -func NewAttributes() *Attributes { - return &Attributes{} -} - -// Put adds or updates an attribute in the map. -func (a *Attributes) Put(key string, value Cloneable) { - if value != nil { - a.data.Store(key, value) // TODO: Clone into map to ensure isolation - } -} - -// Get retrieves an attribute by key, returning a cloned copy. -func (a *Attributes) Get(key string) (Cloneable, bool) { - val, ok := a.data.Load(key) - if !ok { - return nil, false - } - if cloneable, ok := val.(Cloneable); ok { - return cloneable.Clone(), true - } - return nil, false -} - -// Clone creates a deep copy of the entire attribute map. -func (a *Attributes) Clone() *Attributes { - clone := NewAttributes() - a.data.Range(func(key, value any) bool { - if sk, ok := key.(string); ok { - if v, ok := value.(Cloneable); ok { - clone.Put(sk, v) - } - } - return true - }) - return clone -} From 01b6b3b9a6b50c1989341f375649f894138f143a Mon Sep 17 00:00:00 2001 From: Rahul Gurnani Date: Tue, 11 Nov 2025 19:24:17 +0000 Subject: [PATCH 09/12] Execute prepare data plugins sequentially with retries and timeout. Also added more tests and some refactoring --- pkg/epp/backend/metrics/types.go | 4 - pkg/epp/requestcontrol/director.go | 61 +++++++------ pkg/epp/requestcontrol/director_test.go | 88 +++++++++---------- pkg/epp/requestcontrol/plugins.go | 7 +- .../requestcontrol/request_control_config.go | 14 +-- .../framework/plugins/multi/prefix/plugin.go | 1 - 6 files changed, 90 insertions(+), 85 deletions(-) diff --git a/pkg/epp/backend/metrics/types.go b/pkg/epp/backend/metrics/types.go index a0f18a782..99f15a20f 100644 --- a/pkg/epp/backend/metrics/types.go +++ b/pkg/epp/backend/metrics/types.go @@ -75,8 +75,4 @@ func (f *PodMetricsFactory) ReleaseEndpoint(ep PodMetrics) { } } -func (f *PodMetricsFactory) GetAttributes() *datalayer.Attributes { - return datalayer.NewAttributes() -} - type PodMetrics = datalayer.Endpoint diff --git a/pkg/epp/requestcontrol/director.go b/pkg/epp/requestcontrol/director.go index 020b3bed2..aaf7185b0 100644 --- a/pkg/epp/requestcontrol/director.go +++ b/pkg/epp/requestcontrol/director.go @@ -24,7 +24,6 @@ import ( "math/rand" "net" "strings" - "sync" "time" "sigs.k8s.io/controller-runtime/pkg/log" @@ -175,7 +174,9 @@ func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestCo // Prepare per request data by running PrepareData plugins. // NOTE: Failure in prepare data plugins does not block the request processing. - d.runPrepareDataPlugins(ctx, reqCtx.SchedulingRequest, snapshotOfCandidatePods) + if d.runPrepareDataPlugins(ctx, reqCtx.SchedulingRequest, snapshotOfCandidatePods) != nil { + return reqCtx, errutil.Error{Code: errutil.Internal, Msg: "failed to prepare request data"} + } // Run admit request plugins if !d.withAdmissionPlugins(ctx, reqCtx.SchedulingRequest, snapshotOfCandidatePods) { @@ -352,45 +353,55 @@ func (d *Director) runPreRequestPlugins(ctx context.Context, request *scheduling } } -// prepareData executes the PrepareRequestData plugins with retries and timeout. -func prepareData(plugin DataProducer, ctx context.Context, request *schedulingtypes.LLMRequest, pods []schedulingtypes.Pod) { +// executePlugins executes PrepareDataPlugins sequentially. +// TODO: Change to DAG execution in the following PRs. +func (d *Director) executePlugins(ctx context.Context, request *schedulingtypes.LLMRequest, pods []schedulingtypes.Pod, plugins []PrepareDataPlugin) error { + for _, plugin := range plugins { + err := prepareDataWithRetriesAndTimeout(plugin, ctx, request, pods) + if err != nil { + return err + } + } + return nil +} + +// prepareDataWithRetriesAndTimeout executes the PrepareRequestData plugins with retries and timeout. +func prepareDataWithRetriesAndTimeout(plugin PrepareDataPlugin, ctx context.Context, request *schedulingtypes.LLMRequest, pods []schedulingtypes.Pod) error { currentTimeout := prepareDataTimeout for i := 0; i <= prepareDataMaxRetries; i++ { - done := make(chan struct{}) + errCh := make(chan error, 1) go func() { - defer close(done) - plugin.PrepareRequestData(ctx, request, pods) + errCh <- plugin.PrepareRequestData(ctx, request, pods) }() select { - case <-done: - // Plugin executed successfully - return + case <-ctx.Done(): + return ctx.Err() + case err := <-errCh: + if err != nil { + log.FromContext(ctx).V(logutil.DEBUG).Info("PrepareData plugin failed, retrying...", "plugin", plugin.TypedName(), "retry", i+1, "error", err) + continue + } + return nil // Success case <-time.After(currentTimeout): log.FromContext(ctx).V(logutil.DEBUG).Info("PrepareData plugin timed out, retrying...", "plugin", plugin.TypedName(), "retry", i+1, "timeout", currentTimeout) if i == prepareDataMaxRetries { - log.FromContext(ctx).Error(nil, "PrepareData plugin failed after multiple retries", "plugin", plugin.TypedName()) - return + return fmt.Errorf("PrepareData plugin %s failed after %d retries", plugin.TypedName().String(), prepareDataMaxRetries) } } } + return nil } func (d *Director) runPrepareDataPlugins(ctx context.Context, - request *schedulingtypes.LLMRequest, pods []schedulingtypes.Pod) { - loggerDebug := log.FromContext(ctx).V(logutil.DEBUG) - // Parallelly execute PrepareData for all the plugins. Some plugins might take time to prepare data e.g. latency predictor. - // Failure in any prepareData doesn't block the request processing. - var wg sync.WaitGroup - for _, plugin := range d.requestControlPlugins.dataProducerPlugins { - loggerDebug.Info("Running PrepareData plugin", "plugin", plugin.TypedName()) - wg.Add(1) - go func(p DataProducer) { - defer wg.Done() - prepareData(p, ctx, request, pods) - }(plugin) + request *schedulingtypes.LLMRequest, pods []schedulingtypes.Pod) error { + err := d.executePlugins(ctx, request, pods, d.requestControlPlugins.prepareDataPlugins) + if err != nil { + log.FromContext(ctx).Error(err, "failed to execute PrepareData plugins as DAG, falling back to parallel execution") + return err } - wg.Wait() + + return nil } func (d *Director) withAdmissionPlugins(ctx context.Context, diff --git a/pkg/epp/requestcontrol/director_test.go b/pkg/epp/requestcontrol/director_test.go index 73c0e2616..8fcc0d5ff 100644 --- a/pkg/epp/requestcontrol/director_test.go +++ b/pkg/epp/requestcontrol/director_test.go @@ -20,6 +20,7 @@ import ( "context" "errors" "fmt" + "maps" "testing" "time" @@ -105,38 +106,23 @@ func (ds *mockDatastore) PodList(predicate func(backendmetrics.PodMetrics) bool) return res } -type mockDataProducerPlugin struct { - tn plugins.TypedName -} - -func newMockDataProducerPlugin(name string) *mockDataProducerPlugin { - return &mockDataProducerPlugin{ - tn: plugins.TypedName{Type: "mock-prepare-request-data", Name: name}, +func newMockPrepareDataPlugin(name string) *mockPrepareDataPlugin { + return &mockPrepareDataPlugin{ + name: name, + produces: map[string]any{mockProducedDataKey: 0}, + consumes: map[string]any{}, } } -func (m *mockDataProducerPlugin) TypedName() plugins.TypedName { - return m.tn -} - -func (m *mockDataProducerPlugin) Produces() map[string]any { - // Produces data of type int, 0 denotes it is int. - return map[string]any{mockProducedDataKey: 0} -} - -func (m *mockDataProducerPlugin) PrepareRequestData(ctx context.Context, request *schedulingtypes.LLMRequest, pods []schedulingtypes.Pod) { - pods[0].Put(mockProducedDataKey, mockProducedDataType{value: 42}) -} - type mockAdmissionPlugin struct { - tn plugins.TypedName - // TODO: Replace this will admission control. - admitRequestCalled bool + tn plugins.TypedName + denialError error } -func newMockAdmissionPlugin(name string) *mockAdmissionPlugin { +func newMockAdmissionPlugin(name string, denialError error) *mockAdmissionPlugin { return &mockAdmissionPlugin{ - tn: plugins.TypedName{Type: "mock-admit-data", Name: name}, + tn: plugins.TypedName{Type: "mock-admit-data", Name: name}, + denialError: denialError, } } @@ -145,8 +131,7 @@ func (m *mockAdmissionPlugin) TypedName() plugins.TypedName { } func (m *mockAdmissionPlugin) AdmitRequest(ctx context.Context, request *schedulingtypes.LLMRequest, pods []schedulingtypes.Pod) error { - m.admitRequestCalled = true - return nil + return m.denialError } type mockProducedDataType struct { @@ -279,9 +264,8 @@ func TestDirector_HandleRequest(t *testing.T) { wantReqCtx *handlers.RequestContext // Fields to check in the returned RequestContext wantMutatedBodyModel string // Expected model in reqCtx.Request.Body after PostDispatch targetModelName string // Expected model name after target model resolution - admitRequestCalled bool - dataProducerPlugin *mockDataProducerPlugin - admissionPlugin *mockAdmissionPlugin + admitRequestDenialError error // Expected denial error from admission plugin + prepareDataPlugin *mockPrepareDataPlugin }{ { name: "successful completions request", @@ -364,7 +348,7 @@ func TestDirector_HandleRequest(t *testing.T) { }, wantMutatedBodyModel: model, targetModelName: model, - dataProducerPlugin: newMockDataProducerPlugin("test-plugin"), + prepareDataPlugin: newMockPrepareDataPlugin("test-plugin"), }, { name: "successful chat completions request with admit request plugins", @@ -391,10 +375,29 @@ func TestDirector_HandleRequest(t *testing.T) { }, TargetEndpoint: "192.168.1.100:8000,192.168.2.100:8000,192.168.4.100:8000", }, - wantMutatedBodyModel: model, - targetModelName: model, - admitRequestCalled: true, - admissionPlugin: newMockAdmissionPlugin("test-plugin"), + wantMutatedBodyModel: model, + targetModelName: model, + admitRequestDenialError: nil, + }, + { + name: "denied request by admit request plugin", + reqBodyMap: map[string]any{ + "model": model, + "messages": []any{ + map[string]any{ + "role": "user", + "content": "critical prompt", + }, + }, + }, + mockAdmissionController: &mockAdmissionController{admitErr: nil}, + schedulerMockSetup: func(m *mockScheduler) { + m.scheduleResults = defaultSuccessfulScheduleResults + }, + wantMutatedBodyModel: model, + targetModelName: model, + admitRequestDenialError: errors.New("denied by admit plugin"), + wantErrCode: errutil.Internal, }, { name: "successful chat completions request with multiple messages", @@ -546,12 +549,10 @@ func TestDirector_HandleRequest(t *testing.T) { test.schedulerMockSetup(mockSched) } config := NewConfig() - if test.dataProducerPlugin != nil { - config = config.WithDataProducers(test.dataProducerPlugin) - } - if test.admissionPlugin != nil { - config = config.WithAdmissionPlugins(test.admissionPlugin) + if test.prepareDataPlugin != nil { + config = config.WithPrepareDataPlugins(test.prepareDataPlugin) } + config = config.WithAdmissionPlugins(newMockAdmissionPlugin("test-admit-plugin", test.admitRequestDenialError)) director := NewDirectorWithConfig(ds, mockSched, test.mockAdmissionController, config) reqCtx := &handlers.RequestContext{ @@ -566,9 +567,7 @@ func TestDirector_HandleRequest(t *testing.T) { TargetModelName: test.targetModelName, } // Deep copy the body map. - for k, v := range test.reqBodyMap { - reqCtx.Request.Body[k] = v - } + maps.Copy(reqCtx.Request.Body, test.reqBodyMap) returnedReqCtx, err := director.HandleRequest(ctx, reqCtx) @@ -596,9 +595,6 @@ func TestDirector_HandleRequest(t *testing.T) { assert.Equal(t, test.wantMutatedBodyModel, returnedReqCtx.Request.Body["model"], "Mutated reqCtx.Request.Body model mismatch") } - if test.admissionPlugin != nil { - assert.True(t, test.admissionPlugin.admitRequestCalled, "AdmitRequest not called") - } }) } } diff --git a/pkg/epp/requestcontrol/plugins.go b/pkg/epp/requestcontrol/plugins.go index 4240f78ee..8c6602049 100644 --- a/pkg/epp/requestcontrol/plugins.go +++ b/pkg/epp/requestcontrol/plugins.go @@ -59,10 +59,11 @@ type ResponseComplete interface { } // PrepareRequestData is called by the director before scheduling requests. -// DataProducer plugin is implemented by data producers which produce data from different sources. -type DataProducer interface { +// PrepareDataPlugin plugin is implemented by data producers which produce data from different sources. +type PrepareDataPlugin interface { plugins.ProducerPlugin - PrepareRequestData(ctx context.Context, request *types.LLMRequest, pods []types.Pod) + plugins.ConsumerPlugin + PrepareRequestData(ctx context.Context, request *types.LLMRequest, pods []types.Pod) error } // AdmissionPlugin is called by the director after the prepare data phase and before scheduling. diff --git a/pkg/epp/requestcontrol/request_control_config.go b/pkg/epp/requestcontrol/request_control_config.go index e5c246894..9701be999 100644 --- a/pkg/epp/requestcontrol/request_control_config.go +++ b/pkg/epp/requestcontrol/request_control_config.go @@ -24,7 +24,7 @@ import ( func NewConfig() *Config { return &Config{ admissionPlugins: []AdmissionPlugin{}, - dataProducerPlugins: []DataProducer{}, + prepareDataPlugins: []PrepareDataPlugin{}, preRequestPlugins: []PreRequest{}, responseReceivedPlugins: []ResponseReceived{}, responseStreamingPlugins: []ResponseStreaming{}, @@ -35,7 +35,7 @@ func NewConfig() *Config { // Config provides a configuration for the requestcontrol plugins. type Config struct { admissionPlugins []AdmissionPlugin - dataProducerPlugins []DataProducer + prepareDataPlugins []PrepareDataPlugin preRequestPlugins []PreRequest responseReceivedPlugins []ResponseReceived responseStreamingPlugins []ResponseStreaming @@ -70,9 +70,9 @@ func (c *Config) WithResponseCompletePlugins(plugins ...ResponseComplete) *Confi return c } -// WithDataProducers sets the given plugins as the PrepareData plugins. -func (c *Config) WithDataProducers(plugins ...DataProducer) *Config { - c.dataProducerPlugins = plugins +// WithPrepareDataPlugins sets the given plugins as the PrepareData plugins. +func (c *Config) WithPrepareDataPlugins(plugins ...PrepareDataPlugin) *Config { + c.prepareDataPlugins = plugins return c } @@ -85,7 +85,6 @@ func (c *Config) WithAdmissionPlugins(plugins ...AdmissionPlugin) *Config { // AddPlugins adds the given plugins to the Config. // The type of each plugin is checked and added to the corresponding list of plugins in the Config. // If a plugin implements multiple plugin interfaces, it will be added to each corresponding list. - func (c *Config) AddPlugins(pluginObjects ...plugins.Plugin) { for _, plugin := range pluginObjects { if preRequestPlugin, ok := plugin.(PreRequest); ok { @@ -100,5 +99,8 @@ func (c *Config) AddPlugins(pluginObjects ...plugins.Plugin) { if responseCompletePlugin, ok := plugin.(ResponseComplete); ok { c.responseCompletePlugins = append(c.responseCompletePlugins, responseCompletePlugin) } + if prepareDataPlugin, ok := plugin.(PrepareDataPlugin); ok { + c.prepareDataPlugins = append(c.prepareDataPlugins, prepareDataPlugin) + } } } diff --git a/pkg/epp/scheduling/framework/plugins/multi/prefix/plugin.go b/pkg/epp/scheduling/framework/plugins/multi/prefix/plugin.go index 9def3e4e9..b83621367 100644 --- a/pkg/epp/scheduling/framework/plugins/multi/prefix/plugin.go +++ b/pkg/epp/scheduling/framework/plugins/multi/prefix/plugin.go @@ -209,7 +209,6 @@ func (p *Plugin) Score(ctx context.Context, cycleState *types.CycleState, reques log.FromContext(ctx).V(logutil.TRACE).Info("prefix cached state", "cached-servers", state.PrefixCacheServers, "hashes", state.PrefixHashes) // calculate the scores of pods scores := make(map[types.Pod]float64, len(pods)) - total := len(state.PrefixHashes) podScoreFunc := func(pod types.Pod) float64 { if total == 0 { From 3c7c320fadf81127b58e61705a851c89f06d5ec2 Mon Sep 17 00:00:00 2001 From: Rahul Gurnani Date: Tue, 11 Nov 2025 20:04:16 +0000 Subject: [PATCH 10/12] Update prefix match plugin to implement PrepareData plugin --- .../framework/plugins/multi/prefix/plugin.go | 39 +++++++++++++++++-- .../plugins/multi/prefix/plugin_test.go | 13 ++++++- 2 files changed, 48 insertions(+), 4 deletions(-) diff --git a/pkg/epp/scheduling/framework/plugins/multi/prefix/plugin.go b/pkg/epp/scheduling/framework/plugins/multi/prefix/plugin.go index b83621367..8f2c84c1f 100644 --- a/pkg/epp/scheduling/framework/plugins/multi/prefix/plugin.go +++ b/pkg/epp/scheduling/framework/plugins/multi/prefix/plugin.go @@ -57,6 +57,8 @@ const ( DefaultLRUCapacityPerServer = 31250 PrefixCachePluginType = "prefix-cache-scorer" + + PrefixCacheMatchKey = "PrefixCacheMatchKey" ) const ( @@ -195,8 +197,17 @@ func (p *Plugin) WithName(name string) *Plugin { return p } -// Score returns the scoring result for the given list of pods based on context. -func (p *Plugin) Score(ctx context.Context, cycleState *types.CycleState, request *types.LLMRequest, pods []types.Pod) map[types.Pod]float64 { +func (p *Plugin) Consumes() map[string]any { + return map[string]any{} +} + +func (p *Plugin) Produces() map[string]any { + return map[string]any{ + PrefixCacheMatchKey: &SchedulingContextState{}, + } +} + +func (p *Plugin) PrepareRequestData(ctx context.Context, request *types.LLMRequest, pods []types.Pod) { // pre score step, hashing prompt and find longest prefix match. hashes := hashPrompt(ctx, request, getBlockSize(pods, p.config.DefaultBlockSize), p.config.MaxPrefixBlocksToMatch) state := &SchedulingContextState{ @@ -204,8 +215,30 @@ func (p *Plugin) Score(ctx context.Context, cycleState *types.CycleState, reques PrefixCacheServers: p.matchLongestPrefix(ctx, hashes), } - cycleState.Write(plugins.StateKey(p.TypedName().String()), state) + // TODO: Instead store this in the pods attribute map to avoid global state in the plugin. p.pluginState.Write(request.RequestId, plugins.StateKey(p.TypedName().String()), state) +} + +// Score returns the scoring result for the given list of pods based on context. +func (p *Plugin) Score(ctx context.Context, cycleState *types.CycleState, request *types.LLMRequest, pods []types.Pod) map[types.Pod]float64 { + // TODO(rahulgurnani): Remove duplication with PrepareRequestData after testing. + state, err := plugins.ReadPluginStateKey[*SchedulingContextState](p.pluginState, request.RequestId, plugins.StateKey(p.TypedName().String())) + if err != nil { + // This should not happen, but in case it does, we recalculate the state. + // In unit tests, this doesn't happen as PrepareRequestData is always called before Score. + // TODO: When the prefix plugin is split into separate score plugin and pre-request plugin, + // remove this recalculation. + log.FromContext(ctx).Error(err, "failed to read prefix plugin state, recalculating") + hashes := hashPrompt(ctx, request, getBlockSize(pods, p.config.DefaultBlockSize), p.config.MaxPrefixBlocksToMatch) + state = &SchedulingContextState{ + PrefixHashes: hashes, + PrefixCacheServers: p.matchLongestPrefix(ctx, hashes), + } + p.pluginState.Write(request.RequestId, plugins.StateKey(p.TypedName().String()), state) + } + // TODO(rahulgurnani): cleanup the cycleState after all the changes are done. Seems llm-d-scheduler relies on cyclestate presently. + cycleState.Write(plugins.StateKey(p.TypedName().String()), state) + log.FromContext(ctx).V(logutil.TRACE).Info("prefix cached state", "cached-servers", state.PrefixCacheServers, "hashes", state.PrefixHashes) // calculate the scores of pods scores := make(map[types.Pod]float64, len(pods)) diff --git a/pkg/epp/scheduling/framework/plugins/multi/prefix/plugin_test.go b/pkg/epp/scheduling/framework/plugins/multi/prefix/plugin_test.go index 59a09db52..7a27dce7f 100644 --- a/pkg/epp/scheduling/framework/plugins/multi/prefix/plugin_test.go +++ b/pkg/epp/scheduling/framework/plugins/multi/prefix/plugin_test.go @@ -55,6 +55,7 @@ func TestPrefixPluginCompletion(t *testing.T) { }, }, } + plugin.PrepareRequestData(context.Background(), req1, pods) scores := plugin.Score(context.Background(), types.NewCycleState(), req1, pods) state, err := plugins.ReadPluginStateKey[*SchedulingContextState](plugin.pluginState, req1.RequestId, plugins.StateKey(plugin.TypedName().String())) assert.NoError(t, err) @@ -87,6 +88,7 @@ func TestPrefixPluginCompletion(t *testing.T) { }, }, } + plugin.PrepareRequestData(context.Background(), req2, pods) scores = plugin.Score(context.Background(), types.NewCycleState(), req2, pods) state, err = plugins.ReadPluginStateKey[*SchedulingContextState](plugin.pluginState, req2.RequestId, plugins.StateKey(plugin.TypedName().String())) assert.NoError(t, err) @@ -118,6 +120,7 @@ func TestPrefixPluginCompletion(t *testing.T) { }, }, } + plugin.PrepareRequestData(context.Background(), req3, pods) scores = plugin.Score(context.Background(), types.NewCycleState(), req3, pods) state, err = plugins.ReadPluginStateKey[*SchedulingContextState](plugin.pluginState, req3.RequestId, plugins.StateKey(plugin.TypedName().String())) assert.NoError(t, err) @@ -148,6 +151,7 @@ func TestPrefixPluginCompletion(t *testing.T) { }, }, } + plugin.PrepareRequestData(context.Background(), req4, pods) scores = plugin.Score(context.Background(), types.NewCycleState(), req4, pods) state, err = plugins.ReadPluginStateKey[*SchedulingContextState](plugin.pluginState, req4.RequestId, plugins.StateKey(plugin.TypedName().String())) assert.NoError(t, err) @@ -178,6 +182,7 @@ func TestPrefixPluginCompletion(t *testing.T) { }, }, } + plugin.PrepareRequestData(context.Background(), req5, pods) scores = plugin.Score(context.Background(), types.NewCycleState(), req5, pods) state, err = plugins.ReadPluginStateKey[*SchedulingContextState](plugin.pluginState, req5.RequestId, plugins.StateKey(plugin.TypedName().String())) assert.NoError(t, err) @@ -223,6 +228,7 @@ func TestPrefixPluginChatCompletions(t *testing.T) { }, }, } + plugin.PrepareRequestData(context.Background(), req1, pods) scores := plugin.Score(context.Background(), types.NewCycleState(), req1, pods) state, err := plugins.ReadPluginStateKey[*SchedulingContextState](plugin.pluginState, req1.RequestId, plugins.StateKey(plugin.TypedName().String())) assert.NoError(t, err) @@ -258,6 +264,7 @@ func TestPrefixPluginChatCompletionsGrowth(t *testing.T) { }, }, } + plugin.PrepareRequestData(context.Background(), req1, pods) scores := plugin.Score(context.Background(), types.NewCycleState(), req1, pods) state, err := plugins.ReadPluginStateKey[*SchedulingContextState](plugin.pluginState, req1.RequestId, plugins.StateKey(plugin.TypedName().String())) assert.NoError(t, err) @@ -293,6 +300,7 @@ func TestPrefixPluginChatCompletionsGrowth(t *testing.T) { }, }, } + plugin.PrepareRequestData(context.Background(), req2, pods) scores = plugin.Score(context.Background(), types.NewCycleState(), req2, pods) state, err = plugins.ReadPluginStateKey[*SchedulingContextState](plugin.pluginState, req2.RequestId, plugins.StateKey(plugin.TypedName().String())) assert.NoError(t, err) @@ -328,6 +336,7 @@ func TestPrefixPluginChatCompletionsGrowth(t *testing.T) { }, }, } + plugin.PrepareRequestData(context.Background(), req3, pods) scores = plugin.Score(context.Background(), types.NewCycleState(), req3, pods) state, err = plugins.ReadPluginStateKey[*SchedulingContextState](plugin.pluginState, req3.RequestId, plugins.StateKey(plugin.TypedName().String())) assert.NoError(t, err) @@ -387,6 +396,7 @@ func BenchmarkPrefixPluginStress(b *testing.B) { } b.ResetTimer() + plugin.PrepareRequestData(context.Background(), req, pods) // Benchmark the scoring operation scores := plugin.Score(context.Background(), types.NewCycleState(), req, pods) _ = scores // Use the result to prevent optimization @@ -468,8 +478,9 @@ func BenchmarkPrefixPluginChatCompletionsStress(b *testing.B) { } b.ResetTimer() - for i := 0; i < b.N; i++ { + for b.Loop() { // Benchmark the scoring operation + plugin.PrepareRequestData(context.Background(), req, pods) scores := plugin.Score(context.Background(), types.NewCycleState(), req, pods) _ = scores // Use the result to prevent optimization From acd9db0bc0e3a9da713d3e8372560eb921ed3e11 Mon Sep 17 00:00:00 2001 From: Rahul Gurnani Date: Tue, 11 Nov 2025 20:06:00 +0000 Subject: [PATCH 11/12] Add back stashed changes. Update outdated comments. --- pkg/epp/requestcontrol/director.go | 1 - pkg/epp/requestcontrol/director_test.go | 23 +++++++++++++++++++++++ 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/pkg/epp/requestcontrol/director.go b/pkg/epp/requestcontrol/director.go index aaf7185b0..93c89e542 100644 --- a/pkg/epp/requestcontrol/director.go +++ b/pkg/epp/requestcontrol/director.go @@ -173,7 +173,6 @@ func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestCo snapshotOfCandidatePods := d.toSchedulerPodMetrics(candidatePods) // Prepare per request data by running PrepareData plugins. - // NOTE: Failure in prepare data plugins does not block the request processing. if d.runPrepareDataPlugins(ctx, reqCtx.SchedulingRequest, snapshotOfCandidatePods) != nil { return reqCtx, errutil.Error{Code: errutil.Internal, Msg: "failed to prepare request data"} } diff --git a/pkg/epp/requestcontrol/director_test.go b/pkg/epp/requestcontrol/director_test.go index 8fcc0d5ff..e705beefd 100644 --- a/pkg/epp/requestcontrol/director_test.go +++ b/pkg/epp/requestcontrol/director_test.go @@ -106,6 +106,29 @@ func (ds *mockDatastore) PodList(predicate func(backendmetrics.PodMetrics) bool) return res } +type mockPrepareDataPlugin struct { + name string + produces map[string]any + consumes map[string]any +} + +func (m *mockPrepareDataPlugin) TypedName() plugins.TypedName { + return plugins.TypedName{Name: m.name, Type: "mock"} +} + +func (m *mockPrepareDataPlugin) Produces() map[string]any { + return m.produces +} + +func (m *mockPrepareDataPlugin) Consumes() map[string]any { + return m.consumes +} + +func (m *mockPrepareDataPlugin) PrepareRequestData(ctx context.Context, request *schedulingtypes.LLMRequest, pods []schedulingtypes.Pod) error { + pods[0].Put(mockProducedDataKey, mockProducedDataType{value: 42}) + return nil +} + func newMockPrepareDataPlugin(name string) *mockPrepareDataPlugin { return &mockPrepareDataPlugin{ name: name, From 471c37e453a22a5365866889afc880ba334015a5 Mon Sep 17 00:00:00 2001 From: Rahul Gurnani Date: Wed, 12 Nov 2025 01:14:28 +0000 Subject: [PATCH 12/12] Update function names and remove extra methods. Also don't fail request if prepare data call fails --- pkg/epp/requestcontrol/director.go | 30 +++++++++++------------------- 1 file changed, 11 insertions(+), 19 deletions(-) diff --git a/pkg/epp/requestcontrol/director.go b/pkg/epp/requestcontrol/director.go index 93c89e542..e892eaf08 100644 --- a/pkg/epp/requestcontrol/director.go +++ b/pkg/epp/requestcontrol/director.go @@ -174,11 +174,12 @@ func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestCo // Prepare per request data by running PrepareData plugins. if d.runPrepareDataPlugins(ctx, reqCtx.SchedulingRequest, snapshotOfCandidatePods) != nil { - return reqCtx, errutil.Error{Code: errutil.Internal, Msg: "failed to prepare request data"} + // Don't fail the request if PrepareData plugins fail. + logger.V(logutil.DEFAULT).Error(err, "failed to prepare per request data") } // Run admit request plugins - if !d.withAdmissionPlugins(ctx, reqCtx.SchedulingRequest, snapshotOfCandidatePods) { + if !d.runAdmissionPlugins(ctx, reqCtx.SchedulingRequest, snapshotOfCandidatePods) { logger.V(logutil.DEFAULT).Info("Request cannot be admitted") return reqCtx, errutil.Error{Code: errutil.Internal, Msg: "request cannot be admitted"} } @@ -352,18 +353,6 @@ func (d *Director) runPreRequestPlugins(ctx context.Context, request *scheduling } } -// executePlugins executes PrepareDataPlugins sequentially. -// TODO: Change to DAG execution in the following PRs. -func (d *Director) executePlugins(ctx context.Context, request *schedulingtypes.LLMRequest, pods []schedulingtypes.Pod, plugins []PrepareDataPlugin) error { - for _, plugin := range plugins { - err := prepareDataWithRetriesAndTimeout(plugin, ctx, request, pods) - if err != nil { - return err - } - } - return nil -} - // prepareDataWithRetriesAndTimeout executes the PrepareRequestData plugins with retries and timeout. func prepareDataWithRetriesAndTimeout(plugin PrepareDataPlugin, ctx context.Context, request *schedulingtypes.LLMRequest, pods []schedulingtypes.Pod) error { currentTimeout := prepareDataTimeout @@ -392,18 +381,21 @@ func prepareDataWithRetriesAndTimeout(plugin PrepareDataPlugin, ctx context.Cont return nil } +// TODO: Execute plugins in parallel once DAG execution is supported. +// runPrepareDataPlugins executes PrepareDataPlugins sequentially. func (d *Director) runPrepareDataPlugins(ctx context.Context, request *schedulingtypes.LLMRequest, pods []schedulingtypes.Pod) error { - err := d.executePlugins(ctx, request, pods, d.requestControlPlugins.prepareDataPlugins) - if err != nil { - log.FromContext(ctx).Error(err, "failed to execute PrepareData plugins as DAG, falling back to parallel execution") - return err + for _, plugin := range d.requestControlPlugins.prepareDataPlugins { + err := prepareDataWithRetriesAndTimeout(plugin, ctx, request, pods) + if err != nil { + return err + } } return nil } -func (d *Director) withAdmissionPlugins(ctx context.Context, +func (d *Director) runAdmissionPlugins(ctx context.Context, request *schedulingtypes.LLMRequest, pods []schedulingtypes.Pod) bool { loggerDebug := log.FromContext(ctx).V(logutil.DEBUG) for _, plugin := range d.requestControlPlugins.admissionPlugins {