Skip to content

Commit c8b6ba5

Browse files
committed
Cleanup dead code, ensure all preparedata plugins execute
1 parent 1be7468 commit c8b6ba5

File tree

2 files changed

+8
-51
lines changed

2 files changed

+8
-51
lines changed

pkg/epp/requestcontrol/director.go

Lines changed: 8 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ import (
2424
"math/rand"
2525
"net"
2626
"strings"
27-
"sync"
2827
"time"
2928

3029
"sigs.k8s.io/controller-runtime/pkg/log"
@@ -354,30 +353,6 @@ func (d *Director) runPreRequestPlugins(ctx context.Context, request *scheduling
354353
}
355354
}
356355

357-
// prepareData executes the PrepareRequestData plugins with retries and timeout.
358-
func prepareData(plugin PrepareDataPlugin, ctx context.Context, request *schedulingtypes.LLMRequest, pods []schedulingtypes.Pod) {
359-
currentTimeout := prepareDataTimeout
360-
for i := 0; i <= prepareDataMaxRetries; i++ {
361-
done := make(chan struct{})
362-
go func() {
363-
defer close(done)
364-
plugin.PrepareRequestData(ctx, request, pods)
365-
}()
366-
367-
select {
368-
case <-done:
369-
// Plugin executed successfully
370-
return
371-
case <-time.After(currentTimeout):
372-
log.FromContext(ctx).V(logutil.DEBUG).Info("PrepareData plugin timed out, retrying...", "plugin", plugin.TypedName(), "retry", i+1, "timeout", currentTimeout)
373-
if i == prepareDataMaxRetries {
374-
log.FromContext(ctx).Error(nil, "PrepareData plugin failed after multiple retries", "plugin", plugin.TypedName())
375-
return
376-
}
377-
}
378-
}
379-
}
380-
381356
// executePluginsAsDAG executes PrepareData plugins as a DAG based on their dependencies asynchronously.
382357
// So, a plugin is executed only after all its dependencies have been executed.
383358
// If there is a cycle or other error in the DAG, it returns an error.
@@ -388,9 +363,9 @@ func (d *Director) executePluginsAsDAG(ctx context.Context, request *schedulingt
388363
if err != nil {
389364
return err
390365
}
391-
392366
// Execute the DAG
393-
// Initialize channels and nameToNode map
367+
368+
// Channels to signal plugin execution completion.
394369
pluginExecuted := map[string]chan struct{}{}
395370
nameToNode := map[string]PrepareDataPlugin{}
396371
for _, plugin := range plugins {
@@ -407,33 +382,25 @@ func (d *Director) executePluginsAsDAG(ctx context.Context, request *schedulingt
407382
}
408383
nameToNode[pluginName].PrepareRequestData(ctx, request, pods)
409384
// Signal that the plugin has been executed.
410-
<-pluginExecuted[pluginName]
385+
close(pluginExecuted[pluginName])
411386
}()
412387
}
388+
for pluginName := range dag {
389+
<-pluginExecuted[pluginName]
390+
}
413391
return nil
414392
}
415393

416394
func (d *Director) runPrepareDataPlugins(ctx context.Context,
417395
request *schedulingtypes.LLMRequest, pods []schedulingtypes.Pod) error {
396+
// Parallelly execute PrepareData for all the plugins. Some plugins might take time to prepare data e.g. latency predictor.
397+
// Failure in any prepareData doesn't block the request processing.
418398
err := d.executePluginsAsDAG(ctx, request, pods, d.requestControlPlugins.prepareDataPlugins)
419399
if err != nil {
420400
log.FromContext(ctx).Error(err, "failed to execute PrepareData plugins as DAG, falling back to parallel execution")
421401
return err
422402
}
423-
loggerDebug := log.FromContext(ctx).V(logutil.DEBUG)
424-
// Parallelly execute PrepareData for all the plugins. Some plugins might take time to prepare data e.g. latency predictor.
425-
// Failure in any prepareData doesn't block the request processing.
426-
var wg sync.WaitGroup
427403

428-
for _, plugin := range d.requestControlPlugins.prepareDataPlugins {
429-
loggerDebug.Info("Running PrepareData plugin", "plugin", plugin.TypedName())
430-
wg.Add(1)
431-
go func(p PrepareDataPlugin) {
432-
defer wg.Done()
433-
prepareData(p, ctx, request, pods)
434-
}(plugin)
435-
}
436-
wg.Wait()
437404
return nil
438405
}
439406

pkg/epp/requestcontrol/graph_util.go

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -53,16 +53,6 @@ func buildDAG(plugins []PrepareDataPlugin) map[string][]string {
5353
// prepareDataGraph builds a DAG of data preparation plugins and checks for cycles.
5454
// If there is a cycle, it returns an error.
5555
func prepareDataGraph(plugins []PrepareDataPlugin) (map[string][]string, error) {
56-
nameToNode := map[string]PrepareDataPlugin{}
57-
58-
for _, node := range nameToNode {
59-
nameToNode[node.TypedName().String()] = node
60-
}
61-
// Channels to signal plugin execution completion.
62-
pluginExecuted := map[string]chan struct{}{}
63-
for _, plugin := range plugins {
64-
pluginExecuted[plugin.TypedName().String()] = make(chan struct{})
65-
}
6656
dag := buildDAG(plugins)
6757

6858
// Check for cycles in the DAG.

0 commit comments

Comments
 (0)