From 5945cfb4e2872edef26d77ca3e420f6b1936132e Mon Sep 17 00:00:00 2001 From: Xiyue Yu Date: Fri, 31 Oct 2025 16:18:55 -0700 Subject: [PATCH 01/10] partial draft --- cmd/epp/runner/runner.go | 156 ++++++++++++++---- .../inferenceobjective_reconciler_test.go | 2 +- .../controller/inferencepool_reconciler.go | 2 +- pkg/epp/controller/pod_reconciler_test.go | 2 +- pkg/epp/datalayer/endpointsPools.go | 46 ++++++ pkg/epp/datastore/datastore.go | 66 ++++---- pkg/epp/datastore/datastore_test.go | 8 +- .../metrics/collectors/inference_pool_test.go | 2 +- pkg/epp/requestcontrol/director_test.go | 4 +- pkg/epp/server/controller_manager.go | 51 +++--- pkg/epp/server/runserver.go | 44 ++--- test/utils/server.go | 2 +- 12 files changed, 260 insertions(+), 125 deletions(-) create mode 100644 pkg/epp/datalayer/endpointsPools.go diff --git a/cmd/epp/runner/runner.go b/cmd/epp/runner/runner.go index cbd3ea024..089929576 100644 --- a/cmd/epp/runner/runner.go +++ b/cmd/epp/runner/runner.go @@ -26,6 +26,8 @@ import ( "net/http/pprof" "os" "runtime" + "strconv" + "strings" "sync/atomic" "github.com/go-logr/logr" @@ -100,6 +102,8 @@ var ( poolName = flag.String("pool-name", runserver.DefaultPoolName, "Name of the InferencePool this Endpoint Picker is associated with.") poolGroup = flag.String("pool-group", runserver.DefaultPoolGroup, "group of the InferencePool this Endpoint Picker is associated with.") poolNamespace = flag.String("pool-namespace", "", "Namespace of the InferencePool this Endpoint Picker is associated with.") + selector = flag.String("selector", "", "Selector to filter pods on. Format: a comma-separated list of key:value label pairs, e.g., 'app:vllm-llama3-8b-instruct,env:prod'.") + targetPorts = flag.String("target-ports", "", "Target ports of the model server pods. Format: a comma-separated list of ports, e.g., '3000,3001,3002'.") logVerbosity = flag.Int("v", logging.DEFAULT, "number for the log level verbosity") secureServing = flag.Bool("secure-serving", runserver.DefaultSecureServing, "Enables secure serving. 
Defaults to true.") healthChecking = flag.Bool("health-checking", runserver.DefaultHealthChecking, "Enables health checking") @@ -194,6 +198,64 @@ func (r *Runner) Run(ctx context.Context) error { setupLog.Error(err, "Failed to get Kubernetes rest config") return err } + //Setup EndPointsPool + endPointsPool := datalayer.NewEndPointsPool() + if *poolName != "" { + // Determine pool namespace: if --pool-namespace is non-empty, use it; else NAMESPACE env var; else default + resolvePoolNamespace := func() string { + if *poolNamespace != "" { + return *poolNamespace + } + if nsEnv := os.Getenv("NAMESPACE"); nsEnv != "" { + return nsEnv + } + return runserver.DefaultPoolNamespace + } + resolvedPoolNamespace := resolvePoolNamespace() + poolNamespacedName := types.NamespacedName{ + Name: *poolName, + Namespace: resolvedPoolNamespace, + } + poolGroupKind := schema.GroupKind{ + Group: *poolGroup, + Kind: "InferencePool", + } + poolGKNN := common.GKNN{ + NamespacedName: poolNamespacedName, + GroupKind: poolGroupKind, + } + endPointsPool.GKNN = poolGKNN + } + + if *selector != "" { + endPointsPool.EndPoints.Selector, err = strToMap(*selector) + if err != nil { + setupLog.Error(err, "Failed to parse flag %q with error: %w", "selector", err) + return err + } + endPointsPool.EndPoints.TargetPorts, err = strToUniqueIntSlice(*targetPorts) + if err != nil { + setupLog.Error(err, "Failed to parse flag %q with error: %w", "target-ports", err) + } + endPointsPool.StandaloneMode = true + + // Determine EPP namespace: NAMESPACE env var; else default + eppNsEnv := os.Getenv("EPP_NAMESPACE") + if eppNsEnv == "" { + setupLog.Error(err, "Failed to get environment variable EPP_NAMESPACE") + } + // Determine EPP name: EPP_NAME env var + eppNameEnv := os.Getenv("EPP_NAME") + if eppNameEnv == "" { + setupLog.Error(err, "Failed to get environment variable EPP_NAME") + + } + endPointsPool.GKNN = common.GKNN{ + NamespacedName: types.NamespacedName{Namespace: eppNsEnv, Name: eppNameEnv}, + GroupKind: schema.GroupKind{Kind: "apps", Group: "Deployment"}, + } + + } // --- Setup Datastore --- useDatalayerV2 := env.GetEnvBool(enableExperimentalDatalayerV2, false, setupLog) @@ -201,7 +263,7 @@ func (r *Runner) Run(ctx context.Context) error { if err != nil { return err } - datastore := datastore.NewDatastore(ctx, epf, int32(*modelServerMetricsPort)) + datastore := datastore.NewDatastore(ctx, epf, int32(*modelServerMetricsPort), endPointsPool.EndPoints, endPointsPool.StandaloneMode) // --- Setup Metrics Server --- customCollectors := []prometheus.Collector{collectors.NewInferencePoolMetricsCollector(datastore)} @@ -223,34 +285,10 @@ func (r *Runner) Run(ctx context.Context) error { }(), } - // Determine pool namespace: if --pool-namespace is non-empty, use it; else NAMESPACE env var; else default - resolvePoolNamespace := func() string { - if *poolNamespace != "" { - return *poolNamespace - } - if nsEnv := os.Getenv("NAMESPACE"); nsEnv != "" { - return nsEnv - } - return runserver.DefaultPoolNamespace - } - resolvedPoolNamespace := resolvePoolNamespace() - poolNamespacedName := types.NamespacedName{ - Name: *poolName, - Namespace: resolvedPoolNamespace, - } - poolGroupKind := schema.GroupKind{ - Group: *poolGroup, - Kind: "InferencePool", - } - poolGKNN := common.GKNN{ - NamespacedName: poolNamespacedName, - GroupKind: poolGroupKind, - } - isLeader := &atomic.Bool{} isLeader.Store(false) - mgr, err := runserver.NewDefaultManager(poolGKNN, cfg, metricsServerOptions, *haEnableLeaderElection) + mgr, err := 
runserver.NewDefaultManager(endPointsPool, cfg, metricsServerOptions, *haEnableLeaderElection) if err != nil { setupLog.Error(err, "Failed to create controller manager") return err } @@ -339,8 +377,7 @@ func (r *Runner) Run(ctx context.Context) error { // --- Setup ExtProc Server Runner --- serverRunner := &runserver.ExtProcServerRunner{ GrpcPort: *grpcPort, - PoolNamespacedName: poolNamespacedName, - PoolGKNN: poolGKNN, + EndPointsPool: endPointsPool, Datastore: datastore, SecureServing: *secureServing, HealthChecking: *healthChecking, @@ -547,9 +584,19 @@ func registerHealthServer(mgr manager.Manager, logger logr.Logger, ds datastore. } func validateFlags() error { - if *poolName == "" { - return fmt.Errorf("required %q flag not set", "poolName") + if (*poolName != "" && *selector != "") || (*poolName == "" && *selector == "") { + return fmt.Errorf("exactly one of the %q and %q flags must be set", "pool-name", "selector") } + if *selector != "" { + targetPortsList, err := strToUniqueIntSlice(*targetPorts) + if err != nil { + return fmt.Errorf("unexpected value for %q flag with error %w", "target-ports", err) + } + if len(targetPortsList) == 0 || len(targetPortsList) > 8 { + return fmt.Errorf("flag %q should have length from 1 to 8", "target-ports") + } + } + if *configText != "" && *configFile != "" { return fmt.Errorf("both the %q and %q flags can not be set at the same time", "configText", "configFile") } @@ -560,6 +607,55 @@ func validateFlags() error { return nil } +func strToUniqueIntSlice(s string) ([]int, error) { + seen := make(map[int]struct{}) + var intList []int + + if s == "" { + return intList, nil + } + + strList := strings.Split(s, ",") + + for _, str := range strList { + trimmedStr := strings.TrimSpace(str) + if trimmedStr == "" { + continue + } + portInt, err := strconv.Atoi(trimmedStr) + if err != nil { + return nil, fmt.Errorf("invalid number: '%s' is not an integer", trimmedStr) + } + + if _, ok := seen[portInt]; !ok { + seen[portInt] = struct{}{} + intList = append(intList, portInt) + } + } + return intList, nil +} + +func strToMap(s string) (map[string]string, error) { + m := make(map[string]string) + if s == "" { + return m, nil + } + + mPairs := strings.Split(s, ",") + for _, pair := range mPairs { + trimmedPair := strings.TrimSpace(pair) + if trimmedPair == "" { + continue + } + kv := strings.Split(trimmedPair, ":") + if len(kv) != 2 { + return nil, fmt.Errorf("invalid format, expected key:value pairs") + } + m[strings.TrimSpace(kv[0])] = strings.TrimSpace(kv[1]) + } + return m, nil +} + func verifyMetricMapping(mapping backendmetrics.MetricMapping, logger logr.Logger) { if mapping.TotalQueuedRequests == nil { logger.Info("Not scraping metric: TotalQueuedRequests") diff --git a/pkg/epp/controller/inferenceobjective_reconciler_test.go b/pkg/epp/controller/inferenceobjective_reconciler_test.go index 4ceff5d07..1a7ed4dd6 100644 --- a/pkg/epp/controller/inferenceobjective_reconciler_test.go +++ b/pkg/epp/controller/inferenceobjective_reconciler_test.go @@ -164,7 +164,7 @@ func TestInferenceObjectiveReconciler(t *testing.T) { for _, m := range test.objectivessInStore { ds.ObjectiveSet(m) } - _ = ds.PoolSet(context.Background(), fakeClient, pool) + _ = ds.EndPointsSet(context.Background(), fakeClient, pool) reconciler := &InferenceObjectiveReconciler{ Reader: fakeClient, Datastore: ds, diff --git a/pkg/epp/controller/inferencepool_reconciler.go b/pkg/epp/controller/inferencepool_reconciler.go index 3b52de0ae..abe2aec86 100644 --- a/pkg/epp/controller/inferencepool_reconciler.go +++ 
b/pkg/epp/controller/inferencepool_reconciler.go @@ -93,7 +93,7 @@ func (c *InferencePoolReconciler) Reconcile(ctx context.Context, req ctrl.Reques return ctrl.Result{}, fmt.Errorf("unsupported API group: %s", c.PoolGKNN.Group) } - if err := c.Datastore.PoolSet(ctx, c.Reader, v1infPool); err != nil { + if err := c.Datastore.EndPointsSet(ctx, c.Reader, v1infPool); err != nil { return ctrl.Result{}, fmt.Errorf("failed to update datastore - %w", err) } diff --git a/pkg/epp/controller/pod_reconciler_test.go b/pkg/epp/controller/pod_reconciler_test.go index 28f817310..1e7f971be 100644 --- a/pkg/epp/controller/pod_reconciler_test.go +++ b/pkg/epp/controller/pod_reconciler_test.go @@ -197,7 +197,7 @@ func TestPodReconciler(t *testing.T) { // Configure the initial state of the datastore. store := datastore.NewDatastore(t.Context(), pmf, 0) - _ = store.PoolSet(t.Context(), fakeClient, test.pool) + _ = store.EndPointsSet(t.Context(), fakeClient, test.pool) for _, pod := range test.existingPods { store.PodUpdateOrAddIfNotExist(pod) } diff --git a/pkg/epp/datalayer/endpointsPools.go b/pkg/epp/datalayer/endpointsPools.go new file mode 100644 index 000000000..cd395ed15 --- /dev/null +++ b/pkg/epp/datalayer/endpointsPools.go @@ -0,0 +1,46 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package datalayer + +import "sigs.k8s.io/gateway-api-inference-extension/pkg/common" + +type EndPointsPool struct { + EndPoints *EndPoints + StandaloneMode bool + GKNN common.GKNN +} + +// NewEndPointsPool creates and returns a new empty instance of EndPointsPool. +func NewEndPointsPool() *EndPointsPool { + endPoints := NewEndPoints() + return &EndPointsPool{ + EndPoints: endPoints, + } +} + +type EndPoints struct { + Selector map[string]string + TargetPorts []int +} + +// NewEndPoints creates and returns a new empty instance of EndPoints. +func NewEndPoints() *EndPoints { + return &EndPoints{ + Selector: make(map[string]string), + TargetPorts: []int{}, + } +} diff --git a/pkg/epp/datastore/datastore.go b/pkg/epp/datastore/datastore.go index dade69469..29807166a 100644 --- a/pkg/epp/datastore/datastore.go +++ b/pkg/epp/datastore/datastore.go @@ -49,7 +49,7 @@ type Datastore interface { // PoolSet sets the given pool in datastore. If the given pool has different label selector than the previous pool // that was stored, the function triggers a resync of the pods to keep the datastore updated. If the given pool // is nil, this call triggers the datastore.Clear() function. 
- PoolSet(ctx context.Context, reader client.Reader, pool *v1.InferencePool) error + EndPointsSet(ctx context.Context, reader client.Reader, pool *v1.InferencePool) error PoolGet() (*v1.InferencePool, error) PoolHasSynced() bool PoolLabelsMatch(podLabels map[string]string) bool @@ -69,14 +69,16 @@ type Datastore interface { Clear() } -func NewDatastore(parentCtx context.Context, epFactory datalayer.EndpointFactory, modelServerMetricsPort int32) Datastore { +func NewDatastore(parentCtx context.Context, epFactory datalayer.EndpointFactory, modelServerMetricsPort int32, endPoints *datalayer.EndPoints, standaloneMode bool) Datastore { store := &datastore{ - parentCtx: parentCtx, - poolAndObjectivesMu: sync.RWMutex{}, - objectives: make(map[string]*v1alpha2.InferenceObjective), - pods: &sync.Map{}, - modelServerMetricsPort: modelServerMetricsPort, - epf: epFactory, + parentCtx: parentCtx, + endPointsAndObjectivesMu: sync.RWMutex{}, + standaloneMode: standaloneMode, + endPoints: endPoints, + objectives: make(map[string]*v1alpha2.InferenceObjective), + pods: &sync.Map{}, + modelServerMetricsPort: modelServerMetricsPort, + epf: epFactory, } return store } @@ -84,9 +86,11 @@ func NewDatastore(parentCtx context.Context, epFactory datalayer.EndpointFactory type datastore struct { // parentCtx controls the lifecycle of the background metrics goroutines that spawn up by the datastore. parentCtx context.Context - // poolAndObjectivesMu is used to synchronize access to pool and the objectives map. - poolAndObjectivesMu sync.RWMutex - pool *v1.InferencePool + // endPointsAndObjectivesMu is used to synchronize access to pool and the objectives map. + endPointsAndObjectivesMu sync.RWMutex + standaloneMode bool + // endPoints is used to filter the available model server endpoints + endPoints *datalayer.EndPoints // key: InferenceObjective.Spec.ModelName, value: *InferenceObjective objectives map[string]*v1alpha2.InferenceObjective // key: types.NamespacedName, value: backendmetrics.PodMetrics @@ -98,9 +102,9 @@ type datastore struct { } func (ds *datastore) Clear() { - ds.poolAndObjectivesMu.Lock() - defer ds.poolAndObjectivesMu.Unlock() - ds.pool = nil + ds.endPointsAndObjectivesMu.Lock() + defer ds.endPointsAndObjectivesMu.Unlock() + ds.endPoints = nil ds.objectives = make(map[string]*v1alpha2.InferenceObjective) // stop all pods go routines before clearing the pods map. 
ds.pods.Range(func(_, v any) bool { @@ -111,14 +115,14 @@ func (ds *datastore) Clear() { } // /// InferencePool APIs /// -func (ds *datastore) PoolSet(ctx context.Context, reader client.Reader, pool *v1.InferencePool) error { +func (ds *datastore) EndPointsSet(ctx context.Context, reader client.Reader, pool *v1.InferencePool) error { if pool == nil { ds.Clear() return nil } logger := log.FromContext(ctx) - ds.poolAndObjectivesMu.Lock() - defer ds.poolAndObjectivesMu.Unlock() + ds.endPointsAndObjectivesMu.Lock() + defer ds.endPointsAndObjectivesMu.Unlock() oldPool := ds.pool ds.pool = pool @@ -139,8 +143,8 @@ func (ds *datastore) PoolSet(ctx context.Context, reader client.Reader, pool *v1 } func (ds *datastore) PoolGet() (*v1.InferencePool, error) { - ds.poolAndObjectivesMu.RLock() - defer ds.poolAndObjectivesMu.RUnlock() + ds.endPointsAndObjectivesMu.RLock() + defer ds.endPointsAndObjectivesMu.RUnlock() if !ds.PoolHasSynced() { return nil, errPoolNotSynced } @@ -148,14 +152,14 @@ func (ds *datastore) PoolGet() (*v1.InferencePool, error) { } func (ds *datastore) PoolHasSynced() bool { - ds.poolAndObjectivesMu.RLock() - defer ds.poolAndObjectivesMu.RUnlock() + ds.endPointsAndObjectivesMu.RLock() + defer ds.endPointsAndObjectivesMu.RUnlock() return ds.pool != nil } func (ds *datastore) PoolLabelsMatch(podLabels map[string]string) bool { - ds.poolAndObjectivesMu.RLock() - defer ds.poolAndObjectivesMu.RUnlock() + ds.endPointsAndObjectivesMu.RLock() + defer ds.endPointsAndObjectivesMu.RUnlock() if ds.pool == nil { return false } @@ -165,15 +169,15 @@ func (ds *datastore) PoolLabelsMatch(podLabels map[string]string) bool { } func (ds *datastore) ObjectiveSet(infObjective *v1alpha2.InferenceObjective) { - ds.poolAndObjectivesMu.Lock() - defer ds.poolAndObjectivesMu.Unlock() + ds.endPointsAndObjectivesMu.Lock() + defer ds.endPointsAndObjectivesMu.Unlock() // Set the objective. 
ds.objectives[infObjective.Name] = infObjective } func (ds *datastore) ObjectiveGet(objectiveName string) *v1alpha2.InferenceObjective { - ds.poolAndObjectivesMu.RLock() - defer ds.poolAndObjectivesMu.RUnlock() + ds.endPointsAndObjectivesMu.RLock() + defer ds.endPointsAndObjectivesMu.RUnlock() iObj, ok := ds.objectives[objectiveName] if !ok { return nil @@ -182,14 +186,14 @@ func (ds *datastore) ObjectiveGet(objectiveName string) *v1alpha2.InferenceObjec } func (ds *datastore) ObjectiveDelete(namespacedName types.NamespacedName) { - ds.poolAndObjectivesMu.Lock() - defer ds.poolAndObjectivesMu.Unlock() + ds.endPointsAndObjectivesMu.Lock() + defer ds.endPointsAndObjectivesMu.Unlock() delete(ds.objectives, namespacedName.Name) } func (ds *datastore) ObjectiveGetAll() []*v1alpha2.InferenceObjective { - ds.poolAndObjectivesMu.RLock() - defer ds.poolAndObjectivesMu.RUnlock() + ds.endPointsAndObjectivesMu.RLock() + defer ds.endPointsAndObjectivesMu.RUnlock() res := []*v1alpha2.InferenceObjective{} for _, v := range ds.objectives { res = append(res, v) diff --git a/pkg/epp/datastore/datastore_test.go b/pkg/epp/datastore/datastore_test.go index ee59071e6..7acc87657 100644 --- a/pkg/epp/datastore/datastore_test.go +++ b/pkg/epp/datastore/datastore_test.go @@ -87,7 +87,7 @@ func TestPool(t *testing.T) { Build() pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second) ds := NewDatastore(context.Background(), pmf, 0) - _ = ds.PoolSet(context.Background(), fakeClient, tt.inferencePool) + _ = ds.EndPointsSet(context.Background(), fakeClient, tt.inferencePool) gotPool, gotErr := ds.PoolGet() if diff := cmp.Diff(tt.wantErr, gotErr, cmpopts.EquateErrors()); diff != "" { t.Errorf("Unexpected error diff (+got/-want): %s", diff) @@ -328,7 +328,7 @@ func TestMetrics(t *testing.T) { Build() pmf := backendmetrics.NewPodMetricsFactory(test.pmc, time.Millisecond) ds := NewDatastore(ctx, pmf, 0) - _ = ds.PoolSet(ctx, fakeClient, inferencePool) + _ = ds.EndPointsSet(ctx, fakeClient, inferencePool) for _, pod := range test.storePods { ds.PodUpdateOrAddIfNotExist(pod) } @@ -397,7 +397,7 @@ func TestPods(t *testing.T) { pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second) ds := NewDatastore(t.Context(), pmf, 0) fakeClient := fake.NewFakeClient() - if err := ds.PoolSet(ctx, fakeClient, inferencePool); err != nil { + if err := ds.EndPointsSet(ctx, fakeClient, inferencePool); err != nil { t.Error(err) } for _, pod := range test.existingPods { @@ -581,7 +581,7 @@ func TestPodInfo(t *testing.T) { pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second) ds := NewDatastore(t.Context(), pmf, 0) fakeClient := fake.NewFakeClient() - if err := ds.PoolSet(ctx, fakeClient, test.pool); err != nil { + if err := ds.EndPointsSet(ctx, fakeClient, test.pool); err != nil { t.Error(err) } for _, pod := range test.existingPods { diff --git a/pkg/epp/metrics/collectors/inference_pool_test.go b/pkg/epp/metrics/collectors/inference_pool_test.go index af2923e50..e6a9c574c 100644 --- a/pkg/epp/metrics/collectors/inference_pool_test.go +++ b/pkg/epp/metrics/collectors/inference_pool_test.go @@ -83,7 +83,7 @@ func TestMetricsCollected(t *testing.T) { TargetPorts: []v1.Port{{Number: v1.PortNumber(int32(8000))}}, }, } - _ = ds.PoolSet(context.Background(), fakeClient, inferencePool) + _ = ds.EndPointsSet(context.Background(), fakeClient, inferencePool) _ = ds.PodUpdateOrAddIfNotExist(pod1) time.Sleep(1 * time.Second) diff --git 
a/pkg/epp/requestcontrol/director_test.go b/pkg/epp/requestcontrol/director_test.go index 8cb9c91a5..a8778f7da 100644 --- a/pkg/epp/requestcontrol/director_test.go +++ b/pkg/epp/requestcontrol/director_test.go @@ -140,7 +140,7 @@ func TestDirector_HandleRequest(t *testing.T) { scheme := runtime.NewScheme() _ = clientgoscheme.AddToScheme(scheme) fakeClient := fake.NewClientBuilder().WithScheme(scheme).Build() - if err := ds.PoolSet(ctx, fakeClient, pool); err != nil { + if err := ds.EndPointsSet(ctx, fakeClient, pool); err != nil { t.Fatalf("Error while setting inference pool: %v", err) } @@ -595,7 +595,7 @@ func TestGetRandomPod(t *testing.T) { t.Run(test.name, func(t *testing.T) { pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Millisecond) ds := datastore.NewDatastore(t.Context(), pmf, 0) - err := ds.PoolSet(t.Context(), fakeClient, pool) + err := ds.EndPointsSet(t.Context(), fakeClient, pool) if err != nil { t.Errorf("unexpected error setting pool: %s", err) } diff --git a/pkg/epp/server/controller_manager.go b/pkg/epp/server/controller_manager.go index 47e4f12d4..e3ca2a15c 100644 --- a/pkg/epp/server/controller_manager.go +++ b/pkg/epp/server/controller_manager.go @@ -18,6 +18,7 @@ package server import ( "fmt" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/fields" @@ -33,7 +34,6 @@ import ( v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" "sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2" - "sigs.k8s.io/gateway-api-inference-extension/pkg/common" ) var scheme = runtime.NewScheme() @@ -45,48 +45,47 @@ func init() { } // defaultManagerOptions returns the default options used to create the manager. -func defaultManagerOptions(gknn common.GKNN, metricsServerOptions metricsserver.Options) (ctrl.Options, error) { +func defaultManagerOptions(endPointsPool *datalayer.EndPointsPool, metricsServerOptions metricsserver.Options) (ctrl.Options, error) { opt := ctrl.Options{ Scheme: scheme, Cache: cache.Options{ ByObject: map[client.Object]cache.ByObject{ &corev1.Pod{}: { Namespaces: map[string]cache.Config{ - gknn.Namespace: {}, - }, - }, - &v1alpha2.InferenceObjective{}: { - Namespaces: map[string]cache.Config{ - gknn.Namespace: {}, + endPointsPool.GKNN.Namespace: {}, }, }, }, }, Metrics: metricsServerOptions, } - switch gknn.Group { - case v1alpha2.GroupName: - opt.Cache.ByObject[&v1alpha2.InferencePool{}] = cache.ByObject{ - Namespaces: map[string]cache.Config{gknn.Namespace: {FieldSelector: fields.SelectorFromSet(fields.Set{ - "metadata.name": gknn.Name, - })}}, + if !endPointsPool.StandaloneMode { + opt.Cache.ByObject[&v1alpha2.InferenceObjective{}] = cache.ByObject{Namespaces: map[string]cache.Config{ + endPointsPool.GKNN.Namespace: {}, + }} + switch endPointsPool.GKNN.Group { + case v1alpha2.GroupName: + opt.Cache.ByObject[&v1alpha2.InferencePool{}] = cache.ByObject{ + Namespaces: map[string]cache.Config{endPointsPool.GKNN.Namespace: {FieldSelector: fields.SelectorFromSet(fields.Set{ + "metadata.name": endPointsPool.GKNN.Name, + })}}, + } + case v1.GroupName: + opt.Cache.ByObject[&v1.InferencePool{}] = cache.ByObject{ + Namespaces: map[string]cache.Config{endPointsPool.GKNN.Namespace: {FieldSelector: fields.SelectorFromSet(fields.Set{ + "metadata.name": endPointsPool.GKNN.Name, + })}}, + } } - case v1.GroupName: - opt.Cache.ByObject[&v1.InferencePool{}] = cache.ByObject{ - Namespaces: map[string]cache.Config{gknn.Namespace: {FieldSelector: 
fields.SelectorFromSet(fields.Set{ - "metadata.name": gknn.Name, - })}}, - } - default: - return ctrl.Options{}, fmt.Errorf("unknown group: %s", gknn.Group) + } return opt, nil } // NewDefaultManager creates a new controller manager with default configuration. -func NewDefaultManager(gknn common.GKNN, restConfig *rest.Config, metricsServerOptions metricsserver.Options, leaderElectionEnabled bool) (ctrl.Manager, error) { - opt, err := defaultManagerOptions(gknn, metricsServerOptions) +func NewDefaultManager(endPointsPool *datalayer.EndPointsPool, restConfig *rest.Config, metricsServerOptions metricsserver.Options, leaderElectionEnabled bool) (ctrl.Manager, error) { + opt, err := defaultManagerOptions(endPointsPool, metricsServerOptions) if err != nil { return nil, fmt.Errorf("failed to create controller manager options: %v", err) } @@ -95,8 +94,8 @@ func NewDefaultManager(gknn common.GKNN, restConfig *rest.Config, metricsServerO opt.LeaderElection = true opt.LeaderElectionResourceLock = "leases" // The lease name needs to be unique per EPP deployment. - opt.LeaderElectionID = fmt.Sprintf("epp-%s-%s.gateway-api-inference-extension.sigs.k8s.io", gknn.Namespace, gknn.Name) - opt.LeaderElectionNamespace = gknn.Namespace + opt.LeaderElectionID = fmt.Sprintf("epp-%s-%s.gateway-api-inference-extension.sigs.k8s.io", endPointsPool.GKNN.Namespace, endPointsPool.GKNN.Name) + opt.LeaderElectionNamespace = endPointsPool.GKNN.Namespace opt.LeaderElectionReleaseOnCancel = true } diff --git a/pkg/epp/server/runserver.go b/pkg/epp/server/runserver.go index c3037175e..c79054bd5 100644 --- a/pkg/epp/server/runserver.go +++ b/pkg/epp/server/runserver.go @@ -20,6 +20,7 @@ import ( "context" "crypto/tls" "fmt" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer" "time" extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" @@ -28,14 +29,11 @@ import ( "google.golang.org/grpc/credentials" "google.golang.org/grpc/health" healthgrpc "google.golang.org/grpc/health/grpc_health_v1" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/gateway-api-inference-extension/internal/runnable" tlsutil "sigs.k8s.io/gateway-api-inference-extension/internal/tls" - "sigs.k8s.io/gateway-api-inference-extension/pkg/common" backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/controller" dlmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer/metrics" @@ -48,8 +46,7 @@ import ( // ExtProcServerRunner provides methods to manage an external process server. type ExtProcServerRunner struct { GrpcPort int - PoolNamespacedName types.NamespacedName - PoolGKNN common.GKNN + EndPointsPool *datalayer.EndPointsPool Datastore datastore.Datastore SecureServing bool HealthChecking bool @@ -91,17 +88,8 @@ const ( // NewDefaultExtProcServerRunner creates a runner with default values. // Note: Dependencies like Datastore, Scheduler, SD need to be set separately. 
func NewDefaultExtProcServerRunner() *ExtProcServerRunner { - poolGKNN := common.GKNN{ - NamespacedName: types.NamespacedName{Name: DefaultPoolName, Namespace: DefaultPoolNamespace}, - GroupKind: schema.GroupKind{ - Group: DefaultPoolGroup, - Kind: "InferencePool", - }, - } return &ExtProcServerRunner{ GrpcPort: DefaultGrpcPort, - PoolNamespacedName: types.NamespacedName{Name: DefaultPoolName, Namespace: DefaultPoolNamespace}, - PoolGKNN: poolGKNN, SecureServing: DefaultSecureServing, HealthChecking: DefaultHealthChecking, RefreshPrometheusMetricsInterval: DefaultRefreshPrometheusMetricsInterval, @@ -113,20 +101,22 @@ func NewDefaultExtProcServerRunner() *ExtProcServerRunner { // SetupWithManager sets up the runner with the given manager. func (r *ExtProcServerRunner) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error { // Create the controllers and register them with the manager - if err := (&controller.InferencePoolReconciler{ - Datastore: r.Datastore, - Reader: mgr.GetClient(), - PoolGKNN: r.PoolGKNN, - }).SetupWithManager(mgr); err != nil { - return fmt.Errorf("failed setting up InferencePoolReconciler: %w", err) - } + if !r.EndPointsPool.StandaloneMode { + if err := (&controller.InferencePoolReconciler{ + Datastore: r.Datastore, + Reader: mgr.GetClient(), + PoolGKNN: r.EndPointsPool.GKNN, + }).SetupWithManager(mgr); err != nil { + return fmt.Errorf("failed setting up InferencePoolReconciler: %w", err) + } - if err := (&controller.InferenceObjectiveReconciler{ - Datastore: r.Datastore, - Reader: mgr.GetClient(), - PoolGKNN: r.PoolGKNN, - }).SetupWithManager(ctx, mgr); err != nil { - return fmt.Errorf("failed setting up InferenceObjectiveReconciler: %w", err) + if err := (&controller.InferenceObjectiveReconciler{ + Datastore: r.Datastore, + Reader: mgr.GetClient(), + PoolGKNN: r.EndPointsPool.GKNN, + }).SetupWithManager(ctx, mgr); err != nil { + return fmt.Errorf("failed setting up InferenceObjectiveReconciler: %w", err) + } } if err := (&controller.PodReconciler{ diff --git a/test/utils/server.go b/test/utils/server.go index 9cf907d29..76060c105 100644 --- a/test/utils/server.go +++ b/test/utils/server.go @@ -72,7 +72,7 @@ func PrepareForTestStreamingServer(objectives []*v1alpha2.InferenceObjective, po Build() pool := testutil.MakeInferencePool(poolName).Namespace(namespace).ObjRef() pool.Spec.TargetPorts = []v1.Port{{Number: v1.PortNumber(poolPort)}} - _ = ds.PoolSet(context.Background(), fakeClient, pool) + _ = ds.EndPointsSet(context.Background(), fakeClient, pool) return ctx, cancel, ds, pmc } From 370f1a395254178678ac58272165467127a99491 Mon Sep 17 00:00:00 2001 From: Xiyue Yu Date: Sun, 2 Nov 2025 11:03:57 -0800 Subject: [PATCH 02/10] refactor Signed-off-by: Xiyue Yu --- cmd/epp/runner/health.go | 2 +- cmd/epp/runner/runner.go | 2 +- pkg/epp/backend/metrics/logger.go | 6 +- pkg/epp/backend/metrics/pod_metrics_test.go | 4 +- .../inferenceobjective_reconciler_test.go | 33 ++++---- .../controller/inferencepool_reconciler.go | 26 ++++++- .../inferencepool_reconciler_test.go | 8 +- pkg/epp/controller/pod_reconciler.go | 4 +- pkg/epp/controller/pod_reconciler_test.go | 4 +- pkg/epp/datalayer/factory.go | 4 +- pkg/epp/datastore/datastore.go | 52 ++++++------- pkg/epp/datastore/datastore_test.go | 2 +- pkg/epp/requestcontrol/director.go | 4 +- pkg/epp/requestcontrol/director_test.go | 64 +++++++++++++-- pkg/epp/util/pool/pool.go | 78 +++++++++++++++++++ test/integration/epp/hermetic_test.go | 2 +- 16 files changed, 223 insertions(+), 72 deletions(-) create mode 100644 
pkg/epp/util/pool/pool.go diff --git a/cmd/epp/runner/health.go b/cmd/epp/runner/health.go index 1edbcff8e..c80cc4d20 100644 --- a/cmd/epp/runner/health.go +++ b/cmd/epp/runner/health.go @@ -44,7 +44,7 @@ const ( ) func (s *healthServer) Check(ctx context.Context, in *healthPb.HealthCheckRequest) (*healthPb.HealthCheckResponse, error) { - isLive := s.datastore.PoolHasSynced() + isLive := s.datastore.EndPointsPoolHasSynced() // If leader election is disabled, use current logic: all checks are based on whether the pool has synced. if !s.leaderElectionEnabled { diff --git a/cmd/epp/runner/runner.go b/cmd/epp/runner/runner.go index 089929576..8df79a891 100644 --- a/cmd/epp/runner/runner.go +++ b/cmd/epp/runner/runner.go @@ -263,7 +263,7 @@ func (r *Runner) Run(ctx context.Context) error { if err != nil { return err } - datastore := datastore.NewDatastore(ctx, epf, int32(*modelServerMetricsPort), endPointsPool.EndPoints, endPointsPool.StandaloneMode) + datastore := datastore.NewDatastore(ctx, epf, int32(*modelServerMetricsPort), endPointsPool) // --- Setup Metrics Server --- customCollectors := []prometheus.Collector{collectors.NewInferencePoolMetricsCollector(datastore)} diff --git a/pkg/epp/backend/metrics/logger.go b/pkg/epp/backend/metrics/logger.go index 69fc404e7..9f60ba76a 100644 --- a/pkg/epp/backend/metrics/logger.go +++ b/pkg/epp/backend/metrics/logger.go @@ -97,7 +97,7 @@ func refreshPrometheusMetrics(logger logr.Logger, datastore datalayer.PoolInfo, } podTotalCount := len(podMetrics) - metrics.RecordInferencePoolAvgKVCache(pool.Name, kvCacheTotal/float64(podTotalCount)) - metrics.RecordInferencePoolAvgQueueSize(pool.Name, float64(queueTotal/podTotalCount)) - metrics.RecordInferencePoolReadyPods(pool.Name, float64(podTotalCount)) + metrics.RecordInferencePoolAvgKVCache(pool.GKNN.Name, kvCacheTotal/float64(podTotalCount)) + metrics.RecordInferencePoolAvgQueueSize(pool.GKNN.Name, float64(queueTotal/podTotalCount)) + metrics.RecordInferencePoolReadyPods(pool.GKNN.Name, float64(podTotalCount)) } diff --git a/pkg/epp/backend/metrics/pod_metrics_test.go b/pkg/epp/backend/metrics/pod_metrics_test.go index b0297cd1e..9d32f338e 100644 --- a/pkg/epp/backend/metrics/pod_metrics_test.go +++ b/pkg/epp/backend/metrics/pod_metrics_test.go @@ -86,8 +86,8 @@ func TestMetricsRefresh(t *testing.T) { type fakeDataStore struct{} -func (f *fakeDataStore) PoolGet() (*v1.InferencePool, error) { - return &v1.InferencePool{Spec: v1.InferencePoolSpec{TargetPorts: []v1.Port{{Number: 8000}}}}, nil +func (f *fakeDataStore) PoolGet() (*datalayer.EndPointsPool, error) { + return datalayer.NewEndPointsPool(), nil } func (f *fakeDataStore) PodList(func(PodMetrics) bool) []PodMetrics { diff --git a/pkg/epp/controller/inferenceobjective_reconciler_test.go b/pkg/epp/controller/inferenceobjective_reconciler_test.go index 1a7ed4dd6..7b65269ec 100644 --- a/pkg/epp/controller/inferenceobjective_reconciler_test.go +++ b/pkg/epp/controller/inferenceobjective_reconciler_test.go @@ -18,6 +18,7 @@ package controller import ( "context" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer" "testing" "time" @@ -36,16 +37,17 @@ import ( "sigs.k8s.io/gateway-api-inference-extension/pkg/common" backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/pool" utiltest "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/testing" ) var ( - pool = 
utiltest.MakeInferencePool("test-pool1").Namespace("ns1").ObjRef() + inferencePool = utiltest.MakeInferencePool("test-pool1").Namespace("ns1").ObjRef() infObjective1 = utiltest.MakeInferenceObjective("model1"). - Namespace(pool.Namespace). + Namespace(inferencePool.Namespace). Priority(1). CreationTimestamp(metav1.Unix(1000, 0)). - PoolName(pool.Name). + PoolName(inferencePool.Name). PoolGroup("inference.networking.k8s.io").ObjRef() infObjective1Pool2 = utiltest.MakeInferenceObjective(infObjective1.Name). Namespace(infObjective1.Namespace). @@ -57,24 +59,24 @@ var ( Namespace(infObjective1.Namespace). Priority(2). CreationTimestamp(metav1.Unix(1003, 0)). - PoolName(pool.Name). + PoolName(inferencePool.Name). PoolGroup("inference.networking.k8s.io").ObjRef() infObjective1Deleted = utiltest.MakeInferenceObjective(infObjective1.Name). Namespace(infObjective1.Namespace). CreationTimestamp(metav1.Unix(1004, 0)). DeletionTimestamp(). - PoolName(pool.Name). + PoolName(inferencePool.Name). PoolGroup("inference.networking.k8s.io").ObjRef() infObjective1DiffGroup = utiltest.MakeInferenceObjective(infObjective1.Name). - Namespace(pool.Namespace). + Namespace(inferencePool.Namespace). Priority(1). CreationTimestamp(metav1.Unix(1005, 0)). - PoolName(pool.Name). + PoolName(inferencePool.Name). PoolGroup("inference.networking.x-k8s.io").ObjRef() infObjective2 = utiltest.MakeInferenceObjective("model2"). - Namespace(pool.Namespace). + Namespace(inferencePool.Namespace). CreationTimestamp(metav1.Unix(1000, 0)). - PoolName(pool.Name). + PoolName(inferencePool.Name). PoolGroup("inference.networking.k8s.io").ObjRef() ) @@ -120,7 +122,7 @@ func TestInferenceObjectiveReconciler(t *testing.T) { { name: "Objective not found, no matching existing objective to delete", objectivessInStore: []*v1alpha2.InferenceObjective{infObjective1}, - incomingReq: &types.NamespacedName{Name: "non-existent-objective", Namespace: pool.Namespace}, + incomingReq: &types.NamespacedName{Name: "non-existent-objective", Namespace: inferencePool.Namespace}, wantObjectives: []*v1alpha2.InferenceObjective{infObjective1}, }, { @@ -160,17 +162,18 @@ func TestInferenceObjectiveReconciler(t *testing.T) { WithObjects(initObjs...). 
Build() pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second) - ds := datastore.NewDatastore(t.Context(), pmf, 0) + ds := datastore.NewDatastore(t.Context(), pmf, 0, datalayer.NewEndPointsPool()) for _, m := range test.objectivessInStore { ds.ObjectiveSet(m) } - _ = ds.EndPointsSet(context.Background(), fakeClient, pool) + endPointsPool := pool.InferencePoolToEndPointsPool(inferencePool) + _ = ds.PoolSet(context.Background(), fakeClient, endPointsPool) reconciler := &InferenceObjectiveReconciler{ Reader: fakeClient, Datastore: ds, PoolGKNN: common.GKNN{ - NamespacedName: types.NamespacedName{Name: pool.Name, Namespace: pool.Namespace}, - GroupKind: schema.GroupKind{Group: pool.GroupVersionKind().Group, Kind: pool.GroupVersionKind().Kind}, + NamespacedName: types.NamespacedName{Name: inferencePool.Name, Namespace: inferencePool.Namespace}, + GroupKind: schema.GroupKind{Group: inferencePool.GroupVersionKind().Group, Kind: inferencePool.GroupVersionKind().Kind}, }, } if test.incomingReq == nil { @@ -191,7 +194,7 @@ func TestInferenceObjectiveReconciler(t *testing.T) { t.Errorf("Unexpected; want: %d, got:%d", len(test.wantObjectives), len(ds.ObjectiveGetAll())) } - if diff := diffStore(ds, diffStoreParams{wantPool: pool, wantObjectives: test.wantObjectives}); diff != "" { + if diff := diffStore(ds, diffStoreParams{wantPool: inferencePool, wantObjectives: test.wantObjectives}); diff != "" { t.Errorf("Unexpected diff (+got/-want): %s", diff) } diff --git a/pkg/epp/controller/inferencepool_reconciler.go b/pkg/epp/controller/inferencepool_reconciler.go index abe2aec86..156673914 100644 --- a/pkg/epp/controller/inferencepool_reconciler.go +++ b/pkg/epp/controller/inferencepool_reconciler.go @@ -19,11 +19,11 @@ package controller import ( "context" "fmt" - "k8s.io/apimachinery/pkg/api/errors" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer" v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" "sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2" @@ -92,8 +92,30 @@ func (c *InferencePoolReconciler) Reconcile(ctx context.Context, req ctrl.Reques default: return ctrl.Result{}, fmt.Errorf("unsupported API group: %s", c.PoolGKNN.Group) } + gknn := common.GKNN{ + NamespacedName: req.NamespacedName, + GroupKind: c.PoolGKNN.GroupKind, + } + targetPorts := make([]int, 0, len(v1infPool.Spec.TargetPorts)) + for _, p := range v1infPool.Spec.TargetPorts { + targetPorts = append(targetPorts, int(p.Number)) + + } + selector := make(map[string]string, len(v1infPool.Spec.Selector.MatchLabels)) + for k, v := range v1infPool.Spec.Selector.MatchLabels { + selector[string(k)] = string(v) + } + endPoints := &datalayer.EndPoints{ + Selector: selector, + TargetPorts: targetPorts, + } + endPointsPool := &datalayer.EndPointsPool{ + EndPoints: endPoints, + StandaloneMode: false, + GKNN: gknn, + } - if err := c.Datastore.EndPointsSet(ctx, c.Reader, v1infPool); err != nil { + if err := c.Datastore.PoolSet(ctx, c.Reader, endPointsPool); err != nil { return ctrl.Result{}, fmt.Errorf("failed to update datastore - %w", err) } diff --git a/pkg/epp/controller/inferencepool_reconciler_test.go b/pkg/epp/controller/inferencepool_reconciler_test.go index a2bce1256..1ec1ae9a8 100644 --- a/pkg/epp/controller/inferencepool_reconciler_test.go +++ b/pkg/epp/controller/inferencepool_reconciler_test.go @@ -18,6 +18,8 @@ package controller import ( "context" 
+ "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/pool" "testing" "time" @@ -114,7 +116,7 @@ func TestInferencePoolReconciler(t *testing.T) { ctx := context.Background() pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second) - ds := datastore.NewDatastore(ctx, pmf, 0) + ds := datastore.NewDatastore(ctx, pmf, 0, datalayer.NewEndPointsPool()) inferencePoolReconciler := &InferencePoolReconciler{Reader: fakeClient, Datastore: ds, PoolGKNN: gknn} // Step 1: Inception, only ready pods matching pool1 are added to the store. @@ -261,7 +263,7 @@ func TestXInferencePoolReconciler(t *testing.T) { ctx := context.Background() pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second) - ds := datastore.NewDatastore(ctx, pmf, 0) + ds := datastore.NewDatastore(ctx, pmf, 0, datalayer.NewEndPointsPool()) inferencePoolReconciler := &InferencePoolReconciler{Reader: fakeClient, Datastore: ds, PoolGKNN: gknn} // Step 1: Inception, only ready pods matching pool1 are added to the store. @@ -332,7 +334,7 @@ func xDiffStore(t *testing.T, datastore datastore.Datastore, params xDiffStorePa gotXPool := &v1alpha2.InferencePool{} - err := gotXPool.ConvertFrom(gotPool) + err := gotXPool.ConvertFrom(pool.EndPointsPoolToInferencePool(gotPool)) if err != nil { t.Fatalf("failed to convert InferencePool to XInferencePool: %v", err) } diff --git a/pkg/epp/controller/pod_reconciler.go b/pkg/epp/controller/pod_reconciler.go index b3a78ef92..8add16ef3 100644 --- a/pkg/epp/controller/pod_reconciler.go +++ b/pkg/epp/controller/pod_reconciler.go @@ -41,8 +41,8 @@ type PodReconciler struct { func (c *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { logger := log.FromContext(ctx) - if !c.Datastore.PoolHasSynced() { - logger.V(logutil.TRACE).Info("Skipping reconciling Pod because the InferencePool is not available yet") + if !c.Datastore.EndPointsPoolHasSynced() { + logger.V(logutil.TRACE).Info("Skipping reconciling Pod because the EndPointsPicker is not available yet") // When the inferencePool is initialized it lists the appropriate pods and populates the datastore, so no need to requeue. return ctrl.Result{}, nil } diff --git a/pkg/epp/controller/pod_reconciler_test.go b/pkg/epp/controller/pod_reconciler_test.go index 1e7f971be..f39b29852 100644 --- a/pkg/epp/controller/pod_reconciler_test.go +++ b/pkg/epp/controller/pod_reconciler_test.go @@ -18,6 +18,8 @@ package controller import ( "context" + "sigs.k8s.io/gateway-api-inference-extension/pkg/common" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer" "testing" "time" @@ -197,7 +199,7 @@ func TestPodReconciler(t *testing.T) { // Configure the initial state of the datastore. store := datastore.NewDatastore(t.Context(), pmf, 0) - _ = store.EndPointsSet(t.Context(), fakeClient, test.pool) + _ = store.PoolSet(t.Context(), fakeClient, test.pool) for _, pod := range test.existingPods { store.PodUpdateOrAddIfNotExist(pod) } diff --git a/pkg/epp/datalayer/factory.go b/pkg/epp/datalayer/factory.go index 989527c6c..4ed4d08ef 100644 --- a/pkg/epp/datalayer/factory.go +++ b/pkg/epp/datalayer/factory.go @@ -23,8 +23,6 @@ import ( "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/log" - - v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" ) // PoolInfo represents the DataStore information needed for endpoints. 
@@ -36,7 +34,7 @@ import ( // - Global metrics logging uses PoolGet solely for error return and PodList to enumerate // all endpoints for metrics summarization. type PoolInfo interface { - PoolGet() (*v1.InferencePool, error) + PoolGet() (*EndPointsPool, error) PodList(func(Endpoint) bool) []Endpoint } diff --git a/pkg/epp/datastore/datastore.go b/pkg/epp/datastore/datastore.go index 29807166a..f9f5360f4 100644 --- a/pkg/epp/datastore/datastore.go +++ b/pkg/epp/datastore/datastore.go @@ -49,8 +49,8 @@ type Datastore interface { // PoolSet sets the given pool in datastore. If the given pool has different label selector than the previous pool // that was stored, the function triggers a resync of the pods to keep the datastore updated. If the given pool // is nil, this call triggers the datastore.Clear() function. - EndPointsSet(ctx context.Context, reader client.Reader, pool *v1.InferencePool) error - PoolGet() (*v1.InferencePool, error) + PoolSet(ctx context.Context, reader client.Reader, endPointsPool *datalayer.EndPointsPool) error + PoolGet() (*datalayer.EndPointsPool, error) PoolHasSynced() bool PoolLabelsMatch(podLabels map[string]string) bool @@ -69,12 +69,11 @@ type Datastore interface { Clear() } -func NewDatastore(parentCtx context.Context, epFactory datalayer.EndpointFactory, modelServerMetricsPort int32, endPoints *datalayer.EndPoints, standaloneMode bool) Datastore { +func NewDatastore(parentCtx context.Context, epFactory datalayer.EndpointFactory, modelServerMetricsPort int32, endPointsPool *datalayer.EndPointsPool) Datastore { store := &datastore{ parentCtx: parentCtx, endPointsAndObjectivesMu: sync.RWMutex{}, - standaloneMode: standaloneMode, - endPoints: endPoints, + endPointsPool: endPointsPool, objectives: make(map[string]*v1alpha2.InferenceObjective), pods: &sync.Map{}, modelServerMetricsPort: modelServerMetricsPort, @@ -89,8 +88,7 @@ type datastore struct { // endPointsAndObjectivesMu is used to synchronize access to pool and the objectives map. endPointsAndObjectivesMu sync.RWMutex standaloneMode bool - // endPoints is used to filter the available model server endpoints - endPoints *datalayer.EndPoints + endPointsPool *datalayer.EndPointsPool // key: InferenceObjective.Spec.ModelName, value: *InferenceObjective objectives map[string]*v1alpha2.InferenceObjective // key: types.NamespacedName, value: backendmetrics.PodMetrics @@ -104,7 +102,7 @@ type datastore struct { func (ds *datastore) Clear() { ds.endPointsAndObjectivesMu.Lock() defer ds.endPointsAndObjectivesMu.Unlock() - ds.endPoints = nil + ds.endPointsPool = nil ds.objectives = make(map[string]*v1alpha2.InferenceObjective) // stop all pods go routines before clearing the pods map. 
ds.pods.Range(func(_, v any) bool { @@ -114,9 +112,9 @@ ds.pods.Clear() } -// /// InferencePool APIs /// -func (ds *datastore) EndPointsSet(ctx context.Context, reader client.Reader, pool *v1.InferencePool) error { - if pool == nil { +// /// EndPoints APIs /// +func (ds *datastore) PoolSet(ctx context.Context, reader client.Reader, endPointsPool *datalayer.EndPointsPool) error { + if endPointsPool == nil { ds.Clear() return nil } @@ -124,10 +122,10 @@ func (ds *datastore) EndPointsSet(ctx context.Context, reader client.Reader, poo ds.endPointsAndObjectivesMu.Lock() defer ds.endPointsAndObjectivesMu.Unlock() - oldPool := ds.pool - ds.pool = pool - if oldPool == nil || !reflect.DeepEqual(pool.Spec.Selector, oldPool.Spec.Selector) { - logger.V(logutil.DEFAULT).Info("Updating inference pool endpoints", "selector", pool.Spec.Selector) + oldEndPointsPool := ds.endPointsPool + ds.endPointsPool = endPointsPool + if oldEndPointsPool == nil || !reflect.DeepEqual(endPointsPool.EndPoints.Selector, oldEndPointsPool.EndPoints.Selector) { + logger.V(logutil.DEFAULT).Info("Updating endpoints", "selector", endPointsPool.EndPoints.Selector) // A full resync is required to address two cases: // 1) At startup, the pod events may get processed before the pool is synced with the datastore, // and hence they will not be added to the store since pool selector is not known yet @@ -142,28 +140,28 @@ func (ds *datastore) EndPointsSet(ctx context.Context, reader client.Reader, poo return nil } -func (ds *datastore) PoolGet() (*v1.InferencePool, error) { +func (ds *datastore) PoolGet() (*datalayer.EndPointsPool, error) { ds.endPointsAndObjectivesMu.RLock() defer ds.endPointsAndObjectivesMu.RUnlock() if !ds.PoolHasSynced() { return nil, errPoolNotSynced } - return ds.pool, nil + return ds.endPointsPool, nil } func (ds *datastore) PoolHasSynced() bool { ds.endPointsAndObjectivesMu.RLock() defer ds.endPointsAndObjectivesMu.RUnlock() - return ds.pool != nil + return ds.endPointsPool != nil } func (ds *datastore) PoolLabelsMatch(podLabels map[string]string) bool { ds.endPointsAndObjectivesMu.RLock() defer ds.endPointsAndObjectivesMu.RUnlock() - if ds.pool == nil { + if ds.endPointsPool == nil { return false } - poolSelector := selectorFromInferencePoolSelector(ds.pool.Spec.Selector.MatchLabels) + poolSelector := labels.SelectorFromSet(ds.endPointsPool.EndPoints.Selector) podSet := labels.Set(podLabels) return poolSelector.Matches(podSet) } @@ -219,7 +217,7 @@ func (ds *datastore) PodList(predicate func(backendmetrics.PodMetrics) bool) []b } func (ds *datastore) PodUpdateOrAddIfNotExist(pod *corev1.Pod) bool { - if ds.pool == nil { + if ds.endPointsPool == nil { return true } @@ -229,14 +227,14 @@ modelServerMetricsPort := 0 - if len(ds.pool.Spec.TargetPorts) == 1 { + if len(ds.endPointsPool.EndPoints.TargetPorts) == 1 { modelServerMetricsPort = int(ds.modelServerMetricsPort) } pods := []*datalayer.PodInfo{} - for idx, port := range ds.pool.Spec.TargetPorts { + for idx, port := range ds.endPointsPool.EndPoints.TargetPorts { metricsPort := modelServerMetricsPort if metricsPort == 0 { - metricsPort = int(port.Number) + metricsPort = port } pods = append(pods, &datalayer.PodInfo{ @@ -246,7 +244,7 @@ }, PodName: pod.Name, Address: pod.Status.PodIP, - Port: strconv.Itoa(int(port.Number)), + Port: strconv.Itoa(port), MetricsHost: 
net.JoinHostPort(pod.Status.PodIP, strconv.Itoa(metricsPort)), Labels: labels, }) @@ -284,8 +282,8 @@ func (ds *datastore) podResyncAll(ctx context.Context, reader client.Reader) err logger := log.FromContext(ctx) podList := &corev1.PodList{} if err := reader.List(ctx, podList, &client.ListOptions{ - LabelSelector: selectorFromInferencePoolSelector(ds.pool.Spec.Selector.MatchLabels), - Namespace: ds.pool.Namespace, + LabelSelector: labels.SelectorFromSet(ds.endPointsPool.EndPoints.Selector), + Namespace: ds.endPointsPool.GKNN.Namespace, }); err != nil { return fmt.Errorf("failed to list pods - %w", err) } diff --git a/pkg/epp/datastore/datastore_test.go b/pkg/epp/datastore/datastore_test.go index 7acc87657..76c30c583 100644 --- a/pkg/epp/datastore/datastore_test.go +++ b/pkg/epp/datastore/datastore_test.go @@ -95,7 +95,7 @@ func TestPool(t *testing.T) { if diff := cmp.Diff(tt.wantPool, gotPool); diff != "" { t.Errorf("Unexpected pool diff (+got/-want): %s", diff) } - gotSynced := ds.PoolHasSynced() + gotSynced := ds.EndPointsPoolHasSynced() if diff := cmp.Diff(tt.wantSynced, gotSynced); diff != "" { t.Errorf("Unexpected synced diff (+got/-want): %s", diff) } diff --git a/pkg/epp/requestcontrol/director.go b/pkg/epp/requestcontrol/director.go index f6f7deebe..460be0d18 100644 --- a/pkg/epp/requestcontrol/director.go +++ b/pkg/epp/requestcontrol/director.go @@ -23,12 +23,12 @@ import ( "fmt" "math/rand" "net" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer" "strings" "time" "sigs.k8s.io/controller-runtime/pkg/log" - v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" "sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend" backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" @@ -43,7 +43,7 @@ import ( // Datastore defines the interface required by the Director. 
type Datastore interface { - PoolGet() (*v1.InferencePool, error) + PoolGet() (*datalayer.EndPointsPool, error) ObjectiveGet(modelName string) *v1alpha2.InferenceObjective PodList(predicate func(backendmetrics.PodMetrics) bool) []backendmetrics.PodMetrics } diff --git a/pkg/epp/requestcontrol/director_test.go b/pkg/epp/requestcontrol/director_test.go index a8778f7da..a312dca8a 100644 --- a/pkg/epp/requestcontrol/director_test.go +++ b/pkg/epp/requestcontrol/director_test.go @@ -20,6 +20,9 @@ import ( "context" "errors" "fmt" + "k8s.io/apimachinery/pkg/runtime/schema" + "sigs.k8s.io/gateway-api-inference-extension/pkg/common" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer" "testing" "time" @@ -76,7 +79,7 @@ type mockDatastore struct { pods []backendmetrics.PodMetrics } -func (ds *mockDatastore) PoolGet() (*v1.InferencePool, error) { +func (ds *mockDatastore) PoolGet() (*datalayer.EndPointsPool, error) { return nil, nil } func (ds *mockDatastore) ObjectiveGet(_ string) *v1alpha2.InferenceObjective { @@ -120,7 +123,7 @@ func TestDirector_HandleRequest(t *testing.T) { // Datastore setup pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second) - ds := datastore.NewDatastore(t.Context(), pmf, 0) + ds := datastore.NewDatastore(t.Context(), pmf, 0, datalayer.NewEndPointsPool()) ds.ObjectiveSet(ioFoodReview) ds.ObjectiveSet(ioFoodReviewResolve) ds.ObjectiveSet(ioFoodReviewSheddable) @@ -140,7 +143,29 @@ func TestDirector_HandleRequest(t *testing.T) { scheme := runtime.NewScheme() _ = clientgoscheme.AddToScheme(scheme) fakeClient := fake.NewClientBuilder().WithScheme(scheme).Build() - if err := ds.EndPointsSet(ctx, fakeClient, pool); err != nil { + targetPorts := make([]int, 0, len(pool.Spec.TargetPorts)) + for _, p := range pool.Spec.TargetPorts { + targetPorts = append(targetPorts, int(p.Number)) + + } + selector := make(map[string]string, len(pool.Spec.Selector.MatchLabels)) + for k, v := range pool.Spec.Selector.MatchLabels { + selector[string(k)] = string(v) + } + gknn := common.GKNN{ + NamespacedName: types.NamespacedName{Namespace: pool.Namespace, Name: pool.Name}, + GroupKind: schema.GroupKind{Group: pool.GroupVersionKind().Group, Kind: pool.GroupVersionKind().Kind}, + } + endPoints := &datalayer.EndPoints{ + Selector: selector, + TargetPorts: targetPorts, + } + endPointsPool := &datalayer.EndPointsPool{ + EndPoints: endPoints, + StandaloneMode: false, + GKNN: gknn, + } + if err := ds.PoolSet(ctx, fakeClient, endPointsPool); err != nil { t.Fatalf("Error while setting inference pool: %v", err) } @@ -594,8 +619,31 @@ func TestGetRandomPod(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Millisecond) - ds := datastore.NewDatastore(t.Context(), pmf, 0) - err := ds.EndPointsSet(t.Context(), fakeClient, pool) + targetPorts := make([]int, 0, len(pool.Spec.TargetPorts)) + for _, p := range pool.Spec.TargetPorts { + targetPorts = append(targetPorts, int(p.Number)) + + } + selector := make(map[string]string, len(pool.Spec.Selector.MatchLabels)) + for k, v := range pool.Spec.Selector.MatchLabels { + selector[string(k)] = string(v) + } + gknn := common.GKNN{ + NamespacedName: types.NamespacedName{Namespace: pool.Namespace, Name: pool.Name}, + GroupKind: schema.GroupKind{Group: pool.GroupVersionKind().Group, Kind: pool.GroupVersionKind().Kind}, + } + endPoints := &datalayer.EndPoints{ + Selector: selector, + TargetPorts: 
targetPorts, + } + endPointsPool := &datalayer.EndPointsPool{ + EndPoints: endPoints, + StandaloneMode: false, + GKNN: gknn, + } + + ds := datastore.NewDatastore(t.Context(), pmf, 0, endPointsPool) + err := ds.PoolSet(t.Context(), fakeClient, endPointsPool) if err != nil { t.Errorf("unexpected error setting pool: %s", err) } @@ -619,7 +667,7 @@ func TestDirector_HandleResponseReceived(t *testing.T) { pr1 := newTestResponseReceived("pr1") ctx := logutil.NewTestLoggerIntoContext(context.Background()) - ds := datastore.NewDatastore(t.Context(), nil, 0) + ds := datastore.NewDatastore(t.Context(), nil, 0, datalayer.NewEndPointsPool()) mockSched := &mockScheduler{} director := NewDirectorWithConfig(ds, mockSched, &mockAdmissionController{}, NewConfig().WithResponseReceivedPlugins(pr1)) @@ -656,7 +704,7 @@ func TestDirector_HandleResponseStreaming(t *testing.T) { ps1 := newTestResponseStreaming("ps1") ctx := logutil.NewTestLoggerIntoContext(context.Background()) - ds := datastore.NewDatastore(t.Context(), nil, 0) + ds := datastore.NewDatastore(t.Context(), nil, 0, datalayer.NewEndPointsPool()) mockSched := &mockScheduler{} director := NewDirectorWithConfig(ds, mockSched, nil, NewConfig().WithResponseStreamingPlugins(ps1)) @@ -692,7 +740,7 @@ func TestDirector_HandleResponseComplete(t *testing.T) { pc1 := newTestResponseComplete("pc1") ctx := logutil.NewTestLoggerIntoContext(context.Background()) - ds := datastore.NewDatastore(t.Context(), nil, 0) + ds := datastore.NewDatastore(t.Context(), nil, 0, datalayer.NewEndPointsPool()) mockSched := &mockScheduler{} director := NewDirectorWithConfig(ds, mockSched, nil, NewConfig().WithResponseCompletePlugins(pc1)) diff --git a/pkg/epp/util/pool/pool.go b/pkg/epp/util/pool/pool.go new file mode 100644 index 000000000..c903f5d20 --- /dev/null +++ b/pkg/epp/util/pool/pool.go @@ -0,0 +1,78 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ +package pool + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" + "sigs.k8s.io/gateway-api-inference-extension/pkg/common" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer" +) + +func InferencePoolToEndPointsPool(inferencePool *v1.InferencePool) *datalayer.EndPointsPool { + targetPorts := make([]int, 0, len(inferencePool.Spec.TargetPorts)) + for _, p := range inferencePool.Spec.TargetPorts { + targetPorts = append(targetPorts, int(p.Number)) + + } + selector := make(map[string]string, len(inferencePool.Spec.Selector.MatchLabels)) + for k, v := range inferencePool.Spec.Selector.MatchLabels { + selector[string(k)] = string(v) + } + gknn := common.GKNN{ + NamespacedName: types.NamespacedName{Namespace: inferencePool.Namespace, Name: inferencePool.Name}, + GroupKind: schema.GroupKind{Group: inferencePool.GroupVersionKind().Group, Kind: inferencePool.GroupVersionKind().Kind}, + } + endPoints := &datalayer.EndPoints{ + Selector: selector, + TargetPorts: targetPorts, + } + endPointsPool := &datalayer.EndPointsPool{ + EndPoints: endPoints, + StandaloneMode: false, + GKNN: gknn, + } + return endPointsPool +} + +func EndPointsPoolToInferencePool(endPointsPool *datalayer.EndPointsPool) *v1.InferencePool { + targetPorts := make([]v1.Port, 0, len(endPointsPool.EndPoints.TargetPorts)) + for _, p := range endPointsPool.EndPoints.TargetPorts { + targetPorts = append(targetPorts, v1.Port{Number: v1.PortNumber(p)}) + } + labels := make(map[v1.LabelKey]v1.LabelValue, len(endPointsPool.EndPoints.Selector)) + for k, v := range endPointsPool.EndPoints.Selector { + labels[v1.LabelKey(k)] = v1.LabelValue(v) + } + + inferencePool := &v1.InferencePool{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "inference.networking.k8s.io/v1", + Kind: "InferencePool", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: endPointsPool.GKNN.Name, + Namespace: endPointsPool.GKNN.Namespace, + }, + Spec: v1.InferencePoolSpec{ + Selector: v1.LabelSelector{MatchLabels: labels}, + TargetPorts: targetPorts, + }, + } + return inferencePool +} diff --git a/test/integration/epp/hermetic_test.go b/test/integration/epp/hermetic_test.go index 9ce4fec64..6c0cd39a2 100644 --- a/test/integration/epp/hermetic_test.go +++ b/test/integration/epp/hermetic_test.go @@ -1238,7 +1238,7 @@ func BeforeSuite() func() { assert.Eventually(nil, func() bool { modelExist := serverRunner.Datastore.ObjectiveGet(modelMyModel) - synced := serverRunner.Datastore.PoolHasSynced() && modelExist != nil + synced := serverRunner.Datastore.EndPointsPoolHasSynced() && modelExist != nil return synced }, 10*time.Second, 10*time.Millisecond) From 85e4622c47c3e61b186b52a43ecafbd1bcc40c17 Mon Sep 17 00:00:00 2001 From: Xiyue Yu Date: Thu, 6 Nov 2025 11:02:56 -0800 Subject: [PATCH 03/10] fixed some ut --- .../inferenceobjective_reconciler_test.go | 5 ++--- .../controller/inferencepool_reconciler_test.go | 15 +++++++++------ pkg/epp/controller/pod_reconciler.go | 2 +- pkg/epp/controller/pod_reconciler_test.go | 6 +++--- pkg/epp/datalayer/endpointsPools.go | 10 +++++++--- pkg/epp/util/pool/pool.go | 13 +++++++++++++ pkg/epp/util/testing/wrappers.go | 16 ++++++++++++++++ 7 files changed, 51 insertions(+), 16 deletions(-) diff --git a/pkg/epp/controller/inferenceobjective_reconciler_test.go b/pkg/epp/controller/inferenceobjective_reconciler_test.go index 7b65269ec..81364af4b 100644 --- 
a/pkg/epp/controller/inferenceobjective_reconciler_test.go +++ b/pkg/epp/controller/inferenceobjective_reconciler_test.go @@ -162,7 +162,7 @@ func TestInferenceObjectiveReconciler(t *testing.T) { WithObjects(initObjs...). Build() pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second) - ds := datastore.NewDatastore(t.Context(), pmf, 0, datalayer.NewEndPointsPool()) + ds := datastore.NewDatastore(t.Context(), pmf, 0, datalayer.NewEndPointsPool(false, pool.ToGKNN(inferencePool))) for _, m := range test.objectivessInStore { ds.ObjectiveSet(m) } @@ -193,8 +193,7 @@ func TestInferenceObjectiveReconciler(t *testing.T) { if len(test.wantObjectives) != len(ds.ObjectiveGetAll()) { t.Errorf("Unexpected; want: %d, got:%d", len(test.wantObjectives), len(ds.ObjectiveGetAll())) } - - if diff := diffStore(ds, diffStoreParams{wantPool: inferencePool, wantObjectives: test.wantObjectives}); diff != "" { + if diff := diffStore(ds, diffStoreParams{wantPool: endPointsPool, wantObjectives: test.wantObjectives}); diff != "" { t.Errorf("Unexpected diff (+got/-want): %s", diff) } diff --git a/pkg/epp/controller/inferencepool_reconciler_test.go b/pkg/epp/controller/inferencepool_reconciler_test.go index 1ec1ae9a8..06d12b01b 100644 --- a/pkg/epp/controller/inferencepool_reconciler_test.go +++ b/pkg/epp/controller/inferencepool_reconciler_test.go @@ -116,14 +116,15 @@ func TestInferencePoolReconciler(t *testing.T) { ctx := context.Background() pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second) - ds := datastore.NewDatastore(ctx, pmf, 0, datalayer.NewEndPointsPool()) + ds := datastore.NewDatastore(ctx, pmf, 0, datalayer.NewEndPointsPool(false, gknn)) inferencePoolReconciler := &InferencePoolReconciler{Reader: fakeClient, Datastore: ds, PoolGKNN: gknn} // Step 1: Inception, only ready pods matching pool1 are added to the store. 
if _, err := inferencePoolReconciler.Reconcile(ctx, req); err != nil { t.Errorf("Unexpected InferencePool reconcile error: %v", err) } - if diff := diffStore(ds, diffStoreParams{wantPool: pool1, wantPods: []string{"pod1-rank-0", "pod2-rank-0"}}); diff != "" { + endPointsPool1 := pool.InferencePoolToEndPointsPool(pool1) + if diff := diffStore(ds, diffStoreParams{wantPool: endPointsPool1, wantPods: []string{"pod1-rank-0", "pod2-rank-0"}}); diff != "" { t.Errorf("Unexpected diff (+got/-want): %s", diff) } @@ -141,7 +142,8 @@ func TestInferencePoolReconciler(t *testing.T) { if _, err := inferencePoolReconciler.Reconcile(ctx, req); err != nil { t.Errorf("Unexpected InferencePool reconcile error: %v", err) } - if diff := diffStore(ds, diffStoreParams{wantPool: newPool1, wantPods: []string{"pod5-rank-0"}}); diff != "" { + newEndPointsPool1 := pool.InferencePoolToEndPointsPool(newPool1) + if diff := diffStore(ds, diffStoreParams{wantPool: newEndPointsPool1, wantPods: []string{"pod5-rank-0"}}); diff != "" { t.Errorf("Unexpected diff (+got/-want): %s", diff) } @@ -156,7 +158,8 @@ func TestInferencePoolReconciler(t *testing.T) { if _, err := inferencePoolReconciler.Reconcile(ctx, req); err != nil { t.Errorf("Unexpected InferencePool reconcile error: %v", err) } - if diff := diffStore(ds, diffStoreParams{wantPool: newPool1, wantPods: []string{"pod5-rank-0"}}); diff != "" { + newEndPointsPool1 = pool.InferencePoolToEndPointsPool(newPool1) + if diff := diffStore(ds, diffStoreParams{wantPool: newEndPointsPool1, wantPods: []string{"pod5-rank-0"}}); diff != "" { t.Errorf("Unexpected diff (+got/-want): %s", diff) } @@ -176,7 +179,7 @@ func TestInferencePoolReconciler(t *testing.T) { } type diffStoreParams struct { - wantPool *v1.InferencePool + wantPool *datalayer.EndPointsPool wantPods []string wantObjectives []*v1alpha2.InferenceObjective } @@ -263,7 +266,7 @@ func TestXInferencePoolReconciler(t *testing.T) { ctx := context.Background() pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second) - ds := datastore.NewDatastore(ctx, pmf, 0, datalayer.NewEndPointsPool()) + ds := datastore.NewDatastore(ctx, pmf, 0, datalayer.NewEndPointsPool(false, gknn)) inferencePoolReconciler := &InferencePoolReconciler{Reader: fakeClient, Datastore: ds, PoolGKNN: gknn} // Step 1: Inception, only ready pods matching pool1 are added to the store. diff --git a/pkg/epp/controller/pod_reconciler.go b/pkg/epp/controller/pod_reconciler.go index 8add16ef3..b3a73f313 100644 --- a/pkg/epp/controller/pod_reconciler.go +++ b/pkg/epp/controller/pod_reconciler.go @@ -41,7 +41,7 @@ type PodReconciler struct { func (c *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { logger := log.FromContext(ctx) - if !c.Datastore.EndPointsPoolHasSynced() { + if !c.Datastore.PoolHasSynced() { logger.V(logutil.TRACE).Info("Skipping reconciling Pod because the EndPointsPicker is not available yet") // When the inferencePool is initialized it lists the appropriate pods and populates the datastore, so no need to requeue. 
return ctrl.Result{}, nil diff --git a/pkg/epp/controller/pod_reconciler_test.go b/pkg/epp/controller/pod_reconciler_test.go index f39b29852..253a60b34 100644 --- a/pkg/epp/controller/pod_reconciler_test.go +++ b/pkg/epp/controller/pod_reconciler_test.go @@ -18,7 +18,6 @@ package controller import ( "context" - "sigs.k8s.io/gateway-api-inference-extension/pkg/common" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer" "testing" "time" @@ -37,6 +36,7 @@ import ( v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/pool" utiltest "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/testing" ) @@ -198,8 +198,8 @@ func TestPodReconciler(t *testing.T) { Build() // Configure the initial state of the datastore. - store := datastore.NewDatastore(t.Context(), pmf, 0) - _ = store.PoolSet(t.Context(), fakeClient, test.pool) + store := datastore.NewDatastore(t.Context(), pmf, 0, datalayer.NewEndPointsPool(false, pool.ToGKNN(test.pool))) + _ = store.PoolSet(t.Context(), fakeClient, pool.InferencePoolToEndPointsPool(test.pool)) for _, pod := range test.existingPods { store.PodUpdateOrAddIfNotExist(pod) } diff --git a/pkg/epp/datalayer/endpointsPools.go b/pkg/epp/datalayer/endpointsPools.go index cd395ed15..5e8051ffb 100644 --- a/pkg/epp/datalayer/endpointsPools.go +++ b/pkg/epp/datalayer/endpointsPools.go @@ -16,7 +16,9 @@ limitations under the License. package datalayer -import "sigs.k8s.io/gateway-api-inference-extension/pkg/common" +import ( + "sigs.k8s.io/gateway-api-inference-extension/pkg/common" +) type EndPointsPool struct { EndPoints *EndPoints @@ -25,10 +27,12 @@ type EndPointsPool struct { } // NewEndPointsPool creates and returns a new empty instance of EndPointsPool. -func NewEndPointsPool() *EndPointsPool { +func NewEndPointsPool(standAloneMode bool, gknn common.GKNN) *EndPointsPool { endPoints := NewEndPoints() return &EndPointsPool{ - EndPoints: endPoints, + GKNN: gknn, + StandaloneMode: standAloneMode, + EndPoints: endPoints, } } diff --git a/pkg/epp/util/pool/pool.go b/pkg/epp/util/pool/pool.go index c903f5d20..6b5c8d043 100644 --- a/pkg/epp/util/pool/pool.go +++ b/pkg/epp/util/pool/pool.go @@ -76,3 +76,16 @@ func EndPointsPoolToInferencePool(endPointsPool *datalayer.EndPointsPool) *v1.In } return inferencePool } + +func ToGKNN(ip *v1.InferencePool) common.GKNN { + return common.GKNN{ + NamespacedName: types.NamespacedName{ + Name: ip.Name, + Namespace: ip.ObjectMeta.Namespace, + }, + GroupKind: schema.GroupKind{ + Group: ip.GroupVersionKind().Group, + Kind: ip.GroupVersionKind().Kind, + }, + } +} diff --git a/pkg/epp/util/testing/wrappers.go b/pkg/epp/util/testing/wrappers.go index 7621bff96..2ad02c55d 100644 --- a/pkg/epp/util/testing/wrappers.go +++ b/pkg/epp/util/testing/wrappers.go @@ -19,8 +19,11 @@ package testing import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" "sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2" + "sigs.k8s.io/gateway-api-inference-extension/pkg/common" ) // PodWrapper wraps a Pod. 
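For illustration only (not part of the patch): with the two-argument constructor shown above, call sites now fix the standalone flag and the pool's GKNN at construction time instead of filling them in afterwards. A minimal sketch of that construction path, assuming the datalayer and common packages as modified in this series; the pool name and namespace are placeholder values.

    package example

    import (
        "k8s.io/apimachinery/pkg/runtime/schema"
        "k8s.io/apimachinery/pkg/types"

        "sigs.k8s.io/gateway-api-inference-extension/pkg/common"
        "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
    )

    // buildPool shows the new construction path: the GKNN identifies the object the
    // endpoint picker is bound to, and the bool selects standalone mode
    // (selector/target-ports flags) versus InferencePool-driven reconciliation.
    func buildPool() *datalayer.EndPointsPool {
        gknn := common.GKNN{
            NamespacedName: types.NamespacedName{Namespace: "default", Name: "my-pool"}, // placeholders
            GroupKind:      schema.GroupKind{Group: "inference.networking.k8s.io", Kind: "InferencePool"},
        }
        return datalayer.NewEndPointsPool(false, gknn)
    }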
@@ -219,6 +222,19 @@ func (m *InferencePoolWrapper) ObjRef() *v1.InferencePool { return &m.InferencePool } +func (m *InferencePoolWrapper) ToGKNN() common.GKNN { + return common.GKNN{ + NamespacedName: types.NamespacedName{ + Name: m.Name, + Namespace: m.ObjectMeta.Namespace, + }, + GroupKind: schema.GroupKind{ + Group: "inference.networking.k8s.io", + Kind: "InferencePool", + }, + } +} + // AlphaInferencePoolWrapper wraps an group "inference.networking.x-k8s.io" InferencePool. type AlphaInferencePoolWrapper struct { v1alpha2.InferencePool From 579b17ef783f4034f1337913fc9dc86d54fface2 Mon Sep 17 00:00:00 2001 From: Xiyue Yu Date: Thu, 6 Nov 2025 14:10:40 -0800 Subject: [PATCH 04/10] make epp controller ut pass --- .../controller/inferencepool_reconciler.go | 38 +++---------------- .../inferencepool_reconciler_test.go | 33 ++++++---------- pkg/epp/datastore/datastore.go | 2 +- pkg/epp/util/pool/pool.go | 25 +++++++++++- 4 files changed, 42 insertions(+), 56 deletions(-) diff --git a/pkg/epp/controller/inferencepool_reconciler.go b/pkg/epp/controller/inferencepool_reconciler.go index 156673914..06942316d 100644 --- a/pkg/epp/controller/inferencepool_reconciler.go +++ b/pkg/epp/controller/inferencepool_reconciler.go @@ -19,6 +19,7 @@ package controller import ( "context" "fmt" + "k8s.io/apimachinery/pkg/api/errors" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -30,6 +31,7 @@ import ( "sigs.k8s.io/gateway-api-inference-extension/pkg/common" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore" logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" + pooltuil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/pool" ) // InferencePoolReconciler utilizes the controller runtime to reconcile Instance Gateway resources @@ -75,45 +77,15 @@ func (c *InferencePoolReconciler) Reconcile(ctx context.Context, req ctrl.Reques c.Datastore.Clear() return ctrl.Result{}, nil } - - // 4. Convert the fetched object to the canonical v1.InferencePool. - v1infPool := &v1.InferencePool{} - + endPointsPool := &datalayer.EndPointsPool{} switch pool := obj.(type) { case *v1.InferencePool: - // If it's already a v1 object, just use it. 
- v1infPool = pool + endPointsPool = pooltuil.InferencePoolToEndPointsPool(pool) case *v1alpha2.InferencePool: - var err error - err = pool.ConvertTo(v1infPool) - if err != nil { - return ctrl.Result{}, fmt.Errorf("failed to convert XInferencePool to InferencePool - %w", err) - } + endPointsPool = pooltuil.AlphaInferencePoolToEndPointsPool(pool) default: return ctrl.Result{}, fmt.Errorf("unsupported API group: %s", c.PoolGKNN.Group) } - gknn := common.GKNN{ - NamespacedName: req.NamespacedName, - GroupKind: c.PoolGKNN.GroupKind, - } - targetPorts := make([]int, 0, len(v1infPool.Spec.TargetPorts)) - for _, p := range v1infPool.Spec.TargetPorts { - targetPorts = append(targetPorts, int(p.Number)) - - } - selector := make(map[string]string, len(v1infPool.Spec.Selector.MatchLabels)) - for k, v := range v1infPool.Spec.Selector.MatchLabels { - selector[string(k)] = string(v) - } - endPoints := &datalayer.EndPoints{ - Selector: selector, - TargetPorts: targetPorts, - } - endPointsPool := &datalayer.EndPointsPool{ - EndPoints: endPoints, - StandaloneMode: false, - GKNN: gknn, - } if err := c.Datastore.PoolSet(ctx, c.Reader, endPointsPool); err != nil { return ctrl.Result{}, fmt.Errorf("failed to update datastore - %w", err) diff --git a/pkg/epp/controller/inferencepool_reconciler_test.go b/pkg/epp/controller/inferencepool_reconciler_test.go index 06d12b01b..9b0808348 100644 --- a/pkg/epp/controller/inferencepool_reconciler_test.go +++ b/pkg/epp/controller/inferencepool_reconciler_test.go @@ -18,15 +18,15 @@ package controller import ( "context" - "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer" - "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/pool" "testing" "time" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/pool" + "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" @@ -138,7 +138,6 @@ func TestInferencePoolReconciler(t *testing.T) { if err := fakeClient.Update(ctx, newPool1, &client.UpdateOptions{}); err != nil { t.Errorf("Unexpected pool update error: %v", err) } - if _, err := inferencePoolReconciler.Reconcile(ctx, req); err != nil { t.Errorf("Unexpected InferencePool reconcile error: %v", err) } @@ -186,9 +185,7 @@ type diffStoreParams struct { func diffStore(datastore datastore.Datastore, params diffStoreParams) string { gotPool, _ := datastore.PoolGet() - // controller-runtime fake client may not populate TypeMeta (APIVersion/Kind). - // Ignore it when comparing pools. 
- if diff := cmp.Diff(params.wantPool, gotPool, cmpopts.IgnoreTypes(metav1.TypeMeta{})); diff != "" { + if diff := cmp.Diff(params.wantPool, gotPool); diff != "" { return "pool:" + diff } @@ -273,7 +270,8 @@ func TestXInferencePoolReconciler(t *testing.T) { if _, err := inferencePoolReconciler.Reconcile(ctx, req); err != nil { t.Errorf("Unexpected InferencePool reconcile error: %v", err) } - if diff := xDiffStore(t, ds, xDiffStoreParams{wantPool: pool1, wantPods: []string{"pod1-rank-0", "pod2-rank-0"}}); diff != "" { + endPointsPool1 := pool.AlphaInferencePoolToEndPointsPool(pool1) + if diff := xDiffStore(t, ds, xDiffStoreParams{wantPool: endPointsPool1, wantPods: []string{"pod1-rank-0", "pod2-rank-0"}}); diff != "" { t.Errorf("Unexpected diff (+got/-want): %s", diff) } @@ -289,7 +287,8 @@ func TestXInferencePoolReconciler(t *testing.T) { if _, err := inferencePoolReconciler.Reconcile(ctx, req); err != nil { t.Errorf("Unexpected InferencePool reconcile error: %v", err) } - if diff := xDiffStore(t, ds, xDiffStoreParams{wantPool: newPool1, wantPods: []string{"pod5-rank-0"}}); diff != "" { + newEndPointsPoll1 := pool.AlphaInferencePoolToEndPointsPool(newPool1) + if diff := xDiffStore(t, ds, xDiffStoreParams{wantPool: newEndPointsPoll1, wantPods: []string{"pod5-rank-0"}}); diff != "" { t.Errorf("Unexpected diff (+got/-want): %s", diff) } @@ -304,7 +303,8 @@ func TestXInferencePoolReconciler(t *testing.T) { if _, err := inferencePoolReconciler.Reconcile(ctx, req); err != nil { t.Errorf("Unexpected InferencePool reconcile error: %v", err) } - if diff := xDiffStore(t, ds, xDiffStoreParams{wantPool: newPool1, wantPods: []string{"pod5-rank-0"}}); diff != "" { + newEndPointsPool1 := pool.AlphaInferencePoolToEndPointsPool(newPool1) + if diff := xDiffStore(t, ds, xDiffStoreParams{wantPool: newEndPointsPool1, wantPods: []string{"pod5-rank-0"}}); diff != "" { t.Errorf("Unexpected diff (+got/-want): %s", diff) } @@ -324,7 +324,7 @@ func TestXInferencePoolReconciler(t *testing.T) { } type xDiffStoreParams struct { - wantPool *v1alpha2.InferencePool + wantPool *datalayer.EndPointsPool wantPods []string wantObjectives []*v1alpha2.InferenceObjective } @@ -335,16 +335,7 @@ func xDiffStore(t *testing.T, datastore datastore.Datastore, params xDiffStorePa return "" } - gotXPool := &v1alpha2.InferencePool{} - - err := gotXPool.ConvertFrom(pool.EndPointsPoolToInferencePool(gotPool)) - if err != nil { - t.Fatalf("failed to convert InferencePool to XInferencePool: %v", err) - } - - // controller-runtime fake client may not populate TypeMeta (APIVersion/Kind). - // Ignore it when comparing pools. 
- if diff := cmp.Diff(params.wantPool, gotXPool, cmpopts.IgnoreTypes(metav1.TypeMeta{})); diff != "" { + if diff := cmp.Diff(params.wantPool, gotPool); diff != "" { return "pool:" + diff } diff --git a/pkg/epp/datastore/datastore.go b/pkg/epp/datastore/datastore.go index f9f5360f4..2384161eb 100644 --- a/pkg/epp/datastore/datastore.go +++ b/pkg/epp/datastore/datastore.go @@ -124,7 +124,7 @@ func (ds *datastore) PoolSet(ctx context.Context, reader client.Reader, endPoint oldEndPointsPool := ds.endPointsPool ds.endPointsPool = endPointsPool - if oldEndPointsPool == nil || !reflect.DeepEqual(endPointsPool.EndPoints.Selector, endPointsPool.EndPoints.Selector) { + if oldEndPointsPool == nil || !reflect.DeepEqual(oldEndPointsPool.EndPoints.Selector, endPointsPool.EndPoints.Selector) { logger.V(logutil.DEFAULT).Info("Updating endpoints", "selector", endPointsPool.EndPoints.Selector) // A full resync is required to address two cases: // 1) At startup, the pod events may get processed before the pool is synced with the datastore, diff --git a/pkg/epp/util/pool/pool.go b/pkg/epp/util/pool/pool.go index 6b5c8d043..b0e443d7a 100644 --- a/pkg/epp/util/pool/pool.go +++ b/pkg/epp/util/pool/pool.go @@ -20,6 +20,7 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" + v1alpha2 "sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2" "sigs.k8s.io/gateway-api-inference-extension/pkg/common" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer" ) @@ -36,7 +37,29 @@ func InferencePoolToEndPointsPool(inferencePool *v1.InferencePool) *datalayer.En } gknn := common.GKNN{ NamespacedName: types.NamespacedName{Namespace: inferencePool.Namespace, Name: inferencePool.Name}, - GroupKind: schema.GroupKind{Group: inferencePool.GroupVersionKind().Group, Kind: inferencePool.GroupVersionKind().Kind}, + GroupKind: schema.GroupKind{Group: "inference.networking.k8s.io", Kind: "InferencePool"}, + } + endPoints := &datalayer.EndPoints{ + Selector: selector, + TargetPorts: targetPorts, + } + endPointsPool := &datalayer.EndPointsPool{ + EndPoints: endPoints, + StandaloneMode: false, + GKNN: gknn, + } + return endPointsPool +} + +func AlphaInferencePoolToEndPointsPool(inferencePool *v1alpha2.InferencePool) *datalayer.EndPointsPool { + targetPorts := []int{int(inferencePool.Spec.TargetPortNumber)} + selector := make(map[string]string, len(inferencePool.Spec.Selector)) + for k, v := range inferencePool.Spec.Selector { + selector[string(k)] = string(v) + } + gknn := common.GKNN{ + NamespacedName: types.NamespacedName{Namespace: inferencePool.Namespace, Name: inferencePool.Name}, + GroupKind: schema.GroupKind{Group: "inference.networking.x-k8s.io", Kind: "InferencePool"}, } endPoints := &datalayer.EndPoints{ Selector: selector, From e9d704d40b52adb7041a8e2974c39745c98ec8b1 Mon Sep 17 00:00:00 2001 From: Xiyue Yu Date: Thu, 6 Nov 2025 14:54:11 -0800 Subject: [PATCH 05/10] make ut pass --- pkg/epp/backend/metrics/pod_metrics_test.go | 3 +- pkg/epp/datalayer/metrics/logger.go | 6 +-- pkg/epp/datastore/datastore.go | 2 +- pkg/epp/datastore/datastore_test.go | 30 ++++++++----- pkg/epp/handlers/server.go | 4 +- pkg/epp/metrics/collectors/inference_pool.go | 2 +- .../metrics/collectors/inference_pool_test.go | 21 +++++---- pkg/epp/requestcontrol/director_test.go | 44 +++++++------------ pkg/epp/util/pool/pool.go | 6 +++ test/utils/server.go | 13 +++++- 10 files changed, 71 insertions(+), 60 deletions(-) diff --git 
a/pkg/epp/backend/metrics/pod_metrics_test.go b/pkg/epp/backend/metrics/pod_metrics_test.go index 9d32f338e..eff9547ad 100644 --- a/pkg/epp/backend/metrics/pod_metrics_test.go +++ b/pkg/epp/backend/metrics/pod_metrics_test.go @@ -25,7 +25,6 @@ import ( "github.com/stretchr/testify/assert" "k8s.io/apimachinery/pkg/types" - v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer" ) @@ -87,7 +86,7 @@ func TestMetricsRefresh(t *testing.T) { type fakeDataStore struct{} func (f *fakeDataStore) PoolGet() (*datalayer.EndPointsPool, error) { - return datalayer.NewEndPointsPool(), nil + return &datalayer.EndPointsPool{}, nil } func (f *fakeDataStore) PodList(func(PodMetrics) bool) []PodMetrics { diff --git a/pkg/epp/datalayer/metrics/logger.go b/pkg/epp/datalayer/metrics/logger.go index fac757dbe..75cbb8414 100644 --- a/pkg/epp/datalayer/metrics/logger.go +++ b/pkg/epp/datalayer/metrics/logger.go @@ -116,9 +116,9 @@ func refreshPrometheusMetrics(logger logr.Logger, datastore datalayer.PoolInfo, totals := calculateTotals(podMetrics) podCount := len(podMetrics) - metrics.RecordInferencePoolAvgKVCache(pool.Name, totals.kvCache/float64(podCount)) - metrics.RecordInferencePoolAvgQueueSize(pool.Name, float64(totals.queueSize/podCount)) - metrics.RecordInferencePoolReadyPods(pool.Name, float64(podCount)) + metrics.RecordInferencePoolAvgKVCache(pool.GKNN.Name, totals.kvCache/float64(podCount)) + metrics.RecordInferencePoolAvgQueueSize(pool.GKNN.Name, float64(totals.queueSize/podCount)) + metrics.RecordInferencePoolReadyPods(pool.GKNN.Name, float64(podCount)) } // totals holds aggregated metric values diff --git a/pkg/epp/datastore/datastore.go b/pkg/epp/datastore/datastore.go index 2384161eb..01b12dd54 100644 --- a/pkg/epp/datastore/datastore.go +++ b/pkg/epp/datastore/datastore.go @@ -217,7 +217,7 @@ func (ds *datastore) PodList(predicate func(backendmetrics.PodMetrics) bool) []b } func (ds *datastore) PodUpdateOrAddIfNotExist(pod *corev1.Pod) bool { - if ds.endPointsPool == nil { + if ds.endPointsPool == nil || ds.endPointsPool.EndPoints == nil { return true } diff --git a/pkg/epp/datastore/datastore_test.go b/pkg/epp/datastore/datastore_test.go index 76c30c583..df1907398 100644 --- a/pkg/epp/datastore/datastore_test.go +++ b/pkg/epp/datastore/datastore_test.go @@ -33,6 +33,7 @@ import ( "k8s.io/apimachinery/pkg/types" clientgoscheme "k8s.io/client-go/kubernetes/scheme" "sigs.k8s.io/controller-runtime/pkg/client/fake" + pooltuil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/pool" v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" "sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2" @@ -86,16 +87,18 @@ func TestPool(t *testing.T) { WithScheme(scheme). 
Build() pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second) - ds := NewDatastore(context.Background(), pmf, 0) - _ = ds.EndPointsSet(context.Background(), fakeClient, tt.inferencePool) + gknn := pooltuil.ToGKNN(tt.inferencePool) + endPointPool := datalayer.NewEndPointsPool(false, gknn) + ds := NewDatastore(context.Background(), pmf, 0, endPointPool) + _ = ds.PoolSet(context.Background(), fakeClient, pooltuil.InferencePoolToEndPointsPool(tt.inferencePool)) gotPool, gotErr := ds.PoolGet() if diff := cmp.Diff(tt.wantErr, gotErr, cmpopts.EquateErrors()); diff != "" { t.Errorf("Unexpected error diff (+got/-want): %s", diff) } - if diff := cmp.Diff(tt.wantPool, gotPool); diff != "" { + if diff := cmp.Diff(pooltuil.InferencePoolToEndPointsPool(tt.wantPool), gotPool); diff != "" { t.Errorf("Unexpected pool diff (+got/-want): %s", diff) } - gotSynced := ds.EndPointsPoolHasSynced() + gotSynced := ds.PoolHasSynced() if diff := cmp.Diff(tt.wantSynced, gotSynced); diff != "" { t.Errorf("Unexpected synced diff (+got/-want): %s", diff) } @@ -120,6 +123,10 @@ func TestObjective(t *testing.T) { Priority(2).ObjRef() // Same object name as model2ts, different model name. model2chat := testutil.MakeInferenceObjective(model2ts.Name).ObjRef() + pool1Selector := map[string]string{"app": "vllm_v1"} + pool1 := testutil.MakeInferencePool("pool1"). + Namespace("default"). + Selector(pool1Selector).ObjRef() tests := []struct { name string @@ -193,7 +200,7 @@ func TestObjective(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second) - ds := NewDatastore(t.Context(), pmf, 0) + ds := NewDatastore(t.Context(), pmf, 0, datalayer.NewEndPointsPool(false, pooltuil.ToGKNN(pool1))) for _, m := range test.existingModels { ds.ObjectiveSet(m) } @@ -327,8 +334,9 @@ func TestMetrics(t *testing.T) { WithScheme(scheme). 
Build() pmf := backendmetrics.NewPodMetricsFactory(test.pmc, time.Millisecond) - ds := NewDatastore(ctx, pmf, 0) - _ = ds.EndPointsSet(ctx, fakeClient, inferencePool) + gknn := pooltuil.ToGKNN(inferencePool) + ds := NewDatastore(ctx, pmf, 0, datalayer.NewEndPointsPool(false, gknn)) + _ = ds.PoolSet(ctx, fakeClient, pooltuil.InferencePoolToEndPointsPool(inferencePool)) for _, pod := range test.storePods { ds.PodUpdateOrAddIfNotExist(pod) } @@ -395,9 +403,9 @@ func TestPods(t *testing.T) { t.Run(test.name, func(t *testing.T) { ctx := context.Background() pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second) - ds := NewDatastore(t.Context(), pmf, 0) + ds := NewDatastore(t.Context(), pmf, 0, datalayer.NewEndPointsPool(false, pooltuil.ToGKNN(inferencePool))) fakeClient := fake.NewFakeClient() - if err := ds.EndPointsSet(ctx, fakeClient, inferencePool); err != nil { + if err := ds.PoolSet(ctx, fakeClient, pooltuil.InferencePoolToEndPointsPool(inferencePool)); err != nil { t.Error(err) } for _, pod := range test.existingPods { @@ -579,9 +587,9 @@ func TestPodInfo(t *testing.T) { t.Run(test.name, func(t *testing.T) { ctx := context.Background() pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second) - ds := NewDatastore(t.Context(), pmf, 0) + ds := NewDatastore(t.Context(), pmf, 0, datalayer.NewEndPointsPool(false, pooltuil.ToGKNN(test.pool))) fakeClient := fake.NewFakeClient() - if err := ds.EndPointsSet(ctx, fakeClient, test.pool); err != nil { + if err := ds.PoolSet(ctx, fakeClient, pooltuil.InferencePoolToEndPointsPool(test.pool)); err != nil { t.Error(err) } for _, pod := range test.existingPods { diff --git a/pkg/epp/handlers/server.go b/pkg/epp/handlers/server.go index 0d5305574..861d2a040 100644 --- a/pkg/epp/handlers/server.go +++ b/pkg/epp/handlers/server.go @@ -30,8 +30,8 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer" - v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics" schedulingtypes "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types" @@ -61,7 +61,7 @@ type Director interface { } type Datastore interface { - PoolGet() (*v1.InferencePool, error) + PoolGet() (*datalayer.EndPointsPool, error) } // Server implements the Envoy external processing server. 
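For illustration only (not part of the patch): since PoolGet now returns *datalayer.EndPointsPool, consumers of the handlers.Datastore interface read pool identity from GKNN and endpoint selection from EndPoints rather than from an InferencePool spec. A minimal consumer sketch using only fields that appear in this series (GKNN, EndPoints.Selector, EndPoints.TargetPorts); the output formatting is arbitrary.

    package example

    import (
        "fmt"

        "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/handlers"
    )

    // describePool reads the synced pool from any handlers.Datastore implementation.
    func describePool(ds handlers.Datastore) (string, error) {
        pool, err := ds.PoolGet()
        if err != nil {
            // PoolGet returns an error until the pool has been set/synced.
            return "", err
        }
        return fmt.Sprintf("pool %s/%s selects %v on ports %v",
            pool.GKNN.Namespace, pool.GKNN.Name,
            pool.EndPoints.Selector, pool.EndPoints.TargetPorts), nil
    }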
diff --git a/pkg/epp/metrics/collectors/inference_pool.go b/pkg/epp/metrics/collectors/inference_pool.go index ec3def164..1bb6e206e 100644 --- a/pkg/epp/metrics/collectors/inference_pool.go +++ b/pkg/epp/metrics/collectors/inference_pool.go @@ -73,7 +73,7 @@ func (c *inferencePoolMetricsCollector) Collect(ch chan<- prometheus.Metric) { descInferencePoolPerPodQueueSize, prometheus.GaugeValue, float64(pod.GetMetrics().WaitingQueueSize), - pool.Name, + pool.GKNN.Name, pod.GetPod().NamespacedName.Name, ) } diff --git a/pkg/epp/metrics/collectors/inference_pool_test.go b/pkg/epp/metrics/collectors/inference_pool_test.go index e6a9c574c..267066185 100644 --- a/pkg/epp/metrics/collectors/inference_pool_test.go +++ b/pkg/epp/metrics/collectors/inference_pool_test.go @@ -28,6 +28,9 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/component-base/metrics/testutil" "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/gateway-api-inference-extension/pkg/common" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer" + poolutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/pool" v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" @@ -50,7 +53,7 @@ var ( func TestNoMetricsCollected(t *testing.T) { pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second) - ds := datastore.NewDatastore(context.Background(), pmf, 0) + ds := datastore.NewDatastore(context.Background(), pmf, 0, datalayer.NewEndPointsPool(false, common.GKNN{})) collector := &inferencePoolMetricsCollector{ ds: ds, @@ -68,13 +71,6 @@ func TestMetricsCollected(t *testing.T) { }, } pmf := backendmetrics.NewPodMetricsFactory(pmc, time.Millisecond) - ds := datastore.NewDatastore(context.Background(), pmf, 0) - - scheme := runtime.NewScheme() - fakeClient := fake.NewClientBuilder(). - WithScheme(scheme). - Build() - inferencePool := &v1.InferencePool{ ObjectMeta: metav1.ObjectMeta{ Name: "test-pool", @@ -83,7 +79,14 @@ func TestMetricsCollected(t *testing.T) { TargetPorts: []v1.Port{{Number: v1.PortNumber(int32(8000))}}, }, } - _ = ds.EndPointsSet(context.Background(), fakeClient, inferencePool) + ds := datastore.NewDatastore(context.Background(), pmf, 0, datalayer.NewEndPointsPool(false, poolutil.ToGKNN(inferencePool))) + + scheme := runtime.NewScheme() + fakeClient := fake.NewClientBuilder(). + WithScheme(scheme). + Build() + + _ = ds.PoolSet(context.Background(), fakeClient, poolutil.InferencePoolToEndPointsPool(inferencePool)) _ = ds.PodUpdateOrAddIfNotExist(pod1) time.Sleep(1 * time.Second) diff --git a/pkg/epp/requestcontrol/director_test.go b/pkg/epp/requestcontrol/director_test.go index a312dca8a..b2c955723 100644 --- a/pkg/epp/requestcontrol/director_test.go +++ b/pkg/epp/requestcontrol/director_test.go @@ -20,9 +20,12 @@ import ( "context" "errors" "fmt" + "k8s.io/apimachinery/pkg/runtime/schema" "sigs.k8s.io/gateway-api-inference-extension/pkg/common" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer" + poolutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/pool" + "testing" "time" @@ -120,14 +123,6 @@ func TestDirector_HandleRequest(t *testing.T) { CreationTimestamp(metav1.Unix(1000, 0)). Priority(1). 
ObjRef() - - // Datastore setup - pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second) - ds := datastore.NewDatastore(t.Context(), pmf, 0, datalayer.NewEndPointsPool()) - ds.ObjectiveSet(ioFoodReview) - ds.ObjectiveSet(ioFoodReviewResolve) - ds.ObjectiveSet(ioFoodReviewSheddable) - pool := &v1.InferencePool{ ObjectMeta: metav1.ObjectMeta{Name: "test-pool", Namespace: "default"}, Spec: v1.InferencePoolSpec{ @@ -140,6 +135,13 @@ func TestDirector_HandleRequest(t *testing.T) { }, } + // Datastore setup + pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second) + ds := datastore.NewDatastore(t.Context(), pmf, 0, datalayer.NewEndPointsPool(false, poolutil.ToGKNN(pool))) + ds.ObjectiveSet(ioFoodReview) + ds.ObjectiveSet(ioFoodReviewResolve) + ds.ObjectiveSet(ioFoodReviewSheddable) + scheme := runtime.NewScheme() _ = clientgoscheme.AddToScheme(scheme) fakeClient := fake.NewClientBuilder().WithScheme(scheme).Build() @@ -148,24 +150,8 @@ func TestDirector_HandleRequest(t *testing.T) { targetPorts = append(targetPorts, int(p.Number)) } - selector := make(map[string]string, len(pool.Spec.Selector.MatchLabels)) - for k, v := range pool.Spec.Selector.MatchLabels { - selector[string(k)] = string(v) - } - gknn := common.GKNN{ - NamespacedName: types.NamespacedName{Namespace: pool.Namespace, Name: pool.Name}, - GroupKind: schema.GroupKind{Group: pool.GroupVersionKind().Group, Kind: pool.GroupVersionKind().Kind}, - } - endPoints := &datalayer.EndPoints{ - Selector: selector, - TargetPorts: targetPorts, - } - endPointsPool := &datalayer.EndPointsPool{ - EndPoints: endPoints, - StandaloneMode: false, - GKNN: gknn, - } - if err := ds.PoolSet(ctx, fakeClient, endPointsPool); err != nil { + + if err := ds.PoolSet(ctx, fakeClient, poolutil.InferencePoolToEndPointsPool(pool)); err != nil { t.Fatalf("Error while setting inference pool: %v", err) } @@ -667,7 +653,7 @@ func TestDirector_HandleResponseReceived(t *testing.T) { pr1 := newTestResponseReceived("pr1") ctx := logutil.NewTestLoggerIntoContext(context.Background()) - ds := datastore.NewDatastore(t.Context(), nil, 0, datalayer.NewEndPointsPool()) + ds := datastore.NewDatastore(t.Context(), nil, 0, datalayer.NewEndPointsPool(false, common.GKNN{})) mockSched := &mockScheduler{} director := NewDirectorWithConfig(ds, mockSched, &mockAdmissionController{}, NewConfig().WithResponseReceivedPlugins(pr1)) @@ -704,7 +690,7 @@ func TestDirector_HandleResponseStreaming(t *testing.T) { ps1 := newTestResponseStreaming("ps1") ctx := logutil.NewTestLoggerIntoContext(context.Background()) - ds := datastore.NewDatastore(t.Context(), nil, 0, datalayer.NewEndPointsPool()) + ds := datastore.NewDatastore(t.Context(), nil, 0, datalayer.NewEndPointsPool(false, common.GKNN{})) mockSched := &mockScheduler{} director := NewDirectorWithConfig(ds, mockSched, nil, NewConfig().WithResponseStreamingPlugins(ps1)) @@ -740,7 +726,7 @@ func TestDirector_HandleResponseComplete(t *testing.T) { pc1 := newTestResponseComplete("pc1") ctx := logutil.NewTestLoggerIntoContext(context.Background()) - ds := datastore.NewDatastore(t.Context(), nil, 0, datalayer.NewEndPointsPool()) + ds := datastore.NewDatastore(t.Context(), nil, 0, datalayer.NewEndPointsPool(false, common.GKNN{})) mockSched := &mockScheduler{} director := NewDirectorWithConfig(ds, mockSched, nil, NewConfig().WithResponseCompletePlugins(pc1)) diff --git a/pkg/epp/util/pool/pool.go b/pkg/epp/util/pool/pool.go index b0e443d7a..004627254 100644 --- 
a/pkg/epp/util/pool/pool.go +++ b/pkg/epp/util/pool/pool.go @@ -26,6 +26,9 @@ import ( ) func InferencePoolToEndPointsPool(inferencePool *v1.InferencePool) *datalayer.EndPointsPool { + if inferencePool == nil { + return nil + } targetPorts := make([]int, 0, len(inferencePool.Spec.TargetPorts)) for _, p := range inferencePool.Spec.TargetPorts { targetPorts = append(targetPorts, int(p.Number)) @@ -101,6 +104,9 @@ func EndPointsPoolToInferencePool(endPointsPool *datalayer.EndPointsPool) *v1.In } func ToGKNN(ip *v1.InferencePool) common.GKNN { + if ip == nil { + return common.GKNN{} + } return common.GKNN{ NamespacedName: types.NamespacedName{ Name: ip.Name, diff --git a/test/utils/server.go b/test/utils/server.go index 76060c105..5afc5ad05 100644 --- a/test/utils/server.go +++ b/test/utils/server.go @@ -29,9 +29,14 @@ import ( "google.golang.org/grpc/test/bufconn" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" clientgoscheme "k8s.io/client-go/kubernetes/scheme" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/gateway-api-inference-extension/pkg/common" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer" + pooltuil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/pool" v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" "sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2" @@ -50,7 +55,11 @@ func PrepareForTestStreamingServer(objectives []*v1alpha2.InferenceObjective, po pmc := &metrics.FakePodMetricsClient{} pmf := metrics.NewPodMetricsFactory(pmc, time.Second) - ds := datastore.NewDatastore(ctx, pmf, 0) + endPointsPool := datalayer.NewEndPointsPool(false, common.GKNN{ + NamespacedName: types.NamespacedName{Namespace: namespace, Name: poolName}, + GroupKind: schema.GroupKind{Group: "inference.networking.k8s.io", Kind: "InferencePool"}, + }) + ds := datastore.NewDatastore(ctx, pmf, 0, endPointsPool) initObjs := []client.Object{} for _, objective := range objectives { @@ -72,7 +81,7 @@ func PrepareForTestStreamingServer(objectives []*v1alpha2.InferenceObjective, po Build() pool := testutil.MakeInferencePool(poolName).Namespace(namespace).ObjRef() pool.Spec.TargetPorts = []v1.Port{{Number: v1.PortNumber(poolPort)}} - _ = ds.EndPointsSet(context.Background(), fakeClient, pool) + _ = ds.PoolSet(context.Background(), fakeClient, pooltuil.InferencePoolToEndPointsPool(pool)) return ctx, cancel, ds, pmc } From 42cb2189eff6f9503cf4cca321a841df1b4958e8 Mon Sep 17 00:00:00 2001 From: Xiyue Yu Date: Fri, 7 Nov 2025 10:44:02 -0800 Subject: [PATCH 06/10] fixed build --- cmd/epp/runner/health.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/epp/runner/health.go b/cmd/epp/runner/health.go index c80cc4d20..1edbcff8e 100644 --- a/cmd/epp/runner/health.go +++ b/cmd/epp/runner/health.go @@ -44,7 +44,7 @@ const ( ) func (s *healthServer) Check(ctx context.Context, in *healthPb.HealthCheckRequest) (*healthPb.HealthCheckResponse, error) { - isLive := s.datastore.EndPointsPoolHasSynced() + isLive := s.datastore.PoolHasSynced() // If leader election is disabled, use current logic: all checks are based on whether the pool has synced. 
if !s.leaderElectionEnabled { From 943c53debe29a3489b6cc886ebc270f2e96fcda9 Mon Sep 17 00:00:00 2001 From: Xiyue Yu Date: Fri, 7 Nov 2025 10:46:54 -0800 Subject: [PATCH 07/10] fixed build --- cmd/epp/runner/runner.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/epp/runner/runner.go b/cmd/epp/runner/runner.go index 8df79a891..e38bca971 100644 --- a/cmd/epp/runner/runner.go +++ b/cmd/epp/runner/runner.go @@ -199,7 +199,7 @@ func (r *Runner) Run(ctx context.Context) error { return err } //Setup EndPointsPool - endPointsPool := datalayer.NewEndPointsPool() + endPointsPool := datalayer.NewEndPointsPool(false, common.GKNN{}) if *poolName != "" { // Determine pool namespace: if --pool-namespace is non-empty, use it; else NAMESPACE env var; else default resolvePoolNamespace := func() string { From 1e388217ff27d5486a7a3e4009e29a99fbff7532 Mon Sep 17 00:00:00 2001 From: Xiyue Yu Date: Mon, 10 Nov 2025 12:54:00 -0800 Subject: [PATCH 08/10] fixed build failure --- test/integration/epp/hermetic_test.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/test/integration/epp/hermetic_test.go b/test/integration/epp/hermetic_test.go index 6c0cd39a2..020beb59b 100644 --- a/test/integration/epp/hermetic_test.go +++ b/test/integration/epp/hermetic_test.go @@ -56,6 +56,7 @@ import ( crmetrics "sigs.k8s.io/controller-runtime/pkg/metrics" "sigs.k8s.io/controller-runtime/pkg/metrics/filters" metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer" "sigs.k8s.io/yaml" v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" @@ -1170,11 +1171,14 @@ func BeforeSuite() func() { serverRunner.TestPodMetricsClient = &backendmetrics.FakePodMetricsClient{} pmf := backendmetrics.NewPodMetricsFactory(serverRunner.TestPodMetricsClient, 10*time.Millisecond) // Adjust from defaults - serverRunner.PoolGKNN = common.GKNN{ + poolGKNN := common.GKNN{ NamespacedName: types.NamespacedName{Namespace: testNamespace, Name: testPoolName}, GroupKind: schema.GroupKind{Group: v1.GroupVersion.Group, Kind: "InferencePool"}, } - serverRunner.Datastore = datastore.NewDatastore(context.Background(), pmf, 0) + endPointsPool := datalayer.NewEndPointsPool(false, poolGKNN) + serverRunner.EndPointsPool = datalayer.NewEndPointsPool(false, poolGKNN) + + serverRunner.Datastore = datastore.NewDatastore(context.Background(), pmf, 0, endPointsPool) kvCacheUtilizationScorer := scorer.NewKVCacheUtilizationScorer() queueingScorer := scorer.NewQueueScorer() @@ -1238,7 +1242,7 @@ func BeforeSuite() func() { assert.Eventually(nil, func() bool { modelExist := serverRunner.Datastore.ObjectiveGet(modelMyModel) - synced := serverRunner.Datastore.EndPointsPoolHasSynced() && modelExist != nil + synced := serverRunner.Datastore.PoolHasSynced() && modelExist != nil return synced }, 10*time.Second, 10*time.Millisecond) From 30fd66743409760e3aa9bb3ac786a6d9d171f22f Mon Sep 17 00:00:00 2001 From: Xiyue Yu Date: Mon, 10 Nov 2025 13:16:57 -0800 Subject: [PATCH 09/10] fixed lint --- cmd/epp/runner/runner.go | 6 +++--- .../inferenceobjective_reconciler_test.go | 3 ++- pkg/epp/controller/inferencepool_reconciler.go | 2 +- .../controller/inferencepool_reconciler_test.go | 10 +++++----- pkg/epp/controller/pod_reconciler_test.go | 3 ++- pkg/epp/datastore/datastore.go | 14 -------------- pkg/epp/requestcontrol/director.go | 3 ++- pkg/epp/requestcontrol/director_test.go | 5 ----- pkg/epp/server/controller_manager.go | 5 +++-- pkg/epp/server/runserver.go | 3 
++- pkg/epp/util/pool/pool.go | 2 +- 11 files changed, 21 insertions(+), 35 deletions(-) diff --git a/cmd/epp/runner/runner.go b/cmd/epp/runner/runner.go index e38bca971..88a3fed8a 100644 --- a/cmd/epp/runner/runner.go +++ b/cmd/epp/runner/runner.go @@ -198,7 +198,7 @@ func (r *Runner) Run(ctx context.Context) error { setupLog.Error(err, "Failed to get Kubernetes rest config") return err } - //Setup EndPointsPool + // Setup EndPointsPool endPointsPool := datalayer.NewEndPointsPool(false, common.GKNN{}) if *poolName != "" { // Determine pool namespace: if --pool-namespace is non-empty, use it; else NAMESPACE env var; else default @@ -585,7 +585,7 @@ func registerHealthServer(mgr manager.Manager, logger logr.Logger, ds datastore. func validateFlags() error { if (*poolName != "" && *selector != "") || (*poolName == "" && *selector == "") { - return fmt.Errorf("either poolName or selector must be set") + return errors.New("either poolName or selector must be set") } if *selector != "" { targetPortsList, err := strToUniqueIntSlice(*targetPorts) @@ -649,7 +649,7 @@ func strToMap(s string) (map[string]string, error) { } kv := strings.Split(trimmedPair, ":") if len(kv) != 2 { - return nil, fmt.Errorf("invalid format, expected key:value paris") + return nil, errors.New("invalid format, expected key:value paris") } m[strings.TrimSpace(kv[0])] = strings.TrimSpace(kv[1]) } diff --git a/pkg/epp/controller/inferenceobjective_reconciler_test.go b/pkg/epp/controller/inferenceobjective_reconciler_test.go index 81364af4b..66abc98ac 100644 --- a/pkg/epp/controller/inferenceobjective_reconciler_test.go +++ b/pkg/epp/controller/inferenceobjective_reconciler_test.go @@ -18,10 +18,11 @@ package controller import ( "context" - "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer" "testing" "time" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer" + "github.com/google/go-cmp/cmp" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" diff --git a/pkg/epp/controller/inferencepool_reconciler.go b/pkg/epp/controller/inferencepool_reconciler.go index 06942316d..8ff8a86f1 100644 --- a/pkg/epp/controller/inferencepool_reconciler.go +++ b/pkg/epp/controller/inferencepool_reconciler.go @@ -77,7 +77,7 @@ func (c *InferencePoolReconciler) Reconcile(ctx context.Context, req ctrl.Reques c.Datastore.Clear() return ctrl.Result{}, nil } - endPointsPool := &datalayer.EndPointsPool{} + var endPointsPool *datalayer.EndPointsPool switch pool := obj.(type) { case *v1.InferencePool: endPointsPool = pooltuil.InferencePoolToEndPointsPool(pool) diff --git a/pkg/epp/controller/inferencepool_reconciler_test.go b/pkg/epp/controller/inferencepool_reconciler_test.go index 9b0808348..6b3a14f3e 100644 --- a/pkg/epp/controller/inferencepool_reconciler_test.go +++ b/pkg/epp/controller/inferencepool_reconciler_test.go @@ -271,7 +271,7 @@ func TestXInferencePoolReconciler(t *testing.T) { t.Errorf("Unexpected InferencePool reconcile error: %v", err) } endPointsPool1 := pool.AlphaInferencePoolToEndPointsPool(pool1) - if diff := xDiffStore(t, ds, xDiffStoreParams{wantPool: endPointsPool1, wantPods: []string{"pod1-rank-0", "pod2-rank-0"}}); diff != "" { + if diff := xDiffStore(ds, xDiffStoreParams{wantPool: endPointsPool1, wantPods: []string{"pod1-rank-0", "pod2-rank-0"}}); diff != "" { t.Errorf("Unexpected diff (+got/-want): %s", diff) } @@ -288,7 +288,7 @@ func TestXInferencePoolReconciler(t *testing.T) { t.Errorf("Unexpected InferencePool reconcile error: %v", err) } newEndPointsPoll1 := 
pool.AlphaInferencePoolToEndPointsPool(newPool1) - if diff := xDiffStore(t, ds, xDiffStoreParams{wantPool: newEndPointsPoll1, wantPods: []string{"pod5-rank-0"}}); diff != "" { + if diff := xDiffStore(ds, xDiffStoreParams{wantPool: newEndPointsPoll1, wantPods: []string{"pod5-rank-0"}}); diff != "" { t.Errorf("Unexpected diff (+got/-want): %s", diff) } @@ -304,7 +304,7 @@ func TestXInferencePoolReconciler(t *testing.T) { t.Errorf("Unexpected InferencePool reconcile error: %v", err) } newEndPointsPool1 := pool.AlphaInferencePoolToEndPointsPool(newPool1) - if diff := xDiffStore(t, ds, xDiffStoreParams{wantPool: newEndPointsPool1, wantPods: []string{"pod5-rank-0"}}); diff != "" { + if diff := xDiffStore(ds, xDiffStoreParams{wantPool: newEndPointsPool1, wantPods: []string{"pod5-rank-0"}}); diff != "" { t.Errorf("Unexpected diff (+got/-want): %s", diff) } @@ -318,7 +318,7 @@ func TestXInferencePoolReconciler(t *testing.T) { if _, err := inferencePoolReconciler.Reconcile(ctx, req); err != nil { t.Errorf("Unexpected InferencePool reconcile error: %v", err) } - if diff := xDiffStore(t, ds, xDiffStoreParams{wantPods: []string{}}); diff != "" { + if diff := xDiffStore(ds, xDiffStoreParams{wantPods: []string{}}); diff != "" { t.Errorf("Unexpected diff (+got/-want): %s", diff) } } @@ -329,7 +329,7 @@ type xDiffStoreParams struct { wantObjectives []*v1alpha2.InferenceObjective } -func xDiffStore(t *testing.T, datastore datastore.Datastore, params xDiffStoreParams) string { +func xDiffStore(datastore datastore.Datastore, params xDiffStoreParams) string { gotPool, _ := datastore.PoolGet() if gotPool == nil && params.wantPool == nil { return "" diff --git a/pkg/epp/controller/pod_reconciler_test.go b/pkg/epp/controller/pod_reconciler_test.go index 253a60b34..989b11069 100644 --- a/pkg/epp/controller/pod_reconciler_test.go +++ b/pkg/epp/controller/pod_reconciler_test.go @@ -18,10 +18,11 @@ package controller import ( "context" - "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer" "testing" "time" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer" + "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" corev1 "k8s.io/api/core/v1" diff --git a/pkg/epp/datastore/datastore.go b/pkg/epp/datastore/datastore.go index 01b12dd54..6e662e220 100644 --- a/pkg/epp/datastore/datastore.go +++ b/pkg/epp/datastore/datastore.go @@ -31,7 +31,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" - v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" "sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2" backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer" @@ -87,7 +86,6 @@ type datastore struct { parentCtx context.Context // endPointsAndObjectivesMu is used to synchronize access to pool and the objectives map. 
endPointsAndObjectivesMu sync.RWMutex - standaloneMode bool endPointsPool *datalayer.EndPointsPool // key: InferenceObjective.Spec.ModelName, value: *InferenceObjective objectives map[string]*v1alpha2.InferenceObjective @@ -314,15 +312,3 @@ func (ds *datastore) podResyncAll(ctx context.Context, reader client.Reader) err return nil } - -func selectorFromInferencePoolSelector(selector map[v1.LabelKey]v1.LabelValue) labels.Selector { - return labels.SelectorFromSet(stripLabelKeyAliasFromLabelMap(selector)) -} - -func stripLabelKeyAliasFromLabelMap(labels map[v1.LabelKey]v1.LabelValue) map[string]string { - outMap := make(map[string]string) - for k, v := range labels { - outMap[string(k)] = string(v) - } - return outMap -} diff --git a/pkg/epp/requestcontrol/director.go b/pkg/epp/requestcontrol/director.go index 460be0d18..7b9b23816 100644 --- a/pkg/epp/requestcontrol/director.go +++ b/pkg/epp/requestcontrol/director.go @@ -23,10 +23,11 @@ import ( "fmt" "math/rand" "net" - "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer" "strings" "time" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer" + "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2" diff --git a/pkg/epp/requestcontrol/director_test.go b/pkg/epp/requestcontrol/director_test.go index b2c955723..faa092b6f 100644 --- a/pkg/epp/requestcontrol/director_test.go +++ b/pkg/epp/requestcontrol/director_test.go @@ -145,11 +145,6 @@ func TestDirector_HandleRequest(t *testing.T) { scheme := runtime.NewScheme() _ = clientgoscheme.AddToScheme(scheme) fakeClient := fake.NewClientBuilder().WithScheme(scheme).Build() - targetPorts := make([]int, 0, len(pool.Spec.TargetPorts)) - for _, p := range pool.Spec.TargetPorts { - targetPorts = append(targetPorts, int(p.Number)) - - } if err := ds.PoolSet(ctx, fakeClient, poolutil.InferencePoolToEndPointsPool(pool)); err != nil { t.Fatalf("Error while setting inference pool: %v", err) diff --git a/pkg/epp/server/controller_manager.go b/pkg/epp/server/controller_manager.go index e3ca2a15c..4b5102f45 100644 --- a/pkg/epp/server/controller_manager.go +++ b/pkg/epp/server/controller_manager.go @@ -18,7 +18,6 @@ package server import ( "fmt" - "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/fields" @@ -31,6 +30,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/manager" metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer" v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" "sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2" @@ -76,8 +76,9 @@ func defaultManagerOptions(endPointsPool *datalayer.EndPointsPool, metricsServer "metadata.name": endPointsPool.GKNN.Name, })}}, } + default: + return ctrl.Options{}, fmt.Errorf("unknown group: %s", endPointsPool.GKNN.Group) } - } return opt, nil diff --git a/pkg/epp/server/runserver.go b/pkg/epp/server/runserver.go index c79054bd5..86ee780be 100644 --- a/pkg/epp/server/runserver.go +++ b/pkg/epp/server/runserver.go @@ -20,9 +20,10 @@ import ( "context" "crypto/tls" "fmt" - "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer" "time" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer" + extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" "github.com/go-logr/logr" "google.golang.org/grpc" diff --git a/pkg/epp/util/pool/pool.go b/pkg/epp/util/pool/pool.go 
index 004627254..c720b60cd 100644 --- a/pkg/epp/util/pool/pool.go +++ b/pkg/epp/util/pool/pool.go @@ -110,7 +110,7 @@ func ToGKNN(ip *v1.InferencePool) common.GKNN { return common.GKNN{ NamespacedName: types.NamespacedName{ Name: ip.Name, - Namespace: ip.ObjectMeta.Namespace, + Namespace: ip.Namespace, }, GroupKind: schema.GroupKind{ Group: ip.GroupVersionKind().Group, From 9e853776b462b55c8f05a99c0d9803dcdb3fa0a9 Mon Sep 17 00:00:00 2001 From: Xiyue Yu Date: Mon, 10 Nov 2025 13:35:21 -0800 Subject: [PATCH 10/10] fix format --- pkg/epp/util/pool/pool.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/epp/util/pool/pool.go b/pkg/epp/util/pool/pool.go index c720b60cd..f5126face 100644 --- a/pkg/epp/util/pool/pool.go +++ b/pkg/epp/util/pool/pool.go @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, @@ -13,6 +13,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ + package pool import (
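For illustration only (not part of the patch series): the pkg/epp/util/pool helpers added above centralize the InferencePool to EndPointsPool mapping that earlier revisions open-coded in the reconciler and tests. A minimal round-trip sketch, assuming the v1 API types used throughout the series; the pool metadata, selector, and port are placeholder values.

    package main

    import (
        "fmt"

        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1"
        "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/pool"
    )

    func main() {
        ip := &v1.InferencePool{
            ObjectMeta: metav1.ObjectMeta{Name: "test-pool", Namespace: "default"}, // placeholders
            Spec: v1.InferencePoolSpec{
                Selector:    v1.LabelSelector{MatchLabels: map[v1.LabelKey]v1.LabelValue{"app": "vllm-llama3-8b-instruct"}},
                TargetPorts: []v1.Port{{Number: v1.PortNumber(8000)}},
            },
        }

        // Forward: InferencePool spec -> the flat EndPointsPool the datastore now stores.
        endPointsPool := pool.InferencePoolToEndPointsPool(ip)
        fmt.Println(endPointsPool.GKNN.NamespacedName, endPointsPool.EndPoints.Selector, endPointsPool.EndPoints.TargetPorts)

        // Reverse: EndPointsPool -> InferencePool, used where a v1 object is still needed.
        back := pool.EndPointsPoolToInferencePool(endPointsPool)
        fmt.Println(back.Name, back.Spec.TargetPorts)
    }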