Skip to content

Commit f28020a

Browse files
committed
Implement error propagation for prepare data plugins. Also add the retry and timeout logic back, since it was removed in the previous commit.
1 parent c8b6ba5 commit f28020a

File tree

3 files changed

+51
-9
lines changed

3 files changed

+51
-9
lines changed

pkg/epp/requestcontrol/director.go

Lines changed: 48 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -366,27 +366,68 @@ func (d *Director) executePluginsAsDAG(ctx context.Context, request *schedulingt
366366
// Execute the DAG
367367

368368
// Channels to signal plugin execution completion.
369-
pluginExecuted := map[string]chan struct{}{}
369+
pluginExecuted := make(map[string]chan error)
370370
nameToNode := map[string]PrepareDataPlugin{}
371371
for _, plugin := range plugins {
372-
pluginExecuted[plugin.TypedName().String()] = make(chan struct{})
372+
pluginExecuted[plugin.TypedName().String()] = make(chan error)
373373
nameToNode[plugin.TypedName().String()] = plugin
374374
}
375375

376376
for pluginName, dependents := range dag {
377377
// Execute plugins based on dependencies.
378378
// Wait for the dependencies to complete before executing a plugin.
379-
go func() {
379+
go func() error {
380380
for _, dep := range dependents {
381-
<-pluginExecuted[dep]
381+
err, open := <-pluginExecuted[dep]
382+
if !open {
383+
continue
384+
}
385+
if err != nil {
386+
// If a dependency failed, propagate the error and do not execute this plugin.
387+
pluginExecuted[pluginName] <- fmt.Errorf("dependency plugin %s failed: %w", dep, err)
388+
return err
389+
}
382390
}
383-
nameToNode[pluginName].PrepareRequestData(ctx, request, pods)
384391
// Signal that the plugin has been executed.
385-
close(pluginExecuted[pluginName])
392+
defer close(pluginExecuted[pluginName])
393+
394+
return prepareDataWithRetriesAndTimeout(nameToNode[pluginName], ctx, request, pods)
386395
}()
387396
}
388397
for pluginName := range dag {
389-
<-pluginExecuted[pluginName]
398+
err, open := <-pluginExecuted[pluginName]
399+
if !open {
400+
continue
401+
}
402+
if err != nil {
403+
return err
404+
}
405+
}
406+
return nil
407+
}
408+
409+
// prepareDataWithRetriesAndTimeout executes the PrepareRequestData plugins with retries and timeout.
410+
func prepareDataWithRetriesAndTimeout(plugin PrepareDataPlugin, ctx context.Context, request *schedulingtypes.LLMRequest, pods []schedulingtypes.Pod) error {
411+
currentTimeout := prepareDataTimeout
412+
for i := 0; i <= prepareDataMaxRetries; i++ {
413+
errCh := make(chan error, 1)
414+
go func() {
415+
errCh <- plugin.PrepareRequestData(ctx, request, pods)
416+
}()
417+
418+
select {
419+
case err := <-errCh:
420+
if err != nil {
421+
log.FromContext(ctx).V(logutil.DEBUG).Info("PrepareData plugin failed, retrying...", "plugin", plugin.TypedName(), "retry", i+1, "error", err)
422+
continue
423+
}
424+
return nil // Success
425+
case <-time.After(currentTimeout):
426+
log.FromContext(ctx).V(logutil.DEBUG).Info("PrepareData plugin timed out, retrying...", "plugin", plugin.TypedName(), "retry", i+1, "timeout", currentTimeout)
427+
if i == prepareDataMaxRetries {
428+
return fmt.Errorf("PrepareData plugin %s failed after %d retries", plugin.TypedName().String(), prepareDataMaxRetries)
429+
}
430+
}
390431
}
391432
return nil
392433
}

pkg/epp/requestcontrol/graph_util_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,9 @@ func (m *mockPrepareDataPlugin) Consumes() map[string]any {
4444
return m.consumes
4545
}
4646

47-
func (m *mockPrepareDataPlugin) PrepareRequestData(ctx context.Context, request *types.LLMRequest, pods []types.Pod) {
47+
func (m *mockPrepareDataPlugin) PrepareRequestData(ctx context.Context, request *types.LLMRequest, pods []types.Pod) error {
4848
pods[0].Put(mockProducedDataKey, mockProducedDataType{value: 42})
49+
return nil
4950
}
5051

5152
func TestPrepareDataGraph(t *testing.T) {

pkg/epp/requestcontrol/plugins.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ type ResponseComplete interface {
6363
type PrepareDataPlugin interface {
6464
plugins.ProducerPlugin
6565
plugins.ConsumerPlugin
66-
PrepareRequestData(ctx context.Context, request *types.LLMRequest, pods []types.Pod)
66+
PrepareRequestData(ctx context.Context, request *types.LLMRequest, pods []types.Pod) error
6767
}
6868

6969
// AdmissionPlugin is called by the director after the prepare data phase and before scheduling.

0 commit comments

Comments
 (0)