Skip to content

Commit dfea9cf

Browse files
committed
Add AdmitRequest and PrepareData plugins
1 parent 9cfe65f commit dfea9cf

File tree

5 files changed

+139
-1
lines changed

5 files changed

+139
-1
lines changed

pkg/epp/datalayer/attributemap.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ import (
2020
"sync"
2121
)
2222

23+
// TODO(rahulgurnani): Deprecate this datalayer AttributeMap in favor of the AttributeMap added to the scheduling layer (pkg/epp/scheduling/types).
24+
2325
// Cloneable types support cloning of the value.
2426
type Cloneable interface {
2527
Clone() Cloneable

pkg/epp/requestcontrol/director.go

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import (
3636
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/handlers"
3737
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metadata"
3838
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
39+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
3940
schedulingtypes "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
4041
errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/error"
4142
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
@@ -167,8 +168,18 @@ func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestCo
167168
logger.V(logutil.DEFAULT).Info("Request rejected by admission control", "error", err)
168169
return reqCtx, err
169170
}
171+
copyOfCandidatePods := d.toSchedulerPodMetrics(candidatePods)
170172

171-
result, err := d.scheduler.Schedule(ctx, reqCtx.SchedulingRequest, d.toSchedulerPodMetrics(candidatePods))
173+
// Prepare per request data
174+
d.runPrepareDataPlugins(ctx, reqCtx.SchedulingRequest, copyOfCandidatePods)
175+
176+
// Run admit request plugins
177+
if !d.runAdmitRequestPlugins(ctx, reqCtx.SchedulingRequest, copyOfCandidatePods) {
178+
logger.V(logutil.DEFAULT).Info("Request cannot be admitted")
179+
return reqCtx, errutil.Error{Code: errutil.Internal, Msg: "request cannot be admitted"}
180+
}
181+
182+
result, err := d.scheduler.Schedule(ctx, reqCtx.SchedulingRequest, copyOfCandidatePods)
172183
if err != nil {
173184
return reqCtx, errutil.Error{Code: errutil.InferencePoolResourceExhausted, Msg: fmt.Errorf("failed to find target pod: %w", err).Error()}
174185
}
@@ -333,6 +344,29 @@ func (d *Director) runPreRequestPlugins(ctx context.Context, request *scheduling
333344
}
334345
}
335346

347+
func (d *Director) runPrepareDataPlugins(ctx context.Context,
348+
request *schedulingtypes.LLMRequest, pods []types.Pod) {
349+
loggerDebug := log.FromContext(ctx).V(logutil.DEBUG)
350+
for _, plugin := range d.requestControlPlugins.prepareDataPlugins {
351+
loggerDebug.Info("Running PrepareData plugin", "plugin", plugin.TypedName())
352+
plugin.PrepareData(ctx, request, pods)
353+
loggerDebug.Info("Completed running PrepareData plugin successfully", "plugin", plugin.TypedName())
354+
}
355+
}
356+
357+
func (d *Director) runAdmitRequestPlugins(ctx context.Context,
358+
request *schedulingtypes.LLMRequest, pods []types.Pod) bool {
359+
loggerDebug := log.FromContext(ctx).V(logutil.DEBUG)
360+
for _, plugin := range d.requestControlPlugins.admitRequestPlugins {
361+
loggerDebug.Info("Running AdmitRequest plugin", "plugin", plugin.TypedName())
362+
if !plugin.Admit(ctx, request, pods) {
363+
return false
364+
}
365+
loggerDebug.Info("Completed running AdmitRequest plugin successfully", "plugin", plugin.TypedName())
366+
}
367+
return true
368+
}
369+
336370
func (d *Director) runResponseReceivedPlugins(ctx context.Context, request *schedulingtypes.LLMRequest, response *Response, targetPod *backend.Pod) {
337371
loggerDebug := log.FromContext(ctx).V(logutil.DEBUG)
338372
for _, plugin := range d.requestControlPlugins.responseReceivedPlugins {

pkg/epp/requestcontrol/plugins.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,3 +57,15 @@ type ResponseComplete interface {
5757
plugins.Plugin
5858
ResponseComplete(ctx context.Context, request *types.LLMRequest, response *Response, targetPod *backend.Pod)
5959
}
60+
61+
// PrepareData is called by the director before scheduling requests.
// The director invokes PrepareData on every configured plugin with the
// request and the candidate pods, ahead of the AdmitRequest plugins.
// The method has no return value, so a plugin cannot reject a request here.
62+
type PrepareData interface {
63+
plugins.Plugin
64+
PrepareData(ctx context.Context, request *types.LLMRequest, pods []types.Pod)
65+
}
66+
67+
// AdmitRequest is called by the director after the PrepareData plugins and before scheduling.
// Admit returns true to let the request continue. The director stops at the
// first plugin that returns false and rejects the request without scheduling it.
68+
type AdmitRequest interface {
69+
plugins.Plugin
70+
Admit(ctx context.Context, request *types.LLMRequest, pods []types.Pod) bool
71+
}

pkg/epp/requestcontrol/request_control_config.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ import (
2323
// NewConfig creates a new Config object and returns its pointer.
2424
func NewConfig() *Config {
2525
return &Config{
26+
admitRequestPlugins: []AdmitRequest{},
27+
prepareDataPlugins: []PrepareData{},
2628
preRequestPlugins: []PreRequest{},
2729
responseReceivedPlugins: []ResponseReceived{},
2830
responseStreamingPlugins: []ResponseStreaming{},
@@ -32,6 +34,8 @@ func NewConfig() *Config {
3234

3335
// Config provides a configuration for the requestcontrol plugins.
3436
type Config struct {
37+
admitRequestPlugins []AdmitRequest
38+
prepareDataPlugins []PrepareData
3539
preRequestPlugins []PreRequest
3640
responseReceivedPlugins []ResponseReceived
3741
responseStreamingPlugins []ResponseStreaming

pkg/epp/scheduling/types/types.go

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"errors"
2222
"fmt"
2323
"strings"
24+
"sync"
2425

2526
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend"
2627
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
@@ -191,6 +192,9 @@ type Pod interface {
191192
GetPod() *backend.Pod
192193
GetMetrics() *backendmetrics.MetricsState
193194
String() string
195+
Put(key string, value Cloneable)
196+
Get(key string) (Cloneable, bool)
197+
Keys() []string
194198
}
195199

196200
type ScoredPod struct {
@@ -217,6 +221,19 @@ func (pm *PodMetrics) GetMetrics() *backendmetrics.MetricsState {
217221
// PodMetrics bundles a pod, its metrics state, and a per-pod attribute store.
// All three members are embedded.
type PodMetrics struct {
218222
*backend.Pod
219223
*backendmetrics.MetricsState
224+
// AttributeMap is an interface value, so it is nil until a concrete
// implementation (for example the one returned by NewAttributes) is assigned.
AttributeMap
225+
}
226+
227+
func (pm *PodMetrics) Put(key string, value Cloneable) {
228+
pm.AttributeMap.Put(key, value)
229+
}
230+
231+
func (pm *PodMetrics) Get(key string) (Cloneable, bool) {
232+
return pm.AttributeMap.Get(key)
233+
}
234+
235+
func (pm *PodMetrics) Keys() []string {
236+
return pm.AttributeMap.Keys()
220237
}
221238

222239
// ProfileRunResult captures the profile run result.
@@ -229,3 +246,72 @@ type SchedulingResult struct {
229246
ProfileResults map[string]*ProfileRunResult
230247
PrimaryProfileName string
231248
}
249+
250+
// Cloneable types support cloning of the value.
type Cloneable interface {
	// Clone returns a copy of the receiver. Attributes relies on the copy
	// being independent of the original to isolate readers from the store.
	Clone() Cloneable
}

// AttributeMap is used to store flexible metadata or traits
// across different aspects of an inference server.
// Stored values must be Cloneable.
type AttributeMap interface {
	// Put adds or updates the value stored under the given key.
	Put(string, Cloneable)
	// Get returns the value stored under the given key and whether it exists.
	Get(string) (Cloneable, bool)
	// Keys returns the names of all stored attributes.
	Keys() []string
}

// Compile-time check that *Attributes implements AttributeMap.
var _ AttributeMap = (*Attributes)(nil)

// Attributes provides a goroutine-safe implementation of AttributeMap.
// The zero value is an empty map that is ready to use.
type Attributes struct {
	data sync.Map // key: attribute name (string), value: attribute value (opaque, Cloneable)
}

// NewAttributes returns a new instance of Attributes.
func NewAttributes() *Attributes {
	return &Attributes{}
}

// Put adds or updates an attribute in the map.
// A nil value is ignored, leaving any existing entry for key untouched.
// The value is stored as given, so the caller keeps a reference to the stored
// object.
func (a *Attributes) Put(key string, value Cloneable) {
	if value != nil {
		a.data.Store(key, value) // TODO: Clone into map to ensure isolation
	}
}

// Get retrieves an attribute by key, returning a cloned copy.
// The boolean is false when the key is absent or the stored value is not
// Cloneable.
func (a *Attributes) Get(key string) (Cloneable, bool) {
	val, ok := a.data.Load(key)
	if !ok {
		return nil, false
	}
	if cloneable, ok := val.(Cloneable); ok {
		return cloneable.Clone(), true
	}
	return nil, false
}

// Keys returns all keys in the attribute map.
// The order is unspecified; the result is nil when the map is empty.
func (a *Attributes) Keys() []string {
	var keys []string
	a.data.Range(func(key, _ any) bool {
		if sk, ok := key.(string); ok {
			keys = append(keys, sk)
		}
		return true
	})
	return keys
}

// Clone creates a deep copy of the entire attribute map.
// Every value is cloned before it is stored in the copy, so later changes to
// objects held by the original are not visible through the returned map.
func (a *Attributes) Clone() *Attributes {
	clone := NewAttributes()
	a.data.Range(func(key, value any) bool {
		sk, ok := key.(string)
		if !ok {
			return true
		}
		if v, ok := value.(Cloneable); ok {
			// Put does not clone what it is given, so clone here; otherwise
			// the copy would share its values with the original.
			clone.Put(sk, v.Clone())
		}
		return true
	})
	return clone
}

0 commit comments

Comments
 (0)