Skip to content

Commit 13b8c32

Browse files
committed
Execute prepare data plugins sequentially with retries and timeout. Also added more tests and some refactoring
1 parent 4bbfcbe commit 13b8c32

File tree

6 files changed

+90
-83
lines changed

6 files changed

+90
-83
lines changed

pkg/epp/backend/metrics/types.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,4 @@ func (f *PodMetricsFactory) ReleaseEndpoint(ep PodMetrics) {
7575
}
7676
}
7777

78-
func (f *PodMetricsFactory) GetAttributes() *datalayer.Attributes {
79-
return datalayer.NewAttributes()
80-
}
81-
8278
type PodMetrics = datalayer.Endpoint

pkg/epp/requestcontrol/director.go

Lines changed: 36 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ import (
2424
"math/rand"
2525
"net"
2626
"strings"
27-
"sync"
2827
"time"
2928

3029
"sigs.k8s.io/controller-runtime/pkg/log"
@@ -175,7 +174,9 @@ func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestCo
175174

176175
// Prepare per request data by running PrepareData plugins.
177176
// NOTE: A failure in any PrepareData plugin aborts request processing with an Internal error.
178-
d.runPrepareDataPlugins(ctx, reqCtx.SchedulingRequest, snapshotOfCandidatePods)
177+
if d.runPrepareDataPlugins(ctx, reqCtx.SchedulingRequest, snapshotOfCandidatePods) != nil {
178+
return reqCtx, errutil.Error{Code: errutil.Internal, Msg: "failed to prepare request data"}
179+
}
179180

180181
// Run admit request plugins
181182
if !d.withAdmissionPlugins(ctx, reqCtx.SchedulingRequest, snapshotOfCandidatePods) {
@@ -352,45 +353,57 @@ func (d *Director) runPreRequestPlugins(ctx context.Context, request *scheduling
352353
}
353354
}
354355

355-
// prepareData executes the PrepareRequestData plugins with retries and timeout.
356-
func prepareData(plugin DataProducer, ctx context.Context, request *schedulingtypes.LLMRequest, pods []schedulingtypes.Pod) {
356+
// executePlugins executes PrepareDataPlugins sequentially.
357+
// TODO: Change to DAG execution in the following PRs.
358+
func (d *Director) executePlugins(ctx context.Context, request *schedulingtypes.LLMRequest, pods []schedulingtypes.Pod, plugins []PrepareDataPlugin) error {
359+
for _, plugin := range plugins {
360+
err := prepareDataWithRetriesAndTimeout(plugin, ctx, request, pods)
361+
if err != nil {
362+
return err
363+
}
364+
}
365+
return nil
366+
}
367+
368+
// prepareDataWithRetriesAndTimeout executes a single plugin's PrepareRequestData with retries and timeout.
369+
func prepareDataWithRetriesAndTimeout(plugin PrepareDataPlugin, ctx context.Context, request *schedulingtypes.LLMRequest, pods []schedulingtypes.Pod) error {
357370
currentTimeout := prepareDataTimeout
358371
for i := 0; i <= prepareDataMaxRetries; i++ {
359-
done := make(chan struct{})
372+
errCh := make(chan error, 1)
360373
go func() {
361-
defer close(done)
362-
plugin.PrepareRequestData(ctx, request, pods)
374+
errCh <- plugin.PrepareRequestData(ctx, request, pods)
363375
}()
364376

365377
select {
366-
case <-done:
367-
// Plugin executed successfully
368-
return
378+
case <-ctx.Done():
379+
return ctx.Err()
380+
case err := <-errCh:
381+
if err != nil {
382+
log.FromContext(ctx).V(logutil.DEBUG).Info("PrepareData plugin failed, retrying...", "plugin", plugin.TypedName(), "retry", i+1, "error", err)
383+
continue
384+
}
385+
return nil // Success
369386
case <-time.After(currentTimeout):
370387
log.FromContext(ctx).V(logutil.DEBUG).Info("PrepareData plugin timed out, retrying...", "plugin", plugin.TypedName(), "retry", i+1, "timeout", currentTimeout)
371388
if i == prepareDataMaxRetries {
372-
log.FromContext(ctx).Error(nil, "PrepareData plugin failed after multiple retries", "plugin", plugin.TypedName())
373-
return
389+
return fmt.Errorf("PrepareData plugin %s failed after %d retries", plugin.TypedName().String(), prepareDataMaxRetries)
374390
}
375391
}
376392
}
393+
return nil
377394
}
378395

379396
func (d *Director) runPrepareDataPlugins(ctx context.Context,
380-
request *schedulingtypes.LLMRequest, pods []schedulingtypes.Pod) {
381-
loggerDebug := log.FromContext(ctx).V(logutil.DEBUG)
397+
request *schedulingtypes.LLMRequest, pods []schedulingtypes.Pod) error {
382398
// Execute PrepareData for all the plugins sequentially. Some plugins might take time to prepare data, e.g. the latency predictor.
383399
// A failure in any PrepareData plugin is logged and returned to the caller.
384-
var wg sync.WaitGroup
385-
for _, plugin := range d.requestControlPlugins.dataProducerPlugins {
386-
loggerDebug.Info("Running PrepareData plugin", "plugin", plugin.TypedName())
387-
wg.Add(1)
388-
go func(p DataProducer) {
389-
defer wg.Done()
390-
prepareData(p, ctx, request, pods)
391-
}(plugin)
400+
err := d.executePlugins(ctx, request, pods, d.requestControlPlugins.prepareDataPlugins)
401+
if err != nil {
402+
log.FromContext(ctx).Error(err, "failed to execute PrepareData plugins as DAG, falling back to parallel execution")
403+
return err
392404
}
393-
wg.Wait()
405+
406+
return nil
394407
}
395408

396409
func (d *Director) withAdmissionPlugins(ctx context.Context,

pkg/epp/requestcontrol/director_test.go

Lines changed: 42 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"context"
2121
"errors"
2222
"fmt"
23+
"maps"
2324
"testing"
2425
"time"
2526

@@ -105,38 +106,23 @@ func (ds *mockDatastore) PodList(predicate func(backendmetrics.PodMetrics) bool)
105106
return res
106107
}
107108

108-
type mockDataProducerPlugin struct {
109-
tn plugins.TypedName
110-
}
111-
112-
func newMockDataProducerPlugin(name string) *mockDataProducerPlugin {
113-
return &mockDataProducerPlugin{
114-
tn: plugins.TypedName{Type: "mock-prepare-request-data", Name: name},
109+
func newMockPrepareDataPlugin(name string) *mockPrepareDataPlugin {
110+
return &mockPrepareDataPlugin{
111+
name: name,
112+
produces: map[string]any{mockProducedDataKey: 0},
113+
consumes: map[string]any{},
115114
}
116115
}
117116

118-
func (m *mockDataProducerPlugin) TypedName() plugins.TypedName {
119-
return m.tn
120-
}
121-
122-
func (m *mockDataProducerPlugin) Produces() map[string]any {
123-
// Produces data of type int, 0 denotes it is int.
124-
return map[string]any{mockProducedDataKey: 0}
125-
}
126-
127-
func (m *mockDataProducerPlugin) PrepareRequestData(ctx context.Context, request *schedulingtypes.LLMRequest, pods []schedulingtypes.Pod) {
128-
pods[0].Put(mockProducedDataKey, mockProducedDataType{value: 42})
129-
}
130-
131117
type mockAdmissionPlugin struct {
132-
tn plugins.TypedName
133-
// TODO: Replace this with admission control.
134-
admitRequestCalled bool
118+
tn plugins.TypedName
119+
denialError error
135120
}
136121

137-
func newMockAdmissionPlugin(name string) *mockAdmissionPlugin {
122+
func newMockAdmissionPlugin(name string, denialError error) *mockAdmissionPlugin {
138123
return &mockAdmissionPlugin{
139-
tn: plugins.TypedName{Type: "mock-admit-data", Name: name},
124+
tn: plugins.TypedName{Type: "mock-admit-data", Name: name},
125+
denialError: denialError,
140126
}
141127
}
142128

@@ -145,8 +131,7 @@ func (m *mockAdmissionPlugin) TypedName() plugins.TypedName {
145131
}
146132

147133
func (m *mockAdmissionPlugin) AdmitRequest(ctx context.Context, request *schedulingtypes.LLMRequest, pods []schedulingtypes.Pod) error {
148-
m.admitRequestCalled = true
149-
return nil
134+
return m.denialError
150135
}
151136

152137
type mockProducedDataType struct {
@@ -279,9 +264,8 @@ func TestDirector_HandleRequest(t *testing.T) {
279264
wantReqCtx *handlers.RequestContext // Fields to check in the returned RequestContext
280265
wantMutatedBodyModel string // Expected model in reqCtx.Request.Body after PostDispatch
281266
targetModelName string // Expected model name after target model resolution
282-
admitRequestCalled bool
283-
dataProducerPlugin *mockDataProducerPlugin
284-
admissionPlugin *mockAdmissionPlugin
267+
admitRequestDenialError error // Expected denial error from admission plugin
268+
prepareDataPlugin *mockPrepareDataPlugin
285269
}{
286270
{
287271
name: "successful completions request",
@@ -364,7 +348,7 @@ func TestDirector_HandleRequest(t *testing.T) {
364348
},
365349
wantMutatedBodyModel: model,
366350
targetModelName: model,
367-
dataProducerPlugin: newMockDataProducerPlugin("test-plugin"),
351+
prepareDataPlugin: newMockPrepareDataPlugin("test-plugin"),
368352
},
369353
{
370354
name: "successful chat completions request with admit request plugins",
@@ -391,10 +375,29 @@ func TestDirector_HandleRequest(t *testing.T) {
391375
},
392376
TargetEndpoint: "192.168.1.100:8000,192.168.2.100:8000,192.168.4.100:8000",
393377
},
394-
wantMutatedBodyModel: model,
395-
targetModelName: model,
396-
admitRequestCalled: true,
397-
admissionPlugin: newMockAdmissionPlugin("test-plugin"),
378+
wantMutatedBodyModel: model,
379+
targetModelName: model,
380+
admitRequestDenialError: nil,
381+
},
382+
{
383+
name: "denied request by admit request plugin",
384+
reqBodyMap: map[string]any{
385+
"model": model,
386+
"messages": []any{
387+
map[string]any{
388+
"role": "user",
389+
"content": "critical prompt",
390+
},
391+
},
392+
},
393+
mockAdmissionController: &mockAdmissionController{admitErr: nil},
394+
schedulerMockSetup: func(m *mockScheduler) {
395+
m.scheduleResults = defaultSuccessfulScheduleResults
396+
},
397+
wantMutatedBodyModel: model,
398+
targetModelName: model,
399+
admitRequestDenialError: errors.New("denied by admit plugin"),
400+
wantErrCode: errutil.Internal,
398401
},
399402
{
400403
name: "successful chat completions request with multiple messages",
@@ -546,12 +549,10 @@ func TestDirector_HandleRequest(t *testing.T) {
546549
test.schedulerMockSetup(mockSched)
547550
}
548551
config := NewConfig()
549-
if test.dataProducerPlugin != nil {
550-
config = config.WithDataProducers(test.dataProducerPlugin)
551-
}
552-
if test.admissionPlugin != nil {
553-
config = config.WithAdmissionPlugins(test.admissionPlugin)
552+
if test.prepareDataPlugin != nil {
553+
config = config.WithPrepareDataPlugins(test.prepareDataPlugin)
554554
}
555+
config = config.WithAdmissionPlugins(newMockAdmissionPlugin("test-admit-plugin", test.admitRequestDenialError))
555556
director := NewDirectorWithConfig(ds, mockSched, test.mockAdmissionController, config)
556557

557558
reqCtx := &handlers.RequestContext{
@@ -566,9 +567,7 @@ func TestDirector_HandleRequest(t *testing.T) {
566567
TargetModelName: test.targetModelName,
567568
}
568569
// Shallow-copy the body map entries.
569-
for k, v := range test.reqBodyMap {
570-
reqCtx.Request.Body[k] = v
571-
}
570+
maps.Copy(reqCtx.Request.Body, test.reqBodyMap)
572571

573572
returnedReqCtx, err := director.HandleRequest(ctx, reqCtx)
574573

@@ -596,9 +595,6 @@ func TestDirector_HandleRequest(t *testing.T) {
596595
assert.Equal(t, test.wantMutatedBodyModel, returnedReqCtx.Request.Body["model"],
597596
"Mutated reqCtx.Request.Body model mismatch")
598597
}
599-
if test.admissionPlugin != nil {
600-
assert.True(t, test.admissionPlugin.admitRequestCalled, "AdmitRequest not called")
601-
}
602598
})
603599
}
604600
}

pkg/epp/requestcontrol/plugins.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,10 +59,11 @@ type ResponseComplete interface {
5959
}
6060

6161
// PrepareRequestData is called by the director before scheduling requests.
62-
// DataProducer plugin is implemented by data producers which produce data from different sources.
63-
type DataProducer interface {
62+
// PrepareDataPlugin is implemented by data producers which produce data from different sources.
63+
type PrepareDataPlugin interface {
6464
plugins.ProducerPlugin
65-
PrepareRequestData(ctx context.Context, request *types.LLMRequest, pods []types.Pod)
65+
plugins.ConsumerPlugin
66+
PrepareRequestData(ctx context.Context, request *types.LLMRequest, pods []types.Pod) error
6667
}
6768

6869
// AdmissionPlugin is called by the director after the prepare data phase and before scheduling.

pkg/epp/requestcontrol/request_control_config.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import (
2424
func NewConfig() *Config {
2525
return &Config{
2626
admissionPlugins: []AdmissionPlugin{},
27-
dataProducerPlugins: []DataProducer{},
27+
prepareDataPlugins: []PrepareDataPlugin{},
2828
preRequestPlugins: []PreRequest{},
2929
responseReceivedPlugins: []ResponseReceived{},
3030
responseStreamingPlugins: []ResponseStreaming{},
@@ -35,7 +35,7 @@ func NewConfig() *Config {
3535
// Config provides a configuration for the requestcontrol plugins.
3636
type Config struct {
3737
admissionPlugins []AdmissionPlugin
38-
dataProducerPlugins []DataProducer
38+
prepareDataPlugins []PrepareDataPlugin
3939
preRequestPlugins []PreRequest
4040
responseReceivedPlugins []ResponseReceived
4141
responseStreamingPlugins []ResponseStreaming
@@ -70,9 +70,9 @@ func (c *Config) WithResponseCompletePlugins(plugins ...ResponseComplete) *Confi
7070
return c
7171
}
7272

73-
// WithDataProducers sets the given plugins as the PrepareData plugins.
74-
func (c *Config) WithDataProducers(plugins ...DataProducer) *Config {
75-
c.dataProducerPlugins = plugins
73+
// WithPrepareDataPlugins sets the given plugins as the PrepareData plugins.
74+
func (c *Config) WithPrepareDataPlugins(plugins ...PrepareDataPlugin) *Config {
75+
c.prepareDataPlugins = plugins
7676
return c
7777
}
7878

@@ -85,7 +85,6 @@ func (c *Config) WithAdmissionPlugins(plugins ...AdmissionPlugin) *Config {
8585
// AddPlugins adds the given plugins to the Config.
8686
// The type of each plugin is checked and added to the corresponding list of plugins in the Config.
8787
// If a plugin implements multiple plugin interfaces, it will be added to each corresponding list.
88-
8988
func (c *Config) AddPlugins(pluginObjects ...plugins.Plugin) {
9089
for _, plugin := range pluginObjects {
9190
if preRequestPlugin, ok := plugin.(PreRequest); ok {
@@ -100,5 +99,8 @@ func (c *Config) AddPlugins(pluginObjects ...plugins.Plugin) {
10099
if responseCompletePlugin, ok := plugin.(ResponseComplete); ok {
101100
c.responseCompletePlugins = append(c.responseCompletePlugins, responseCompletePlugin)
102101
}
102+
if prepareDataPlugin, ok := plugin.(PrepareDataPlugin); ok {
103+
c.prepareDataPlugins = append(c.prepareDataPlugins, prepareDataPlugin)
104+
}
103105
}
104106
}

pkg/epp/scheduling/framework/plugins/multi/prefix/plugin.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,6 @@ func (p *Plugin) Score(ctx context.Context, cycleState *types.CycleState, reques
209209
log.FromContext(ctx).V(logutil.TRACE).Info("prefix cached state", "cached-servers", state.PrefixCacheServers, "hashes", state.PrefixHashes)
210210
// calculate the scores of pods
211211
scores := make(map[types.Pod]float64, len(pods))
212-
213212
total := len(state.PrefixHashes)
214213
podScoreFunc := func(pod types.Pod) float64 {
215214
if total == 0 {

0 commit comments

Comments
 (0)