@@ -366,10 +366,10 @@ func (d *Director) executePluginsAsDAG(ctx context.Context, request *schedulingt
366366 // Execute the DAG
367367
368368 // Channels to signal plugin execution completion.
369- pluginExecuted := map [string ]chan struct {}{}
369+ pluginExecuted := make ( map [string ]chan error )
370370 nameToNode := map [string ]PrepareDataPlugin {}
371371 for _ , plugin := range plugins {
372- pluginExecuted [plugin .TypedName ().String ()] = make (chan struct {} )
372+ pluginExecuted [plugin .TypedName ().String ()] = make (chan error )
373373 nameToNode [plugin .TypedName ().String ()] = plugin
374374 }
375375
@@ -378,15 +378,52 @@ func (d *Director) executePluginsAsDAG(ctx context.Context, request *schedulingt
378378 // Wait for the dependencies to complete before executing a plugin.
379379 go func () {
380380 for _ , dep := range dependents {
381- <- pluginExecuted [dep ]
381+ err , open := <- pluginExecuted [dep ]
382+ if ! open {
383+ continue
384+ }
385+ if err != nil {
386+ // If a dependency failed, propagate the error and do not execute this plugin.
387+ pluginExecuted [pluginName ] <- fmt .Errorf ("dependency plugin %s failed: %w" , dep , err )
388+ }
382389 }
383- nameToNode [pluginName ].PrepareRequestData (ctx , request , pods )
384390 // Signal that the plugin has been executed.
385- close (pluginExecuted [pluginName ])
391+ defer close (pluginExecuted [pluginName ])
392+
393+ pluginExecuted [pluginName ] <- prepareDataWithRetriesAndTimeout (nameToNode [pluginName ], ctx , request , pods )
386394 }()
387395 }
388396 for pluginName := range dag {
389- <- pluginExecuted [pluginName ]
397+ err := <- pluginExecuted [pluginName ]
398+ if err != nil {
399+ return err
400+ }
401+ }
402+ return nil
403+ }
404+
405+ // prepareDataWithRetriesAndTimeout executes the PrepareRequestData plugins with retries and timeout.
406+ func prepareDataWithRetriesAndTimeout (plugin PrepareDataPlugin , ctx context.Context , request * schedulingtypes.LLMRequest , pods []schedulingtypes.Pod ) error {
407+ currentTimeout := prepareDataTimeout
408+ for i := 0 ; i <= prepareDataMaxRetries ; i ++ {
409+ errCh := make (chan error , 1 )
410+ go func () {
411+ errCh <- plugin .PrepareRequestData (ctx , request , pods )
412+ }()
413+
414+ select {
415+ case err := <- errCh :
416+ if err != nil {
417+ log .FromContext (ctx ).V (logutil .DEBUG ).Info ("PrepareData plugin failed, retrying..." , "plugin" , plugin .TypedName (), "retry" , i + 1 , "error" , err )
418+ continue
419+ }
420+ return nil // Success
421+ case <- time .After (currentTimeout ):
422+ log .FromContext (ctx ).V (logutil .DEBUG ).Info ("PrepareData plugin timed out, retrying..." , "plugin" , plugin .TypedName (), "retry" , i + 1 , "timeout" , currentTimeout )
423+ if i == prepareDataMaxRetries {
424+ return fmt .Errorf ("PrepareData plugin %s failed after %d retries" , plugin .TypedName ().String (), prepareDataMaxRetries )
425+ }
426+ }
390427 }
391428 return nil
392429}
0 commit comments