@@ -17,6 +17,7 @@ limitations under the License.
1717package main
1818
1919import (
20+ "context"
2021 "flag"
2122 "fmt"
2223 "net"
@@ -40,8 +41,10 @@ import (
4041 "sigs.k8s.io/gateway-api-inference-extension/internal/runnable"
4142 backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
4243 "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
44+ "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontroller"
4345 "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
4446 "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics/collectors"
47+ "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/saturationdetector"
4548 "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling"
4649 runserver "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/server"
4750 "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
@@ -136,13 +139,20 @@ func run() error {
136139 })
137140 setupLog .Info ("Flags processed" , "flags" , flags )
138141
139- // Init runtime.
142+ // --- Load Configurations from Environment Variables ---
143+ // Note: Scheduler config is loaded via its package init currently. We may
144+ // want to load it here explicitly:
145+ fcConfig , flowControllerEnabled := flowcontroller .LoadConfigFromEnv ()
146+ sdConfig := saturationdetector .LoadConfigFromEnv ()
147+
148+ // --- Get Kubernetes Config ---
140149 cfg , err := ctrl .GetConfig ()
141150 if err != nil {
142- setupLog .Error (err , "Failed to get rest config" )
151+ setupLog .Error (err , "Failed to get Kubernetes rest config" )
143152 return err
144153 }
145154
155+ // --- Setup Manager ---
146156 poolNamespacedName := types.NamespacedName {
147157 Name : * poolName ,
148158 Namespace : * poolNamespace ,
@@ -153,7 +163,7 @@ func run() error {
153163 return err
154164 }
155165
156- // Set up mapper for metric scraping.
166+ // --- Setup Datastore ---
157167 mapping , err := backendmetrics .NewMetricMapping (
158168 * totalQueuedRequestsMetric ,
159169 * kvCacheUsagePercentageMetric ,
@@ -164,47 +174,82 @@ func run() error {
164174 return err
165175 }
166176 verifyMetricMapping (* mapping , setupLog )
167-
168177 pmf := backendmetrics .NewPodMetricsFactory (& backendmetrics.PodMetricsClientImpl {MetricMapping : mapping }, * refreshMetricsInterval )
169- // Setup runner.
170178 ctx := ctrl .SetupSignalHandler ()
179+ appDatastore := datastore .NewDatastore (ctx , pmf )
171180
172- datastore := datastore .NewDatastore (ctx , pmf )
181+ // --- Initialize EPP Components ---
182+ appScheduler := scheduling .NewScheduler (appDatastore )
183+
184+ appSaturationDetector , err := saturationdetector .NewDetector (
185+ * sdConfig ,
186+ appDatastore ,
187+ ctrl .Log .WithName ("saturation-detector" ),
188+ )
189+ if err != nil {
190+ setupLog .Error (err , "Failed to create SaturationDetector" )
191+ return err
192+ }
173193
174- scheduler := scheduling .NewScheduler (datastore )
194+ var appFlowController * flowcontroller.FlowController
195+ if flowControllerEnabled {
196+ appFlowController , err = flowcontroller .NewFlowController (
197+ appSaturationDetector ,
198+ fcConfig ,
199+ )
200+ if err != nil {
201+ setupLog .Error (err , "Failed to create FlowController" )
202+ return err
203+ }
204+ setupLog .Info ("FlowController enabled and initialized." )
205+ } else {
206+ setupLog .Info ("FlowController is disabled via configuration." )
207+ }
208+
209+ // --- Setup ExtProc Server Runner ---
175210 serverRunner := & runserver.ExtProcServerRunner {
176211 GrpcPort : * grpcPort ,
177212 DestinationEndpointHintMetadataNamespace : * destinationEndpointHintMetadataNamespace ,
178213 DestinationEndpointHintKey : * destinationEndpointHintKey ,
179214 PoolNamespacedName : poolNamespacedName ,
180- Datastore : datastore ,
215+ Datastore : appDatastore ,
181216 SecureServing : * secureServing ,
182217 CertPath : * certPath ,
183218 RefreshPrometheusMetricsInterval : * refreshPrometheusMetricsInterval ,
184- Scheduler : scheduler ,
219+ Scheduler : appScheduler ,
220+ FlowController : appFlowController , // Pass instance (can be nil)
221+ SaturationDetector : appSaturationDetector ,
222+ FlowControllerEnabled : flowControllerEnabled ,
185223 }
186224 if err := serverRunner .SetupWithManager (ctx , mgr ); err != nil {
187- setupLog .Error (err , "Failed to setup ext-proc controllers" )
225+ setupLog .Error (err , "Failed to setup EPP controllers" )
226+ return err
227+ }
228+
229+ // --- Add Runnables to Manager ---
230+
231+ // Register FlowController Run loop.
232+ if err := registerFlowController (mgr , appFlowController ); err != nil {
188233 return err
189234 }
190235
191236 // Register health server.
192- if err := registerHealthServer (mgr , ctrl .Log .WithName ("health" ), datastore , * grpcHealthPort ); err != nil {
237+ if err := registerHealthServer (mgr , ctrl .Log .WithName ("health" ), appDatastore , * grpcHealthPort ); err != nil {
193238 return err
194239 }
195240
196241 // Register ext-proc server.
197- if err := mgr .Add (serverRunner .AsRunnable (ctrl .Log .WithName ("ext-proc" ))); err != nil {
198- setupLog .Error (err , "Failed to register ext-proc gRPC server" )
242+ if err := registerExtProcServer (mgr , serverRunner , ctrl .Log .WithName ("ext-proc" )); err != nil {
199243 return err
200244 }
201245
202246 // Register metrics handler.
203- if err := registerMetricsHandler (mgr , * metricsPort , cfg , datastore ); err != nil {
247+ if err := registerMetricsHandler (mgr , * metricsPort , cfg , appDatastore ); err != nil {
204248 return err
205249 }
206250
207- // Start the manager. This blocks until a signal is received.
251+ // --- Start Manager ---
252+ // This blocks until a signal is received.
208253 setupLog .Info ("Controller manager starting" )
209254 if err := mgr .Start (ctx ); err != nil {
210255 setupLog .Error (err , "Error starting controller manager" )
@@ -232,6 +277,39 @@ func initLogging(opts *zap.Options) {
232277 ctrl .SetLogger (logger )
233278}
234279
280+ // registerFlowController adds the FlowController Run loop as a Runnable to the
281+ // manager.
282+ func registerFlowController (mgr manager.Manager , fc * flowcontroller.FlowController ) error {
283+ if fc == nil {
284+ setupLog .Info ("FlowController is nil (disabled), skipping registration." )
285+ return nil // Not an error if it's intentionally disabled
286+ }
287+ if err := mgr .Add (manager .RunnableFunc (func (runCtx context.Context ) error {
288+ fcLog := ctrl .Log .WithName ("flowcontroller-runnable" )
289+ fcLog .Info ("Starting FlowController Run loop" )
290+ // Run the FlowController; it handles context cancellation internally.
291+ fc .Run (runCtx )
292+ fcLog .Info ("FlowController Run loop stopped" )
293+ return nil
294+ })); err != nil {
295+ setupLog .Error (err , "Failed to add FlowController runnable to manager" )
296+ return err
297+ }
298+ setupLog .Info ("FlowController Run loop added to manager." )
299+ return nil
300+ }
301+
302+ // registerExtProcServer adds the ExtProcServerRunner as a Runnable to the
303+ // manager.
304+ func registerExtProcServer (mgr manager.Manager , runner * runserver.ExtProcServerRunner , logger logr.Logger ) error {
305+ if err := mgr .Add (runner .AsRunnable (logger )); err != nil {
306+ setupLog .Error (err , "Failed to register ext-proc gRPC server runnable" )
307+ return err
308+ }
309+ setupLog .Info ("ExtProc server runner added to manager." )
310+ return nil
311+ }
312+
235313// registerHealthServer adds the Health gRPC server as a Runnable to the given manager.
236314func registerHealthServer (mgr manager.Manager , logger logr.Logger , ds datastore.Datastore , port int ) error {
237315 srv := grpc .NewServer ()
@@ -321,5 +399,4 @@ func verifyMetricMapping(mapping backendmetrics.MetricMapping, logger logr.Logge
321399 if mapping .LoraRequestInfo == nil {
322400 logger .Info ("Not scraping metric: LoraRequestInfo" )
323401 }
324-
325402}
0 commit comments