@@ -17,6 +17,7 @@ limitations under the License.
1717package main
1818
1919import (
20+ "context"
2021 "flag"
2122 "fmt"
2223 "net"
@@ -41,8 +42,10 @@ import (
4142 "sigs.k8s.io/gateway-api-inference-extension/internal/runnable"
4243 backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
4344 "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
45+ "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontroller"
4446 "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
4547 "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics/collectors"
48+ "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/saturationdetector"
4649 "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling"
4750 "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins"
4851 "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/filter"
@@ -157,13 +160,20 @@ func run() error {
157160 })
158161 setupLog .Info ("Flags processed" , "flags" , flags )
159162
160- // Init runtime.
163+ // --- Load Configurations from Environment Variables ---
164+ // Note: Scheduler config is loaded via its package init currently. We may
165+ // want to load it here explicitly:
166+ fcConfig , flowControllerEnabled := flowcontroller .LoadConfigFromEnv ()
167+ sdConfig := saturationdetector .LoadConfigFromEnv ()
168+
169+ // --- Get Kubernetes Config ---
161170 cfg , err := ctrl .GetConfig ()
162171 if err != nil {
163- setupLog .Error (err , "Failed to get rest config" )
172+ setupLog .Error (err , "Failed to get Kubernetes rest config" )
164173 return err
165174 }
166175
176+ // --- Setup Manager ---
167177 poolNamespacedName := types.NamespacedName {
168178 Name : * poolName ,
169179 Namespace : * poolNamespace ,
@@ -174,7 +184,7 @@ func run() error {
174184 return err
175185 }
176186
177- // Set up mapper for metric scraping.
187+ // --- Setup Datastore ---
178188 mapping , err := backendmetrics .NewMetricMapping (
179189 * totalQueuedRequestsMetric ,
180190 * kvCacheUsagePercentageMetric ,
@@ -185,14 +195,12 @@ func run() error {
185195 return err
186196 }
187197 verifyMetricMapping (* mapping , setupLog )
188-
189198 pmf := backendmetrics .NewPodMetricsFactory (& backendmetrics.PodMetricsClientImpl {MetricMapping : mapping }, * refreshMetricsInterval )
190- // Setup runner.
191199 ctx := ctrl .SetupSignalHandler ()
200+ appDatastore := datastore .NewDatastore (ctx , pmf )
192201
193- datastore := datastore .NewDatastore (ctx , pmf )
194-
195- scheduler := scheduling .NewScheduler (datastore )
202+ // --- Initialize EPP Components ---
203+ appScheduler := scheduling .NewScheduler (appDatastore )
196204 if schedulerV2 == "true" {
197205 queueScorerWeight := envutil .GetEnvInt ("QUEUE_SCORE_WEIGHT" , scorer .DefaultQueueScorerWeight , setupLog )
198206 kvCacheScorerWeight := envutil .GetEnvInt ("KV_CACHE_SCORE_WEIGHT" , scorer .DefaultKVCacheScorerWeight , setupLog )
@@ -213,41 +221,78 @@ func run() error {
213221 []plugins.PostSchedule {},
214222 []plugins.PostResponse {},
215223 schedConfigOpts ... )
216- scheduler = scheduling .NewSchedulerWithConfig (datastore , schedulerConfig )
224+ appScheduler = scheduling .NewSchedulerWithConfig (appDatastore , schedulerConfig )
225+ }
226+
227+ appSaturationDetector , err := saturationdetector .NewDetector (
228+ * sdConfig ,
229+ appDatastore ,
230+ ctrl .Log .WithName ("saturation-detector" ),
231+ )
232+ if err != nil {
233+ setupLog .Error (err , "Failed to create SaturationDetector" )
234+ return err
235+ }
236+
237+ var appFlowController * flowcontroller.FlowController
238+ if flowControllerEnabled {
239+ appFlowController , err = flowcontroller .NewFlowController (
240+ appSaturationDetector ,
241+ fcConfig ,
242+ )
243+ if err != nil {
244+ setupLog .Error (err , "Failed to create FlowController" )
245+ return err
246+ }
247+ setupLog .Info ("FlowController enabled and initialized." )
248+ } else {
249+ setupLog .Info ("FlowController is disabled via configuration." )
217250 }
251+
252+ // --- Setup ExtProc Server Runner ---
218253 serverRunner := & runserver.ExtProcServerRunner {
219254 GrpcPort : * grpcPort ,
220255 DestinationEndpointHintMetadataNamespace : * destinationEndpointHintMetadataNamespace ,
221256 DestinationEndpointHintKey : * destinationEndpointHintKey ,
222257 PoolNamespacedName : poolNamespacedName ,
223- Datastore : datastore ,
258+ Datastore : appDatastore ,
224259 SecureServing : * secureServing ,
225260 CertPath : * certPath ,
226261 RefreshPrometheusMetricsInterval : * refreshPrometheusMetricsInterval ,
227- Scheduler : scheduler ,
262+ Scheduler : appScheduler ,
263+ FlowController : appFlowController , // Pass instance (can be nil)
264+ SaturationDetector : appSaturationDetector ,
265+ FlowControllerEnabled : flowControllerEnabled ,
228266 }
229267 if err := serverRunner .SetupWithManager (ctx , mgr ); err != nil {
230- setupLog .Error (err , "Failed to setup ext-proc controllers" )
268+ setupLog .Error (err , "Failed to setup EPP controllers" )
269+ return err
270+ }
271+
272+ // --- Add Runnables to Manager ---
273+
274+ // Register FlowController Run loop.
275+ if err := registerFlowController (mgr , appFlowController ); err != nil {
231276 return err
232277 }
233278
234279 // Register health server.
235- if err := registerHealthServer (mgr , ctrl .Log .WithName ("health" ), datastore , * grpcHealthPort ); err != nil {
280+ if err := registerHealthServer (mgr , ctrl .Log .WithName ("health" ), appDatastore , * grpcHealthPort ); err != nil {
236281 return err
237282 }
238283
239284 // Register ext-proc server.
240- if err := mgr .Add (serverRunner .AsRunnable (ctrl .Log .WithName ("ext-proc" ))); err != nil {
241- setupLog .Error (err , "Failed to register ext-proc gRPC server" )
285+ if err := registerExtProcServer (mgr , serverRunner , ctrl .Log .WithName ("ext-proc" )); err != nil {
242286 return err
243287 }
244288
245289 // Register metrics handler.
246- if err := registerMetricsHandler (mgr , * metricsPort , cfg , datastore ); err != nil {
290+ if err := registerMetricsHandler (mgr , * metricsPort , cfg , appDatastore ); err != nil {
247291 return err
248292 }
249293
250- // Start the manager. This blocks until a signal is received.
294+ // --- Start Manager ---
295+ // This blocks until a signal is received.
251296 setupLog .Info ("Controller manager starting" )
252297 if err := mgr .Start (ctx ); err != nil {
253298 setupLog .Error (err , "Error starting controller manager" )
@@ -275,6 +320,39 @@ func initLogging(opts *zap.Options) {
275320 ctrl .SetLogger (logger )
276321}
277322
323+ // registerFlowController adds the FlowController Run loop as a Runnable to the
324+ // manager.
325+ func registerFlowController (mgr manager.Manager , fc * flowcontroller.FlowController ) error {
326+ if fc == nil {
327+ setupLog .Info ("FlowController is nil (disabled), skipping registration." )
328+ return nil // Not an error if it's intentionally disabled
329+ }
330+ if err := mgr .Add (manager .RunnableFunc (func (runCtx context.Context ) error {
331+ fcLog := ctrl .Log .WithName ("flowcontroller-runnable" )
332+ fcLog .Info ("Starting FlowController Run loop" )
333+ // Run the FlowController; it handles context cancellation internally.
334+ fc .Run (runCtx )
335+ fcLog .Info ("FlowController Run loop stopped" )
336+ return nil
337+ })); err != nil {
338+ setupLog .Error (err , "Failed to add FlowController runnable to manager" )
339+ return err
340+ }
341+ setupLog .Info ("FlowController Run loop added to manager." )
342+ return nil
343+ }
344+
345+ // registerExtProcServer adds the ExtProcServerRunner as a Runnable to the
346+ // manager.
347+ func registerExtProcServer (mgr manager.Manager , runner * runserver.ExtProcServerRunner , logger logr.Logger ) error {
348+ if err := mgr .Add (runner .AsRunnable (logger )); err != nil {
349+ setupLog .Error (err , "Failed to register ext-proc gRPC server runnable" )
350+ return err
351+ }
352+ setupLog .Info ("ExtProc server runner added to manager." )
353+ return nil
354+ }
355+
278356// registerHealthServer adds the Health gRPC server as a Runnable to the given manager.
279357func registerHealthServer (mgr manager.Manager , logger logr.Logger , ds datastore.Datastore , port int ) error {
280358 srv := grpc .NewServer ()
@@ -364,5 +442,4 @@ func verifyMetricMapping(mapping backendmetrics.MetricMapping, logger logr.Logge
364442 if mapping .LoraRequestInfo == nil {
365443 logger .Info ("Not scraping metric: LoraRequestInfo" )
366444 }
367-
368445}
0 commit comments