@@ -60,27 +60,13 @@ type SaturationDetector interface {
6060 IsSaturated (ctx context.Context , candidatePods []backendmetrics.PodMetrics ) bool
6161}
6262
63- type RequestControlPlugins struct {
64- preRequestPlugins []PreRequest
65- responseReceivedPlugins []ResponseReceived
66- responseStreamingPlugins []ResponseStreaming
67- responseCompletePlugins []ResponseComplete
68- }
69-
7063// NewDirectorWithConfig creates a new Director instance with all dependencies.
7164func NewDirectorWithConfig (datastore Datastore , scheduler Scheduler , saturationDetector SaturationDetector , config * Config ) * Director {
72- RCPlugins := RequestControlPlugins {
73- preRequestPlugins : config .preRequestPlugins ,
74- responseReceivedPlugins : config .responseReceivedPlugins ,
75- responseStreamingPlugins : config .responseStreamingPlugins ,
76- responseCompletePlugins : config .responseCompletePlugins ,
77- }
78-
7965 return & Director {
8066 datastore : datastore ,
8167 scheduler : scheduler ,
8268 saturationDetector : saturationDetector ,
83- requestControlPlugins : RCPlugins ,
69+ requestControlPlugins : * config ,
8470 defaultPriority : 0 , // define default priority explicitly
8571 }
8672}
@@ -90,7 +76,7 @@ type Director struct {
9076 datastore Datastore
9177 scheduler Scheduler
9278 saturationDetector SaturationDetector
93- requestControlPlugins RequestControlPlugins
79+ requestControlPlugins Config
9480 // we just need a pointer to an int variable since priority is a pointer in InferenceObjective
9581 // no need to set this in the constructor, since the value we want is the default int val
9682 // and value types cannot be nil
@@ -291,7 +277,7 @@ func (d *Director) toSchedulerPodMetrics(pods []backendmetrics.PodMetrics) []sch
291277 return pm
292278}
293279
294- // HandleResponseReceived is called when the first chunk of the response arrives .
280+ // HandleResponseReceived is called when the response headers are received .
295281func (d * Director ) HandleResponseReceived (ctx context.Context , reqCtx * handlers.RequestContext ) (* handlers.RequestContext , error ) {
296282 response := & Response {
297283 RequestId : reqCtx .Request .Headers [requtil .RequestIdHeaderKey ],
@@ -347,22 +333,22 @@ func (d *Director) runPreRequestPlugins(ctx context.Context, request *scheduling
347333 schedulingResult * schedulingtypes.SchedulingResult , targetPort int ) {
348334 loggerDebug := log .FromContext (ctx ).V (logutil .DEBUG )
349335 for _ , plugin := range d .requestControlPlugins .preRequestPlugins {
350- loggerDebug .Info ("Running pre-request plugin" , "plugin" , plugin .TypedName ())
336+ loggerDebug .Info ("Running PreRequest plugin" , "plugin" , plugin .TypedName ())
351337 before := time .Now ()
352338 plugin .PreRequest (ctx , request , schedulingResult , targetPort )
353339 metrics .RecordPluginProcessingLatency (PreRequestExtensionPoint , plugin .TypedName ().Type , plugin .TypedName ().Name , time .Since (before ))
354- loggerDebug .Info ("Completed running pre-request plugin successfully" , "plugin" , plugin .TypedName ())
340+ loggerDebug .Info ("Completed running PreRequest plugin successfully" , "plugin" , plugin .TypedName ())
355341 }
356342}
357343
358344func (d * Director ) runResponseReceivedPlugins (ctx context.Context , request * schedulingtypes.LLMRequest , response * Response , targetPod * backend.Pod ) {
359345 loggerDebug := log .FromContext (ctx ).V (logutil .DEBUG )
360346 for _ , plugin := range d .requestControlPlugins .responseReceivedPlugins {
361- loggerDebug .Info ("Running post-response plugin" , "plugin" , plugin .TypedName ())
347+ loggerDebug .Info ("Running ResponseReceived plugin" , "plugin" , plugin .TypedName ())
362348 before := time .Now ()
363349 plugin .ResponseReceived (ctx , request , response , targetPod )
364350 metrics .RecordPluginProcessingLatency (ResponseReceivedExtensionPoint , plugin .TypedName ().Type , plugin .TypedName ().Name , time .Since (before ))
365- loggerDebug .Info ("Completed running post-response plugin successfully" , "plugin" , plugin .TypedName ())
351+ loggerDebug .Info ("Completed running ResponseReceived plugin successfully" , "plugin" , plugin .TypedName ())
366352 }
367353}
368354
@@ -373,16 +359,17 @@ func (d *Director) runResponseStreamingPlugins(ctx context.Context, request *sch
373359 before := time .Now ()
374360 plugin .ResponseStreaming (ctx , request , response , targetPod )
375361 metrics .RecordPluginProcessingLatency (ResponseStreamingExtensionPoint , plugin .TypedName ().Type , plugin .TypedName ().Name , time .Since (before ))
362+ loggerTrace .Info ("Completed running ResponseStreaming plugin successfully" , "plugin" , plugin .TypedName ())
376363 }
377364}
378365
379366func (d * Director ) runResponseCompletePlugins (ctx context.Context , request * schedulingtypes.LLMRequest , response * Response , targetPod * backend.Pod ) {
380367 loggerDebug := log .FromContext (ctx ).V (logutil .DEBUG )
381368 for _ , plugin := range d .requestControlPlugins .responseCompletePlugins {
382- loggerDebug .Info ("Running post-response complete plugin" , "plugin" , plugin .TypedName ())
369+ loggerDebug .Info ("Running ResponseComplete plugin" , "plugin" , plugin .TypedName ())
383370 before := time .Now ()
384371 plugin .ResponseComplete (ctx , request , response , targetPod )
385372 metrics .RecordPluginProcessingLatency (ResponseCompleteExtensionPoint , plugin .TypedName ().Type , plugin .TypedName ().Name , time .Since (before ))
386- loggerDebug .Info ("Completed running post-response complete plugin successfully" , "plugin" , plugin .TypedName ())
373+ loggerDebug .Info ("Completed running ResponseComplete plugin successfully" , "plugin" , plugin .TypedName ())
387374 }
388375}
0 commit comments