Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ COPY internal ./internal
COPY apix ./apix
COPY api ./api
COPY version ./version
COPY sidecars ./sidecars
WORKDIR /src/cmd/epp
RUN go build -ldflags="-X sigs.k8s.io/gateway-api-inference-extension/version.CommitSHA=${COMMIT_SHA} -X sigs.k8s.io/gateway-api-inference-extension/version.BuildRef=${BUILD_REF}" -o /epp

Expand Down
69 changes: 65 additions & 4 deletions cmd/epp/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package runner
import (
"context"
"crypto/tls"
"encoding/json"
"errors"
"flag"
"fmt"
Expand Down Expand Up @@ -61,13 +62,15 @@ import (
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/saturationdetector"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/multi/prefix"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/multi/slo_aware_router"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/picker"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/profile"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/scorer"
testfilter "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/test/filter"
runserver "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/server"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/env"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
latencypredictor "sigs.k8s.io/gateway-api-inference-extension/sidecars/latencypredictorasync"
"sigs.k8s.io/gateway-api-inference-extension/version"
)

Expand Down Expand Up @@ -108,6 +111,7 @@ var (
"then a self-signed certificate is used.")
// metric flags
totalQueuedRequestsMetric = flag.String("total-queued-requests-metric", runserver.DefaultTotalQueuedRequestsMetric, "Prometheus metric for the number of queued requests.")
totalRunningRequestsMetric = flag.String("total-running-requests-metric", runserver.DefaultTotalRunningRequestsMetric, "Prometheus metric for the number of running requests.")
kvCacheUsagePercentageMetric = flag.String("kv-cache-usage-percentage-metric", runserver.DefaultKvCacheUsagePercentageMetric, "Prometheus metric for the fraction of KV-cache blocks currently in use (from 0 to 1).")
// LoRA metrics
loraInfoMetric = flag.String("lora-info-metric", runserver.DefaultLoraInfoMetric, "Prometheus metric for the LoRA info metrics (must be in vLLM label format).")
Expand All @@ -127,7 +131,10 @@ var (
modelServerMetricsScheme = flag.String("model-server-metrics-scheme", "http", "Scheme to scrape metrics from pods")
modelServerMetricsHttpsInsecureSkipVerify = flag.Bool("model-server-metrics-https-insecure-skip-verify", true, "When using 'https' scheme for 'model-server-metrics-scheme', configure 'InsecureSkipVerify' (default to true)")
haEnableLeaderElection = flag.Bool("ha-enable-leader-election", false, "Enables leader election for high availability. When enabled, readiness probes will only pass on the leader.")
tracing = flag.Bool("tracing", true, "Enables emitting traces")

// Latency Predictor Flag
enableLatencyPredictor = flag.Bool("enable-latency-predictor", false, "Enable the regression-based latency predictor and scheduler scorer.")
tracing = flag.Bool("tracing", true, "Enables emitting traces")

setupLog = ctrl.Log.WithName("setup")
)
Expand Down Expand Up @@ -297,9 +304,29 @@ func (r *Runner) Run(ctx context.Context) error {
runtime.SetBlockProfileRate(1)
}

err = r.parsePluginsConfiguration(ctx, datastore)
// ===================================================================
// == Latency Predictor Integration
// ===================================================================
var predictor latencypredictor.PredictorInterface // Use the interface type
if *enableLatencyPredictor {
setupLog.Info("Latency predictor is enabled. Initializing...")
predictor = latencypredictor.New(latencypredictor.ConfigFromEnv(), ctrl.Log.WithName("latency-predictor"))

// The runnable wraps the concrete *latencypredictor.Predictor, so assert the interface value back to that type.
concretePredictor := predictor.(*latencypredictor.Predictor)
if err := mgr.Add(runnable.NoLeaderElection(&predictorRunnable{predictor: concretePredictor})); err != nil {
setupLog.Error(err, "Failed to register latency predictor runnable")
return err
}
} else {
setupLog.Info("Latency predictor is disabled.")
predictor = nil // This will be a true nil interface
}
// ===================================================================

err = r.parsePluginsConfiguration(ctx, predictor, datastore)
if err != nil {
setupLog.Error(err, "Failed to parse plugins configuration")
setupLog.Error(err, "Failed to parse the configuration")
return err
}

Expand Down Expand Up @@ -368,6 +395,7 @@ func (r *Runner) Run(ctx context.Context) error {
Director: director,
SaturationDetector: saturationDetector,
UseExperimentalDatalayerV2: useDatalayerV2, // pluggable data layer feature flag
LatencyPredictor: predictor,
}
if err := serverRunner.SetupWithManager(ctx, mgr); err != nil {
setupLog.Error(err, "Failed to setup EPP controllers")
Expand Down Expand Up @@ -410,7 +438,14 @@ func (r *Runner) registerInTreePlugins() {
plugins.Register(testfilter.HeaderBasedTestingFilterType, testfilter.HeaderBasedTestingFilterFactory)
}

func (r *Runner) parsePluginsConfiguration(ctx context.Context, ds datastore.Datastore) error {
// registerLatencyPredictorPlugins registers the plugin factories that rely on
// the latency predictor: the SLO-aware router, whose factory closes over the
// supplied predictor, and the SLO-aware profile handler.
func (r *Runner) registerLatencyPredictorPlugins(predictor latencypredictor.PredictorInterface) {
	// The router factory ignores the raw plugin parameters and the handle; it
	// always builds the router with the headroom selection strategy and then
	// applies the configured plugin name.
	sloRouterFactory := func(name string, _ json.RawMessage, _ plugins.Handle) (plugins.Plugin, error) {
		router := slo_aware_router.NewSLOAwareRouter(predictor, slo_aware_router.HeadroomSelectionStrategy)
		return router.WithName(name), nil
	}

	plugins.Register(slo_aware_router.SLOAwareRouterPluginType, sloRouterFactory)
	plugins.Register(profile.SLOAwareProfileHandlerType, profile.SLOAwareProfileHandlerFactory)
}

func (r *Runner) parsePluginsConfiguration(ctx context.Context, predictor latencypredictor.PredictorInterface, ds datastore.Datastore) error {
if *configText == "" && *configFile == "" {
return nil // configuring through code, not through file
}
Expand All @@ -429,6 +464,12 @@ func (r *Runner) parsePluginsConfiguration(ctx context.Context, ds datastore.Dat
}

r.registerInTreePlugins()
// If the latency predictor is enabled and a predictor instance was created,
// register the latency predictor plugins (the SLO-aware router and the SLO-aware profile handler).
if *enableLatencyPredictor && predictor != nil {
setupLog.Info("Registering latency predictor plugins")
r.registerLatencyPredictorPlugins(predictor)
}
handle := plugins.NewEppHandle(ctx, ds.PodList)
config, err := loader.LoadConfig(configBytes, handle, logger)

Expand Down Expand Up @@ -459,6 +500,7 @@ func (r *Runner) setupMetricsCollection(setupLog logr.Logger, useExperimentalDat
func setupMetricsV1(setupLog logr.Logger) (datalayer.EndpointFactory, error) {
mapping, err := backendmetrics.NewMetricMapping(
*totalQueuedRequestsMetric,
*totalRunningRequestsMetric,
*kvCacheUsagePercentageMetric,
*loraInfoMetric,
*cacheInfoMetric,
Expand Down Expand Up @@ -502,6 +544,7 @@ func setupDatalayer() (datalayer.EndpointFactory, error) {
*modelServerMetricsHttpsInsecureSkipVerify,
nil)
extractor, err := dlmetrics.NewExtractor(*totalQueuedRequestsMetric,
*totalRunningRequestsMetric,
*kvCacheUsagePercentageMetric,
*loraInfoMetric, *cacheInfoMetric)

Expand Down Expand Up @@ -613,3 +656,21 @@ func setupPprofHandlers(mgr ctrl.Manager) error {
}
return nil
}

// ===================================================================
// == Latency Predictor Plugin and Helpers
// ===================================================================

// predictorRunnable implements controller-runtime's Runnable interface to manage the predictor's lifecycle.
// Its Start method starts the wrapped predictor, blocks until the context is
// cancelled, and then stops the predictor.
type predictorRunnable struct {
// predictor is the concrete latency predictor that Start starts and stops.
predictor *latencypredictor.Predictor
}

// Start starts the wrapped latency predictor with ctx, blocks until ctx is
// cancelled, and then stops the predictor. It always returns nil.
func (pr *predictorRunnable) Start(ctx context.Context) error {
	setupLog.Info("Starting latency predictor...")
	pr.predictor.Start(ctx)

	// Park here until the caller cancels the context; only then shut down.
	cancelled := ctx.Done()
	<-cancelled

	setupLog.Info("Stopping latency predictor...")
	pr.predictor.Stop()

	return nil
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/elastic/crd-ref-docs v0.2.0
github.com/envoyproxy/go-control-plane/envoy v1.35.0
github.com/go-logr/logr v1.4.3
github.com/go-logr/zapr v1.3.0
github.com/google/go-cmp v0.7.0
github.com/google/uuid v1.6.0
github.com/hashicorp/golang-lru/v2 v2.0.7
Expand Down Expand Up @@ -61,7 +62,6 @@ require (
github.com/fsnotify/fsnotify v1.9.0 // indirect
github.com/fxamacker/cbor/v2 v2.9.0 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-logr/zapr v1.3.0 // indirect
github.com/go-openapi/jsonpointer v0.21.2 // indirect
github.com/go-openapi/jsonreference v0.21.0 // indirect
github.com/go-openapi/swag v0.23.1 // indirect
Expand Down
9 changes: 9 additions & 0 deletions pkg/epp/backend/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,15 @@ func (p *PodMetricsClientImpl) promToPodMetrics(
}
}

if p.MetricMapping.TotalRunningRequests != nil {
running, err := p.getMetric(metricFamilies, *p.MetricMapping.TotalRunningRequests)
if err == nil {
updated.RunningQueueSize = int(running.GetGauge().GetValue())
} else {
errs = multierr.Append(errs, err)
}
}

if p.MetricMapping.KVCacheUtilization != nil {
usage, err := p.getMetric(metricFamilies, *p.MetricMapping.KVCacheUtilization)
if err == nil {
Expand Down
24 changes: 15 additions & 9 deletions pkg/epp/backend/metrics/metrics_spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,11 @@ type MetricSpec struct {

// MetricMapping holds named MetricSpecs.
type MetricMapping struct {
TotalQueuedRequests *MetricSpec
KVCacheUtilization *MetricSpec
LoraRequestInfo *MetricSpec
CacheConfigInfo *MetricSpec
TotalQueuedRequests *MetricSpec
TotalRunningRequests *MetricSpec
KVCacheUtilization *MetricSpec
LoraRequestInfo *MetricSpec
CacheConfigInfo *MetricSpec
}

// stringToMetricSpec converts a string to a MetricSpec.
Expand Down Expand Up @@ -94,11 +95,15 @@ func stringToMetricSpec(specStr string) (*MetricSpec, error) {
}

// NewMetricMapping creates a MetricMapping from string values.
func NewMetricMapping(queuedStr, kvUsageStr, loraReqInfoStr, cacheInfoMetric string) (*MetricMapping, error) {
func NewMetricMapping(queuedStr, runningStr, kvUsageStr, loraReqInfoStr, cacheInfoMetric string) (*MetricMapping, error) {
queuedSpec, err := stringToMetricSpec(queuedStr)
if err != nil {
return nil, fmt.Errorf("error parsing WaitingRequests: %w", err)
}
runningSpec, err := stringToMetricSpec(runningStr)
if err != nil {
return nil, fmt.Errorf("error parsing RunningRequests: %w", err)
}
kvUsageSpec, err := stringToMetricSpec(kvUsageStr)
if err != nil {
return nil, fmt.Errorf("error parsing KVCacheUsage: %w", err)
Expand All @@ -114,10 +119,11 @@ func NewMetricMapping(queuedStr, kvUsageStr, loraReqInfoStr, cacheInfoMetric str
}

mapping := &MetricMapping{
TotalQueuedRequests: queuedSpec,
KVCacheUtilization: kvUsageSpec,
LoraRequestInfo: loraReqInfoSpec,
CacheConfigInfo: cacheInfoSpec,
TotalQueuedRequests: queuedSpec,
TotalRunningRequests: runningSpec,
KVCacheUtilization: kvUsageSpec,
LoraRequestInfo: loraReqInfoSpec,
CacheConfigInfo: cacheInfoSpec,
}

return mapping, nil
Expand Down
13 changes: 11 additions & 2 deletions pkg/epp/datalayer/metrics/extractor.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ func Produces() map[string]any {
// configured with the given metrics' specifications.
// These are mandatory metrics per the MSP specification, and are used
// as the basis for the built-in scheduling plugins.
func NewExtractor(queueSpec, kvusageSpec, loraSpec, cacheInfoSpec string) (*Extractor, error) {
mapping, err := NewMapping(queueSpec, kvusageSpec, loraSpec, cacheInfoSpec)
func NewExtractor(queueSpec, runningSpec, kvusageSpec, loraSpec, cacheInfoSpec string) (*Extractor, error) {
mapping, err := NewMapping(queueSpec, runningSpec, kvusageSpec, loraSpec, cacheInfoSpec)
if err != nil {
return nil, fmt.Errorf("failed to create extractor metrics Mapping - %w", err)
}
Expand Down Expand Up @@ -107,6 +107,15 @@ func (ext *Extractor) Extract(ctx context.Context, data any, ep datalayer.Endpoi
}
}

if spec := ext.mapping.TotalRunningRequests; spec != nil { // extract running requests
if metric, err := spec.getLatestMetric(families); err != nil {
errs = append(errs, err)
} else {
clone.RunningQueueSize = int(extractValue(metric))
updated = true
}
}

if spec := ext.mapping.KVCacheUtilization; spec != nil { // extract KV cache usage
if metric, err := spec.getLatestMetric(families); err != nil {
errs = append(errs, err)
Expand Down
24 changes: 15 additions & 9 deletions pkg/epp/datalayer/metrics/mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,25 @@ import (
// Mapping holds specifications for the well-known metrics defined
// in the Model Server Protocol.
type Mapping struct {
TotalQueuedRequests *Spec
KVCacheUtilization *Spec
LoraRequestInfo *LoRASpec
CacheInfo *Spec
TotalQueuedRequests *Spec
TotalRunningRequests *Spec
KVCacheUtilization *Spec
LoraRequestInfo *LoRASpec
CacheInfo *Spec
}

// NewMapping creates a metrics.Mapping from the input specification strings.
func NewMapping(queue, kvusage, lora, cacheInfo string) (*Mapping, error) {
func NewMapping(queue, running, kvusage, lora, cacheInfo string) (*Mapping, error) {
var errs []error

queueSpec, err := parseStringToSpec(queue)
if err != nil {
errs = append(errs, err)
}
runningSpec, err := parseStringToSpec(running)
if err != nil {
errs = append(errs, err)
}
kvusageSpec, err := parseStringToSpec(kvusage)
if err != nil {
errs = append(errs, err)
Expand All @@ -55,9 +60,10 @@ func NewMapping(queue, kvusage, lora, cacheInfo string) (*Mapping, error) {
return nil, errors.Join(errs...)
}
return &Mapping{
TotalQueuedRequests: queueSpec,
KVCacheUtilization: kvusageSpec,
LoraRequestInfo: loraSpec,
CacheInfo: cacheInfoSpec,
TotalQueuedRequests: queueSpec,
TotalRunningRequests: runningSpec,
KVCacheUtilization: kvusageSpec,
LoraRequestInfo: loraSpec,
CacheInfo: cacheInfoSpec,
}, nil
}
2 changes: 1 addition & 1 deletion pkg/epp/datastore/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ type Datastore interface {
// PodList lists pods matching the given predicate.
PodList(predicate func(backendmetrics.PodMetrics) bool) []backendmetrics.PodMetrics
PodUpdateOrAddIfNotExist(pod *corev1.Pod) bool
PodDelete(podNAme string)
PodDelete(podName string)

// Clears the store state, happens when the pool gets deleted.
Clear()
Expand Down
Loading