diff --git a/cmd/epp/runner/runner.go b/cmd/epp/runner/runner.go index cbd3ea024..88a3fed8a 100644 --- a/cmd/epp/runner/runner.go +++ b/cmd/epp/runner/runner.go @@ -26,6 +26,8 @@ import ( "net/http/pprof" "os" "runtime" + "strconv" + "strings" "sync/atomic" "github.com/go-logr/logr" @@ -100,6 +102,8 @@ var ( poolName = flag.String("pool-name", runserver.DefaultPoolName, "Name of the InferencePool this Endpoint Picker is associated with.") poolGroup = flag.String("pool-group", runserver.DefaultPoolGroup, "group of the InferencePool this Endpoint Picker is associated with.") poolNamespace = flag.String("pool-namespace", "", "Namespace of the InferencePool this Endpoint Picker is associated with.") + selector = flag.String("selector", "", "selector to filter pods on. Format: a comma-separated list of key:value label pairs, e.g., 'app:vllm-llama3-8b-instruct,env:prod'.") + targetPorts = flag.String("target-ports", "", "target ports of model server pods. Format: a comma-separated list of ports, e.g., '3000,3001,3002'") logVerbosity = flag.Int("v", logging.DEFAULT, "number for the log level verbosity") secureServing = flag.Bool("secure-serving", runserver.DefaultSecureServing, "Enables secure serving. 
Defaults to true.") healthChecking = flag.Bool("health-checking", runserver.DefaultHealthChecking, "Enables health checking") @@ -194,6 +198,64 @@ func (r *Runner) Run(ctx context.Context) error { setupLog.Error(err, "Failed to get Kubernetes rest config") return err } + // Setup EndPointsPool + endPointsPool := datalayer.NewEndPointsPool(false, common.GKNN{}) + if *poolName != "" { + // Determine pool namespace: if --pool-namespace is non-empty, use it; else NAMESPACE env var; else default + resolvePoolNamespace := func() string { + if *poolNamespace != "" { + return *poolNamespace + } + if nsEnv := os.Getenv("NAMESPACE"); nsEnv != "" { + return nsEnv + } + return runserver.DefaultPoolNamespace + } + resolvedPoolNamespace := resolvePoolNamespace() + poolNamespacedName := types.NamespacedName{ + Name: *poolName, + Namespace: resolvedPoolNamespace, + } + poolGroupKind := schema.GroupKind{ + Group: *poolGroup, + Kind: "InferencePool", + } + poolGKNN := common.GKNN{ + NamespacedName: poolNamespacedName, + GroupKind: poolGroupKind, + } + endPointsPool.GKNN = poolGKNN + } + + if *selector != "" { + endPointsPool.EndPoints.Selector, err = strToMap(*selector) + if err != nil { + setupLog.Error(err, "Failed to parse flag", "flag", "selector") + return err + } + endPointsPool.EndPoints.TargetPorts, err = strToUniqueIntSlice(*targetPorts) + if err != nil { + setupLog.Error(err, "Failed to parse flag", "flag", "target-ports") + return err + } + endPointsPool.StandaloneMode = true + + // Determine EPP namespace from the EPP_NAMESPACE env var (required in standalone mode) + eppNsEnv := os.Getenv("EPP_NAMESPACE") + if eppNsEnv == "" { + setupLog.Error(nil, "Required environment variable EPP_NAMESPACE is not set") + } + // Determine EPP name: EPP_NAME env var + eppNameEnv := os.Getenv("EPP_NAME") + if eppNameEnv == "" { + setupLog.Error(nil, "Required environment variable EPP_NAME is not set") + } + endPointsPool.GKNN = common.GKNN{ + NamespacedName: types.NamespacedName{Namespace: eppNsEnv, Name: 
eppNameEnv}, + GroupKind: schema.GroupKind{Group: "apps", Kind: "Deployment"}, + } + + } // --- Setup Datastore --- useDatalayerV2 := env.GetEnvBool(enableExperimentalDatalayerV2, false, setupLog) @@ -201,7 +263,7 @@ func (r *Runner) Run(ctx context.Context) error { if err != nil { return err } - datastore := datastore.NewDatastore(ctx, epf, int32(*modelServerMetricsPort)) + datastore := datastore.NewDatastore(ctx, epf, int32(*modelServerMetricsPort), endPointsPool) // --- Setup Metrics Server --- customCollectors := []prometheus.Collector{collectors.NewInferencePoolMetricsCollector(datastore)} @@ -223,34 +285,10 @@ func (r *Runner) Run(ctx context.Context) error { }(), } - // Determine pool namespace: if --pool-namespace is non-empty, use it; else NAMESPACE env var; else default - resolvePoolNamespace := func() string { - if *poolNamespace != "" { - return *poolNamespace - } - if nsEnv := os.Getenv("NAMESPACE"); nsEnv != "" { - return nsEnv - } - return runserver.DefaultPoolNamespace - } - resolvedPoolNamespace := resolvePoolNamespace() - poolNamespacedName := types.NamespacedName{ - Name: *poolName, - Namespace: resolvedPoolNamespace, - } - poolGroupKind := schema.GroupKind{ - Group: *poolGroup, - Kind: "InferencePool", - } - poolGKNN := common.GKNN{ - NamespacedName: poolNamespacedName, - GroupKind: poolGroupKind, - } - isLeader := &atomic.Bool{} isLeader.Store(false) - mgr, err := runserver.NewDefaultManager(poolGKNN, cfg, metricsServerOptions, *haEnableLeaderElection) + mgr, err := runserver.NewDefaultManager(endPointsPool, cfg, metricsServerOptions, *haEnableLeaderElection) if err != nil { setupLog.Error(err, "Failed to create controller manager") return err } @@ -339,8 +377,7 @@ func (r *Runner) Run(ctx context.Context) error { // --- Setup ExtProc Server Runner --- serverRunner := &runserver.ExtProcServerRunner{ GrpcPort: *grpcPort, - PoolNamespacedName: poolNamespacedName, - PoolGKNN: poolGKNN, + EndPointsPool: endPointsPool, Datastore: datastore, 
SecureServing: *secureServing, HealthChecking: *healthChecking, @@ -547,9 +584,19 @@ func registerHealthServer(mgr manager.Manager, logger logr.Logger, ds datastore. } func validateFlags() error { - if *poolName == "" { - return fmt.Errorf("required %q flag not set", "poolName") + if (*poolName != "" && *selector != "") || (*poolName == "" && *selector == "") { + return fmt.Errorf("exactly one of the %q or %q flags must be set", "pool-name", "selector") } + if *selector != "" { + targetPortsList, err := strToUniqueIntSlice(*targetPorts) + if err != nil { + return fmt.Errorf("unexpected value for %q flag with error %w", "target-ports", err) + } + if len(targetPortsList) == 0 || len(targetPortsList) > 8 { + return fmt.Errorf("flag %q should have length from 1 to 8", "target-ports") + } + } + if *configText != "" && *configFile != "" { return fmt.Errorf("both the %q and %q flags can not be set at the same time", "configText", "configFile") } @@ -560,6 +607,55 @@ func validateFlags() error { return nil } +func strToUniqueIntSlice(s string) ([]int, error) { + seen := make(map[int]struct{}) + var intList []int + + if s == "" { + return intList, nil + } + + strList := strings.Split(s, ",") + + for _, str := range strList { + trimmedStr := strings.TrimSpace(str) + if trimmedStr == "" { + continue + } + portInt, err := strconv.Atoi(trimmedStr) + if err != nil { + return nil, fmt.Errorf("invalid number: '%s' is not an integer", trimmedStr) + } + + if _, ok := seen[portInt]; !ok { + seen[portInt] = struct{}{} + intList = append(intList, portInt) + } + } + return intList, nil +} + +func strToMap(s string) (map[string]string, error) { + m := make(map[string]string) + if s == "" { + return m, nil + } + + mPairs := strings.Split(s, ",") + for _, pair := range mPairs { + trimmedPair := strings.TrimSpace(pair) + if trimmedPair == "" { + continue + } + kv := strings.Split(trimmedPair, ":") + if len(kv) != 2 { + return nil, errors.New("invalid format, expected key:value pairs") + } + m[strings.TrimSpace(kv[0])] = 
strings.TrimSpace(kv[1]) + } + return m, nil +} + func verifyMetricMapping(mapping backendmetrics.MetricMapping, logger logr.Logger) { if mapping.TotalQueuedRequests == nil { logger.Info("Not scraping metric: TotalQueuedRequests") diff --git a/pkg/epp/backend/metrics/logger.go b/pkg/epp/backend/metrics/logger.go index 69fc404e7..9f60ba76a 100644 --- a/pkg/epp/backend/metrics/logger.go +++ b/pkg/epp/backend/metrics/logger.go @@ -97,7 +97,7 @@ func refreshPrometheusMetrics(logger logr.Logger, datastore datalayer.PoolInfo, } podTotalCount := len(podMetrics) - metrics.RecordInferencePoolAvgKVCache(pool.Name, kvCacheTotal/float64(podTotalCount)) - metrics.RecordInferencePoolAvgQueueSize(pool.Name, float64(queueTotal/podTotalCount)) - metrics.RecordInferencePoolReadyPods(pool.Name, float64(podTotalCount)) + metrics.RecordInferencePoolAvgKVCache(pool.GKNN.Name, kvCacheTotal/float64(podTotalCount)) + metrics.RecordInferencePoolAvgQueueSize(pool.GKNN.Name, float64(queueTotal/podTotalCount)) + metrics.RecordInferencePoolReadyPods(pool.GKNN.Name, float64(podTotalCount)) } diff --git a/pkg/epp/backend/metrics/pod_metrics_test.go b/pkg/epp/backend/metrics/pod_metrics_test.go index b0297cd1e..eff9547ad 100644 --- a/pkg/epp/backend/metrics/pod_metrics_test.go +++ b/pkg/epp/backend/metrics/pod_metrics_test.go @@ -25,7 +25,6 @@ import ( "github.com/stretchr/testify/assert" "k8s.io/apimachinery/pkg/types" - v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer" ) @@ -86,8 +85,8 @@ func TestMetricsRefresh(t *testing.T) { type fakeDataStore struct{} -func (f *fakeDataStore) PoolGet() (*v1.InferencePool, error) { - return &v1.InferencePool{Spec: v1.InferencePoolSpec{TargetPorts: []v1.Port{{Number: 8000}}}}, nil +func (f *fakeDataStore) PoolGet() (*datalayer.EndPointsPool, error) { + return &datalayer.EndPointsPool{}, nil } func (f *fakeDataStore) PodList(func(PodMetrics) bool) []PodMetrics { diff --git 
a/pkg/epp/controller/inferenceobjective_reconciler_test.go b/pkg/epp/controller/inferenceobjective_reconciler_test.go index 4ceff5d07..66abc98ac 100644 --- a/pkg/epp/controller/inferenceobjective_reconciler_test.go +++ b/pkg/epp/controller/inferenceobjective_reconciler_test.go @@ -21,6 +21,8 @@ import ( "testing" "time" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer" + "github.com/google/go-cmp/cmp" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -36,16 +38,17 @@ import ( "sigs.k8s.io/gateway-api-inference-extension/pkg/common" backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/pool" utiltest "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/testing" ) var ( - pool = utiltest.MakeInferencePool("test-pool1").Namespace("ns1").ObjRef() + inferencePool = utiltest.MakeInferencePool("test-pool1").Namespace("ns1").ObjRef() infObjective1 = utiltest.MakeInferenceObjective("model1"). - Namespace(pool.Namespace). + Namespace(inferencePool.Namespace). Priority(1). CreationTimestamp(metav1.Unix(1000, 0)). - PoolName(pool.Name). + PoolName(inferencePool.Name). PoolGroup("inference.networking.k8s.io").ObjRef() infObjective1Pool2 = utiltest.MakeInferenceObjective(infObjective1.Name). Namespace(infObjective1.Namespace). @@ -57,24 +60,24 @@ var ( Namespace(infObjective1.Namespace). Priority(2). CreationTimestamp(metav1.Unix(1003, 0)). - PoolName(pool.Name). + PoolName(inferencePool.Name). PoolGroup("inference.networking.k8s.io").ObjRef() infObjective1Deleted = utiltest.MakeInferenceObjective(infObjective1.Name). Namespace(infObjective1.Namespace). CreationTimestamp(metav1.Unix(1004, 0)). DeletionTimestamp(). - PoolName(pool.Name). + PoolName(inferencePool.Name). 
PoolGroup("inference.networking.k8s.io").ObjRef() infObjective1DiffGroup = utiltest.MakeInferenceObjective(infObjective1.Name). - Namespace(pool.Namespace). + Namespace(inferencePool.Namespace). Priority(1). CreationTimestamp(metav1.Unix(1005, 0)). - PoolName(pool.Name). + PoolName(inferencePool.Name). PoolGroup("inference.networking.x-k8s.io").ObjRef() infObjective2 = utiltest.MakeInferenceObjective("model2"). - Namespace(pool.Namespace). + Namespace(inferencePool.Namespace). CreationTimestamp(metav1.Unix(1000, 0)). - PoolName(pool.Name). + PoolName(inferencePool.Name). PoolGroup("inference.networking.k8s.io").ObjRef() ) @@ -120,7 +123,7 @@ func TestInferenceObjectiveReconciler(t *testing.T) { { name: "Objective not found, no matching existing objective to delete", objectivessInStore: []*v1alpha2.InferenceObjective{infObjective1}, - incomingReq: &types.NamespacedName{Name: "non-existent-objective", Namespace: pool.Namespace}, + incomingReq: &types.NamespacedName{Name: "non-existent-objective", Namespace: inferencePool.Namespace}, wantObjectives: []*v1alpha2.InferenceObjective{infObjective1}, }, { @@ -160,17 +163,18 @@ func TestInferenceObjectiveReconciler(t *testing.T) { WithObjects(initObjs...). 
Build() pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second) - ds := datastore.NewDatastore(t.Context(), pmf, 0) + ds := datastore.NewDatastore(t.Context(), pmf, 0, datalayer.NewEndPointsPool(false, pool.ToGKNN(inferencePool))) for _, m := range test.objectivessInStore { ds.ObjectiveSet(m) } - _ = ds.PoolSet(context.Background(), fakeClient, pool) + endPointsPool := pool.InferencePoolToEndPointsPool(inferencePool) + _ = ds.PoolSet(context.Background(), fakeClient, endPointsPool) reconciler := &InferenceObjectiveReconciler{ Reader: fakeClient, Datastore: ds, PoolGKNN: common.GKNN{ - NamespacedName: types.NamespacedName{Name: pool.Name, Namespace: pool.Namespace}, - GroupKind: schema.GroupKind{Group: pool.GroupVersionKind().Group, Kind: pool.GroupVersionKind().Kind}, + NamespacedName: types.NamespacedName{Name: inferencePool.Name, Namespace: inferencePool.Namespace}, + GroupKind: schema.GroupKind{Group: inferencePool.GroupVersionKind().Group, Kind: inferencePool.GroupVersionKind().Kind}, }, } if test.incomingReq == nil { @@ -190,8 +194,7 @@ func TestInferenceObjectiveReconciler(t *testing.T) { if len(test.wantObjectives) != len(ds.ObjectiveGetAll()) { t.Errorf("Unexpected; want: %d, got:%d", len(test.wantObjectives), len(ds.ObjectiveGetAll())) } - - if diff := diffStore(ds, diffStoreParams{wantPool: pool, wantObjectives: test.wantObjectives}); diff != "" { + if diff := diffStore(ds, diffStoreParams{wantPool: endPointsPool, wantObjectives: test.wantObjectives}); diff != "" { t.Errorf("Unexpected diff (+got/-want): %s", diff) } diff --git a/pkg/epp/controller/inferencepool_reconciler.go b/pkg/epp/controller/inferencepool_reconciler.go index 3b52de0ae..8ff8a86f1 100644 --- a/pkg/epp/controller/inferencepool_reconciler.go +++ b/pkg/epp/controller/inferencepool_reconciler.go @@ -24,12 +24,14 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" + 
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer" v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" "sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2" "sigs.k8s.io/gateway-api-inference-extension/pkg/common" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore" logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" + poolutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/pool" ) // InferencePoolReconciler utilizes the controller runtime to reconcile Instance Gateway resources @@ -75,25 +77,17 @@ func (c *InferencePoolReconciler) Reconcile(ctx context.Context, req ctrl.Reques c.Datastore.Clear() return ctrl.Result{}, nil } - - // 4. Convert the fetched object to the canonical v1.InferencePool. - v1infPool := &v1.InferencePool{} - + var endPointsPool *datalayer.EndPointsPool switch pool := obj.(type) { case *v1.InferencePool: - // If it's already a v1 object, just use it. - v1infPool = pool + endPointsPool = poolutil.InferencePoolToEndPointsPool(pool) case *v1alpha2.InferencePool: - var err error - err = pool.ConvertTo(v1infPool) - if err != nil { - return ctrl.Result{}, fmt.Errorf("failed to convert XInferencePool to InferencePool - %w", err) - } + endPointsPool = poolutil.AlphaInferencePoolToEndPointsPool(pool) default: return ctrl.Result{}, fmt.Errorf("unsupported API group: %s", c.PoolGKNN.Group) } - if err := c.Datastore.PoolSet(ctx, c.Reader, v1infPool); err != nil { + if err := c.Datastore.PoolSet(ctx, c.Reader, endPointsPool); err != nil { return ctrl.Result{}, fmt.Errorf("failed to update datastore - %w", err) } diff --git a/pkg/epp/controller/inferencepool_reconciler_test.go b/pkg/epp/controller/inferencepool_reconciler_test.go index a2bce1256..6b3a14f3e 100644 --- a/pkg/epp/controller/inferencepool_reconciler_test.go +++ b/pkg/epp/controller/inferencepool_reconciler_test.go @@ -21,10 +21,12 @@ import ( "testing" "time" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer" + 
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/pool" + "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" @@ -114,14 +116,15 @@ func TestInferencePoolReconciler(t *testing.T) { ctx := context.Background() pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second) - ds := datastore.NewDatastore(ctx, pmf, 0) + ds := datastore.NewDatastore(ctx, pmf, 0, datalayer.NewEndPointsPool(false, gknn)) inferencePoolReconciler := &InferencePoolReconciler{Reader: fakeClient, Datastore: ds, PoolGKNN: gknn} // Step 1: Inception, only ready pods matching pool1 are added to the store. if _, err := inferencePoolReconciler.Reconcile(ctx, req); err != nil { t.Errorf("Unexpected InferencePool reconcile error: %v", err) } - if diff := diffStore(ds, diffStoreParams{wantPool: pool1, wantPods: []string{"pod1-rank-0", "pod2-rank-0"}}); diff != "" { + endPointsPool1 := pool.InferencePoolToEndPointsPool(pool1) + if diff := diffStore(ds, diffStoreParams{wantPool: endPointsPool1, wantPods: []string{"pod1-rank-0", "pod2-rank-0"}}); diff != "" { t.Errorf("Unexpected diff (+got/-want): %s", diff) } @@ -135,11 +138,11 @@ func TestInferencePoolReconciler(t *testing.T) { if err := fakeClient.Update(ctx, newPool1, &client.UpdateOptions{}); err != nil { t.Errorf("Unexpected pool update error: %v", err) } - if _, err := inferencePoolReconciler.Reconcile(ctx, req); err != nil { t.Errorf("Unexpected InferencePool reconcile error: %v", err) } - if diff := diffStore(ds, diffStoreParams{wantPool: newPool1, wantPods: []string{"pod5-rank-0"}}); diff != "" { + newEndPointsPool1 := pool.InferencePoolToEndPointsPool(newPool1) + if diff := diffStore(ds, diffStoreParams{wantPool: newEndPointsPool1, wantPods: []string{"pod5-rank-0"}}); diff != "" { t.Errorf("Unexpected 
diff (+got/-want): %s", diff) } @@ -154,7 +157,8 @@ func TestInferencePoolReconciler(t *testing.T) { if _, err := inferencePoolReconciler.Reconcile(ctx, req); err != nil { t.Errorf("Unexpected InferencePool reconcile error: %v", err) } - if diff := diffStore(ds, diffStoreParams{wantPool: newPool1, wantPods: []string{"pod5-rank-0"}}); diff != "" { + newEndPointsPool1 = pool.InferencePoolToEndPointsPool(newPool1) + if diff := diffStore(ds, diffStoreParams{wantPool: newEndPointsPool1, wantPods: []string{"pod5-rank-0"}}); diff != "" { t.Errorf("Unexpected diff (+got/-want): %s", diff) } @@ -174,16 +178,14 @@ func TestInferencePoolReconciler(t *testing.T) { } type diffStoreParams struct { - wantPool *v1.InferencePool + wantPool *datalayer.EndPointsPool wantPods []string wantObjectives []*v1alpha2.InferenceObjective } func diffStore(datastore datastore.Datastore, params diffStoreParams) string { gotPool, _ := datastore.PoolGet() - // controller-runtime fake client may not populate TypeMeta (APIVersion/Kind). - // Ignore it when comparing pools. - if diff := cmp.Diff(params.wantPool, gotPool, cmpopts.IgnoreTypes(metav1.TypeMeta{})); diff != "" { + if diff := cmp.Diff(params.wantPool, gotPool); diff != "" { return "pool:" + diff } @@ -261,14 +263,15 @@ func TestXInferencePoolReconciler(t *testing.T) { ctx := context.Background() pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second) - ds := datastore.NewDatastore(ctx, pmf, 0) + ds := datastore.NewDatastore(ctx, pmf, 0, datalayer.NewEndPointsPool(false, gknn)) inferencePoolReconciler := &InferencePoolReconciler{Reader: fakeClient, Datastore: ds, PoolGKNN: gknn} // Step 1: Inception, only ready pods matching pool1 are added to the store. 
if _, err := inferencePoolReconciler.Reconcile(ctx, req); err != nil { t.Errorf("Unexpected InferencePool reconcile error: %v", err) } - if diff := xDiffStore(t, ds, xDiffStoreParams{wantPool: pool1, wantPods: []string{"pod1-rank-0", "pod2-rank-0"}}); diff != "" { + endPointsPool1 := pool.AlphaInferencePoolToEndPointsPool(pool1) + if diff := xDiffStore(ds, xDiffStoreParams{wantPool: endPointsPool1, wantPods: []string{"pod1-rank-0", "pod2-rank-0"}}); diff != "" { t.Errorf("Unexpected diff (+got/-want): %s", diff) } @@ -284,7 +287,8 @@ func TestXInferencePoolReconciler(t *testing.T) { if _, err := inferencePoolReconciler.Reconcile(ctx, req); err != nil { t.Errorf("Unexpected InferencePool reconcile error: %v", err) } - if diff := xDiffStore(t, ds, xDiffStoreParams{wantPool: newPool1, wantPods: []string{"pod5-rank-0"}}); diff != "" { + newEndPointsPoll1 := pool.AlphaInferencePoolToEndPointsPool(newPool1) + if diff := xDiffStore(ds, xDiffStoreParams{wantPool: newEndPointsPoll1, wantPods: []string{"pod5-rank-0"}}); diff != "" { t.Errorf("Unexpected diff (+got/-want): %s", diff) } @@ -299,7 +303,8 @@ func TestXInferencePoolReconciler(t *testing.T) { if _, err := inferencePoolReconciler.Reconcile(ctx, req); err != nil { t.Errorf("Unexpected InferencePool reconcile error: %v", err) } - if diff := xDiffStore(t, ds, xDiffStoreParams{wantPool: newPool1, wantPods: []string{"pod5-rank-0"}}); diff != "" { + newEndPointsPool1 := pool.AlphaInferencePoolToEndPointsPool(newPool1) + if diff := xDiffStore(ds, xDiffStoreParams{wantPool: newEndPointsPool1, wantPods: []string{"pod5-rank-0"}}); diff != "" { t.Errorf("Unexpected diff (+got/-want): %s", diff) } @@ -313,33 +318,24 @@ func TestXInferencePoolReconciler(t *testing.T) { if _, err := inferencePoolReconciler.Reconcile(ctx, req); err != nil { t.Errorf("Unexpected InferencePool reconcile error: %v", err) } - if diff := xDiffStore(t, ds, xDiffStoreParams{wantPods: []string{}}); diff != "" { + if diff := xDiffStore(ds, 
xDiffStoreParams{wantPods: []string{}}); diff != "" { t.Errorf("Unexpected diff (+got/-want): %s", diff) } } type xDiffStoreParams struct { - wantPool *v1alpha2.InferencePool + wantPool *datalayer.EndPointsPool wantPods []string wantObjectives []*v1alpha2.InferenceObjective } -func xDiffStore(t *testing.T, datastore datastore.Datastore, params xDiffStoreParams) string { +func xDiffStore(datastore datastore.Datastore, params xDiffStoreParams) string { gotPool, _ := datastore.PoolGet() if gotPool == nil && params.wantPool == nil { return "" } - gotXPool := &v1alpha2.InferencePool{} - - err := gotXPool.ConvertFrom(gotPool) - if err != nil { - t.Fatalf("failed to convert InferencePool to XInferencePool: %v", err) - } - - // controller-runtime fake client may not populate TypeMeta (APIVersion/Kind). - // Ignore it when comparing pools. - if diff := cmp.Diff(params.wantPool, gotXPool, cmpopts.IgnoreTypes(metav1.TypeMeta{})); diff != "" { + if diff := cmp.Diff(params.wantPool, gotPool); diff != "" { return "pool:" + diff } diff --git a/pkg/epp/controller/pod_reconciler.go b/pkg/epp/controller/pod_reconciler.go index b3a78ef92..b3a73f313 100644 --- a/pkg/epp/controller/pod_reconciler.go +++ b/pkg/epp/controller/pod_reconciler.go @@ -42,7 +42,7 @@ type PodReconciler struct { func (c *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { logger := log.FromContext(ctx) if !c.Datastore.PoolHasSynced() { - logger.V(logutil.TRACE).Info("Skipping reconciling Pod because the InferencePool is not available yet") + logger.V(logutil.TRACE).Info("Skipping reconciling Pod because the EndPointsPicker is not available yet") // When the inferencePool is initialized it lists the appropriate pods and populates the datastore, so no need to requeue. 
return ctrl.Result{}, nil } diff --git a/pkg/epp/controller/pod_reconciler_test.go b/pkg/epp/controller/pod_reconciler_test.go index 28f817310..989b11069 100644 --- a/pkg/epp/controller/pod_reconciler_test.go +++ b/pkg/epp/controller/pod_reconciler_test.go @@ -21,6 +21,8 @@ import ( "testing" "time" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer" + "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" corev1 "k8s.io/api/core/v1" @@ -35,6 +37,7 @@ import ( v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/pool" utiltest "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/testing" ) @@ -196,8 +199,8 @@ func TestPodReconciler(t *testing.T) { Build() // Configure the initial state of the datastore. - store := datastore.NewDatastore(t.Context(), pmf, 0) - _ = store.PoolSet(t.Context(), fakeClient, test.pool) + store := datastore.NewDatastore(t.Context(), pmf, 0, datalayer.NewEndPointsPool(false, pool.ToGKNN(test.pool))) + _ = store.PoolSet(t.Context(), fakeClient, pool.InferencePoolToEndPointsPool(test.pool)) for _, pod := range test.existingPods { store.PodUpdateOrAddIfNotExist(pod) } diff --git a/pkg/epp/datalayer/endpointsPools.go b/pkg/epp/datalayer/endpointsPools.go new file mode 100644 index 000000000..5e8051ffb --- /dev/null +++ b/pkg/epp/datalayer/endpointsPools.go @@ -0,0 +1,50 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package datalayer + +import ( + "sigs.k8s.io/gateway-api-inference-extension/pkg/common" +) + +type EndPointsPool struct { + EndPoints *EndPoints + StandaloneMode bool + GKNN common.GKNN +} + +// NewEndPointsPool creates and returns a new empty instance of EndPointsPool. +func NewEndPointsPool(standAloneMode bool, gknn common.GKNN) *EndPointsPool { + endPoints := NewEndPoints() + return &EndPointsPool{ + GKNN: gknn, + StandaloneMode: standAloneMode, + EndPoints: endPoints, + } +} + +type EndPoints struct { + Selector map[string]string + TargetPorts []int +} + +// NewEndPoints creates and returns a new empty instance of EndPoints. +func NewEndPoints() *EndPoints { + return &EndPoints{ + Selector: make(map[string]string), + TargetPorts: []int{}, + } +} diff --git a/pkg/epp/datalayer/factory.go b/pkg/epp/datalayer/factory.go index 989527c6c..4ed4d08ef 100644 --- a/pkg/epp/datalayer/factory.go +++ b/pkg/epp/datalayer/factory.go @@ -23,8 +23,6 @@ import ( "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/log" - - v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" ) // PoolInfo represents the DataStore information needed for endpoints. @@ -36,7 +34,7 @@ import ( // - Global metrics logging uses PoolGet solely for error return and PodList to enumerate // all endpoints for metrics summarization. 
type PoolInfo interface { - PoolGet() (*v1.InferencePool, error) + PoolGet() (*EndPointsPool, error) PodList(func(Endpoint) bool) []Endpoint } diff --git a/pkg/epp/datalayer/metrics/logger.go b/pkg/epp/datalayer/metrics/logger.go index fac757dbe..75cbb8414 100644 --- a/pkg/epp/datalayer/metrics/logger.go +++ b/pkg/epp/datalayer/metrics/logger.go @@ -116,9 +116,9 @@ func refreshPrometheusMetrics(logger logr.Logger, datastore datalayer.PoolInfo, totals := calculateTotals(podMetrics) podCount := len(podMetrics) - metrics.RecordInferencePoolAvgKVCache(pool.Name, totals.kvCache/float64(podCount)) - metrics.RecordInferencePoolAvgQueueSize(pool.Name, float64(totals.queueSize/podCount)) - metrics.RecordInferencePoolReadyPods(pool.Name, float64(podCount)) + metrics.RecordInferencePoolAvgKVCache(pool.GKNN.Name, totals.kvCache/float64(podCount)) + metrics.RecordInferencePoolAvgQueueSize(pool.GKNN.Name, float64(totals.queueSize/podCount)) + metrics.RecordInferencePoolReadyPods(pool.GKNN.Name, float64(podCount)) } // totals holds aggregated metric values diff --git a/pkg/epp/datastore/datastore.go b/pkg/epp/datastore/datastore.go index dade69469..6e662e220 100644 --- a/pkg/epp/datastore/datastore.go +++ b/pkg/epp/datastore/datastore.go @@ -31,7 +31,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" - v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" "sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2" backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer" @@ -49,8 +48,8 @@ type Datastore interface { // PoolSet sets the given pool in datastore. If the given pool has different label selector than the previous pool // that was stored, the function triggers a resync of the pods to keep the datastore updated. If the given pool // is nil, this call triggers the datastore.Clear() function. 
- PoolSet(ctx context.Context, reader client.Reader, pool *v1.InferencePool) error - PoolGet() (*v1.InferencePool, error) + PoolSet(ctx context.Context, reader client.Reader, endPointsPool *datalayer.EndPointsPool) error + PoolGet() (*datalayer.EndPointsPool, error) PoolHasSynced() bool PoolLabelsMatch(podLabels map[string]string) bool @@ -69,14 +68,15 @@ type Datastore interface { Clear() } -func NewDatastore(parentCtx context.Context, epFactory datalayer.EndpointFactory, modelServerMetricsPort int32) Datastore { +func NewDatastore(parentCtx context.Context, epFactory datalayer.EndpointFactory, modelServerMetricsPort int32, endPointsPool *datalayer.EndPointsPool) Datastore { store := &datastore{ - parentCtx: parentCtx, - poolAndObjectivesMu: sync.RWMutex{}, - objectives: make(map[string]*v1alpha2.InferenceObjective), - pods: &sync.Map{}, - modelServerMetricsPort: modelServerMetricsPort, - epf: epFactory, + parentCtx: parentCtx, + endPointsAndObjectivesMu: sync.RWMutex{}, + endPointsPool: endPointsPool, + objectives: make(map[string]*v1alpha2.InferenceObjective), + pods: &sync.Map{}, + modelServerMetricsPort: modelServerMetricsPort, + epf: epFactory, } return store } @@ -84,9 +84,9 @@ func NewDatastore(parentCtx context.Context, epFactory datalayer.EndpointFactory type datastore struct { // parentCtx controls the lifecycle of the background metrics goroutines that spawn up by the datastore. parentCtx context.Context - // poolAndObjectivesMu is used to synchronize access to pool and the objectives map. - poolAndObjectivesMu sync.RWMutex - pool *v1.InferencePool + // endPointsAndObjectivesMu is used to synchronize access to pool and the objectives map. 
+ endPointsAndObjectivesMu sync.RWMutex + endPointsPool *datalayer.EndPointsPool // key: InferenceObjective.Spec.ModelName, value: *InferenceObjective objectives map[string]*v1alpha2.InferenceObjective // key: types.NamespacedName, value: backendmetrics.PodMetrics @@ -98,9 +98,9 @@ type datastore struct { } func (ds *datastore) Clear() { - ds.poolAndObjectivesMu.Lock() - defer ds.poolAndObjectivesMu.Unlock() - ds.pool = nil + ds.endPointsAndObjectivesMu.Lock() + defer ds.endPointsAndObjectivesMu.Unlock() + ds.endPointsPool = nil ds.objectives = make(map[string]*v1alpha2.InferenceObjective) // stop all pods go routines before clearing the pods map. ds.pods.Range(func(_, v any) bool { @@ -110,20 +110,20 @@ func (ds *datastore) Clear() { ds.pods.Clear() } -// /// InferencePool APIs /// -func (ds *datastore) PoolSet(ctx context.Context, reader client.Reader, pool *v1.InferencePool) error { - if pool == nil { +// /// EndPoints APIs /// +func (ds *datastore) PoolSet(ctx context.Context, reader client.Reader, endPointsPool *datalayer.EndPointsPool) error { + if endPointsPool == nil { ds.Clear() return nil } logger := log.FromContext(ctx) - ds.poolAndObjectivesMu.Lock() - defer ds.poolAndObjectivesMu.Unlock() + ds.endPointsAndObjectivesMu.Lock() + defer ds.endPointsAndObjectivesMu.Unlock() - oldPool := ds.pool - ds.pool = pool - if oldPool == nil || !reflect.DeepEqual(pool.Spec.Selector, oldPool.Spec.Selector) { - logger.V(logutil.DEFAULT).Info("Updating inference pool endpoints", "selector", pool.Spec.Selector) + oldEndPointsPool := ds.endPointsPool + ds.endPointsPool = endPointsPool + if oldEndPointsPool == nil || !reflect.DeepEqual(oldEndPointsPool.EndPoints.Selector, endPointsPool.EndPoints.Selector) { + logger.V(logutil.DEFAULT).Info("Updating endpoints", "selector", endPointsPool.EndPoints.Selector) // A full resync is required to address two cases: // 1) At startup, the pod events may get processed before the pool is synced with the datastore, // and hence they will 
not be added to the store since pool selector is not known yet @@ -138,42 +138,42 @@ func (ds *datastore) PoolSet(ctx context.Context, reader client.Reader, pool *v1 return nil } -func (ds *datastore) PoolGet() (*v1.InferencePool, error) { - ds.poolAndObjectivesMu.RLock() - defer ds.poolAndObjectivesMu.RUnlock() +func (ds *datastore) PoolGet() (*datalayer.EndPointsPool, error) { + ds.endPointsAndObjectivesMu.RLock() + defer ds.endPointsAndObjectivesMu.RUnlock() if !ds.PoolHasSynced() { return nil, errPoolNotSynced } - return ds.pool, nil + return ds.endPointsPool, nil } func (ds *datastore) PoolHasSynced() bool { - ds.poolAndObjectivesMu.RLock() - defer ds.poolAndObjectivesMu.RUnlock() - return ds.pool != nil + ds.endPointsAndObjectivesMu.RLock() + defer ds.endPointsAndObjectivesMu.RUnlock() + return ds.endPointsPool != nil } func (ds *datastore) PoolLabelsMatch(podLabels map[string]string) bool { - ds.poolAndObjectivesMu.RLock() - defer ds.poolAndObjectivesMu.RUnlock() - if ds.pool == nil { + ds.endPointsAndObjectivesMu.RLock() + defer ds.endPointsAndObjectivesMu.RUnlock() + if ds.endPointsPool == nil { return false } - poolSelector := selectorFromInferencePoolSelector(ds.pool.Spec.Selector.MatchLabels) + poolSelector := labels.SelectorFromSet(ds.endPointsPool.EndPoints.Selector) podSet := labels.Set(podLabels) return poolSelector.Matches(podSet) } func (ds *datastore) ObjectiveSet(infObjective *v1alpha2.InferenceObjective) { - ds.poolAndObjectivesMu.Lock() - defer ds.poolAndObjectivesMu.Unlock() + ds.endPointsAndObjectivesMu.Lock() + defer ds.endPointsAndObjectivesMu.Unlock() // Set the objective. 
ds.objectives[infObjective.Name] = infObjective } func (ds *datastore) ObjectiveGet(objectiveName string) *v1alpha2.InferenceObjective { - ds.poolAndObjectivesMu.RLock() - defer ds.poolAndObjectivesMu.RUnlock() + ds.endPointsAndObjectivesMu.RLock() + defer ds.endPointsAndObjectivesMu.RUnlock() iObj, ok := ds.objectives[objectiveName] if !ok { return nil @@ -182,14 +182,14 @@ func (ds *datastore) ObjectiveGet(objectiveName string) *v1alpha2.InferenceObjec } func (ds *datastore) ObjectiveDelete(namespacedName types.NamespacedName) { - ds.poolAndObjectivesMu.Lock() - defer ds.poolAndObjectivesMu.Unlock() + ds.endPointsAndObjectivesMu.Lock() + defer ds.endPointsAndObjectivesMu.Unlock() delete(ds.objectives, namespacedName.Name) } func (ds *datastore) ObjectiveGetAll() []*v1alpha2.InferenceObjective { - ds.poolAndObjectivesMu.RLock() - defer ds.poolAndObjectivesMu.RUnlock() + ds.endPointsAndObjectivesMu.RLock() + defer ds.endPointsAndObjectivesMu.RUnlock() res := []*v1alpha2.InferenceObjective{} for _, v := range ds.objectives { res = append(res, v) @@ -215,7 +215,7 @@ func (ds *datastore) PodList(predicate func(backendmetrics.PodMetrics) bool) []b } func (ds *datastore) PodUpdateOrAddIfNotExist(pod *corev1.Pod) bool { - if ds.pool == nil { + if ds.endPointsPool == nil || ds.endPointsPool.EndPoints == nil { return true } @@ -225,14 +225,14 @@ func (ds *datastore) PodUpdateOrAddIfNotExist(pod *corev1.Pod) bool { } modelServerMetricsPort := 0 - if len(ds.pool.Spec.TargetPorts) == 1 { + if len(ds.endPointsPool.EndPoints.TargetPorts) == 1 { modelServerMetricsPort = int(ds.modelServerMetricsPort) } pods := []*datalayer.PodInfo{} - for idx, port := range ds.pool.Spec.TargetPorts { + for idx, port := range ds.endPointsPool.EndPoints.TargetPorts { metricsPort := modelServerMetricsPort if metricsPort == 0 { - metricsPort = int(port.Number) + metricsPort = port } pods = append(pods, &datalayer.PodInfo{ @@ -242,7 +242,7 @@ func (ds *datastore) PodUpdateOrAddIfNotExist(pod 
*corev1.Pod) bool { }, PodName: pod.Name, Address: pod.Status.PodIP, - Port: strconv.Itoa(int(port.Number)), + Port: strconv.Itoa(port), MetricsHost: net.JoinHostPort(pod.Status.PodIP, strconv.Itoa(metricsPort)), Labels: labels, }) @@ -280,8 +280,8 @@ func (ds *datastore) podResyncAll(ctx context.Context, reader client.Reader) err logger := log.FromContext(ctx) podList := &corev1.PodList{} if err := reader.List(ctx, podList, &client.ListOptions{ - LabelSelector: selectorFromInferencePoolSelector(ds.pool.Spec.Selector.MatchLabels), - Namespace: ds.pool.Namespace, + LabelSelector: labels.SelectorFromSet(ds.endPointsPool.EndPoints.Selector), + Namespace: ds.endPointsPool.GKNN.Namespace, }); err != nil { return fmt.Errorf("failed to list pods - %w", err) } @@ -312,15 +312,3 @@ func (ds *datastore) podResyncAll(ctx context.Context, reader client.Reader) err return nil } - -func selectorFromInferencePoolSelector(selector map[v1.LabelKey]v1.LabelValue) labels.Selector { - return labels.SelectorFromSet(stripLabelKeyAliasFromLabelMap(selector)) -} - -func stripLabelKeyAliasFromLabelMap(labels map[v1.LabelKey]v1.LabelValue) map[string]string { - outMap := make(map[string]string) - for k, v := range labels { - outMap[string(k)] = string(v) - } - return outMap -} diff --git a/pkg/epp/datastore/datastore_test.go b/pkg/epp/datastore/datastore_test.go index ee59071e6..df1907398 100644 --- a/pkg/epp/datastore/datastore_test.go +++ b/pkg/epp/datastore/datastore_test.go @@ -33,6 +33,7 @@ import ( "k8s.io/apimachinery/pkg/types" clientgoscheme "k8s.io/client-go/kubernetes/scheme" "sigs.k8s.io/controller-runtime/pkg/client/fake" + pooltuil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/pool" v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" "sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2" @@ -86,13 +87,15 @@ func TestPool(t *testing.T) { WithScheme(scheme). 
Build() pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second) - ds := NewDatastore(context.Background(), pmf, 0) - _ = ds.PoolSet(context.Background(), fakeClient, tt.inferencePool) + gknn := pooltuil.ToGKNN(tt.inferencePool) + endPointPool := datalayer.NewEndPointsPool(false, gknn) + ds := NewDatastore(context.Background(), pmf, 0, endPointPool) + _ = ds.PoolSet(context.Background(), fakeClient, pooltuil.InferencePoolToEndPointsPool(tt.inferencePool)) gotPool, gotErr := ds.PoolGet() if diff := cmp.Diff(tt.wantErr, gotErr, cmpopts.EquateErrors()); diff != "" { t.Errorf("Unexpected error diff (+got/-want): %s", diff) } - if diff := cmp.Diff(tt.wantPool, gotPool); diff != "" { + if diff := cmp.Diff(pooltuil.InferencePoolToEndPointsPool(tt.wantPool), gotPool); diff != "" { t.Errorf("Unexpected pool diff (+got/-want): %s", diff) } gotSynced := ds.PoolHasSynced() @@ -120,6 +123,10 @@ func TestObjective(t *testing.T) { Priority(2).ObjRef() // Same object name as model2ts, different model name. model2chat := testutil.MakeInferenceObjective(model2ts.Name).ObjRef() + pool1Selector := map[string]string{"app": "vllm_v1"} + pool1 := testutil.MakeInferencePool("pool1"). + Namespace("default"). + Selector(pool1Selector).ObjRef() tests := []struct { name string @@ -193,7 +200,7 @@ func TestObjective(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second) - ds := NewDatastore(t.Context(), pmf, 0) + ds := NewDatastore(t.Context(), pmf, 0, datalayer.NewEndPointsPool(false, pooltuil.ToGKNN(pool1))) for _, m := range test.existingModels { ds.ObjectiveSet(m) } @@ -327,8 +334,9 @@ func TestMetrics(t *testing.T) { WithScheme(scheme). 
Build() pmf := backendmetrics.NewPodMetricsFactory(test.pmc, time.Millisecond) - ds := NewDatastore(ctx, pmf, 0) - _ = ds.PoolSet(ctx, fakeClient, inferencePool) + gknn := pooltuil.ToGKNN(inferencePool) + ds := NewDatastore(ctx, pmf, 0, datalayer.NewEndPointsPool(false, gknn)) + _ = ds.PoolSet(ctx, fakeClient, pooltuil.InferencePoolToEndPointsPool(inferencePool)) for _, pod := range test.storePods { ds.PodUpdateOrAddIfNotExist(pod) } @@ -395,9 +403,9 @@ func TestPods(t *testing.T) { t.Run(test.name, func(t *testing.T) { ctx := context.Background() pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second) - ds := NewDatastore(t.Context(), pmf, 0) + ds := NewDatastore(t.Context(), pmf, 0, datalayer.NewEndPointsPool(false, pooltuil.ToGKNN(inferencePool))) fakeClient := fake.NewFakeClient() - if err := ds.PoolSet(ctx, fakeClient, inferencePool); err != nil { + if err := ds.PoolSet(ctx, fakeClient, pooltuil.InferencePoolToEndPointsPool(inferencePool)); err != nil { t.Error(err) } for _, pod := range test.existingPods { @@ -579,9 +587,9 @@ func TestPodInfo(t *testing.T) { t.Run(test.name, func(t *testing.T) { ctx := context.Background() pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second) - ds := NewDatastore(t.Context(), pmf, 0) + ds := NewDatastore(t.Context(), pmf, 0, datalayer.NewEndPointsPool(false, pooltuil.ToGKNN(test.pool))) fakeClient := fake.NewFakeClient() - if err := ds.PoolSet(ctx, fakeClient, test.pool); err != nil { + if err := ds.PoolSet(ctx, fakeClient, pooltuil.InferencePoolToEndPointsPool(test.pool)); err != nil { t.Error(err) } for _, pod := range test.existingPods { diff --git a/pkg/epp/handlers/server.go b/pkg/epp/handlers/server.go index 0d5305574..861d2a040 100644 --- a/pkg/epp/handlers/server.go +++ b/pkg/epp/handlers/server.go @@ -30,8 +30,8 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "sigs.k8s.io/controller-runtime/pkg/log" + 
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer" - v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics" schedulingtypes "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types" @@ -61,7 +61,7 @@ type Director interface { } type Datastore interface { - PoolGet() (*v1.InferencePool, error) + PoolGet() (*datalayer.EndPointsPool, error) } // Server implements the Envoy external processing server. diff --git a/pkg/epp/metrics/collectors/inference_pool.go b/pkg/epp/metrics/collectors/inference_pool.go index ec3def164..1bb6e206e 100644 --- a/pkg/epp/metrics/collectors/inference_pool.go +++ b/pkg/epp/metrics/collectors/inference_pool.go @@ -73,7 +73,7 @@ func (c *inferencePoolMetricsCollector) Collect(ch chan<- prometheus.Metric) { descInferencePoolPerPodQueueSize, prometheus.GaugeValue, float64(pod.GetMetrics().WaitingQueueSize), - pool.Name, + pool.GKNN.Name, pod.GetPod().NamespacedName.Name, ) } diff --git a/pkg/epp/metrics/collectors/inference_pool_test.go b/pkg/epp/metrics/collectors/inference_pool_test.go index af2923e50..267066185 100644 --- a/pkg/epp/metrics/collectors/inference_pool_test.go +++ b/pkg/epp/metrics/collectors/inference_pool_test.go @@ -28,6 +28,9 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/component-base/metrics/testutil" "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/gateway-api-inference-extension/pkg/common" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer" + poolutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/pool" v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" @@ -50,7 +53,7 @@ var ( func TestNoMetricsCollected(t *testing.T) { pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second) - ds := 
datastore.NewDatastore(context.Background(), pmf, 0) + ds := datastore.NewDatastore(context.Background(), pmf, 0, datalayer.NewEndPointsPool(false, common.GKNN{})) collector := &inferencePoolMetricsCollector{ ds: ds, @@ -68,13 +71,6 @@ func TestMetricsCollected(t *testing.T) { }, } pmf := backendmetrics.NewPodMetricsFactory(pmc, time.Millisecond) - ds := datastore.NewDatastore(context.Background(), pmf, 0) - - scheme := runtime.NewScheme() - fakeClient := fake.NewClientBuilder(). - WithScheme(scheme). - Build() - inferencePool := &v1.InferencePool{ ObjectMeta: metav1.ObjectMeta{ Name: "test-pool", @@ -83,7 +79,14 @@ func TestMetricsCollected(t *testing.T) { TargetPorts: []v1.Port{{Number: v1.PortNumber(int32(8000))}}, }, } - _ = ds.PoolSet(context.Background(), fakeClient, inferencePool) + ds := datastore.NewDatastore(context.Background(), pmf, 0, datalayer.NewEndPointsPool(false, poolutil.ToGKNN(inferencePool))) + + scheme := runtime.NewScheme() + fakeClient := fake.NewClientBuilder(). + WithScheme(scheme). + Build() + + _ = ds.PoolSet(context.Background(), fakeClient, poolutil.InferencePoolToEndPointsPool(inferencePool)) _ = ds.PodUpdateOrAddIfNotExist(pod1) time.Sleep(1 * time.Second) diff --git a/pkg/epp/requestcontrol/director.go b/pkg/epp/requestcontrol/director.go index f6f7deebe..7b9b23816 100644 --- a/pkg/epp/requestcontrol/director.go +++ b/pkg/epp/requestcontrol/director.go @@ -26,9 +26,10 @@ import ( "strings" "time" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer" + "sigs.k8s.io/controller-runtime/pkg/log" - v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" "sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend" backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" @@ -43,7 +44,7 @@ import ( // Datastore defines the interface required by the Director. 
type Datastore interface { - PoolGet() (*v1.InferencePool, error) + PoolGet() (*datalayer.EndPointsPool, error) ObjectiveGet(modelName string) *v1alpha2.InferenceObjective PodList(predicate func(backendmetrics.PodMetrics) bool) []backendmetrics.PodMetrics } diff --git a/pkg/epp/requestcontrol/director_test.go b/pkg/epp/requestcontrol/director_test.go index 8cb9c91a5..faa092b6f 100644 --- a/pkg/epp/requestcontrol/director_test.go +++ b/pkg/epp/requestcontrol/director_test.go @@ -20,6 +20,12 @@ import ( "context" "errors" "fmt" + + "k8s.io/apimachinery/pkg/runtime/schema" + "sigs.k8s.io/gateway-api-inference-extension/pkg/common" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer" + poolutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/pool" + "testing" "time" @@ -76,7 +82,7 @@ type mockDatastore struct { pods []backendmetrics.PodMetrics } -func (ds *mockDatastore) PoolGet() (*v1.InferencePool, error) { +func (ds *mockDatastore) PoolGet() (*datalayer.EndPointsPool, error) { return nil, nil } func (ds *mockDatastore) ObjectiveGet(_ string) *v1alpha2.InferenceObjective { @@ -117,14 +123,6 @@ func TestDirector_HandleRequest(t *testing.T) { CreationTimestamp(metav1.Unix(1000, 0)). Priority(1). 
ObjRef() - - // Datastore setup - pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second) - ds := datastore.NewDatastore(t.Context(), pmf, 0) - ds.ObjectiveSet(ioFoodReview) - ds.ObjectiveSet(ioFoodReviewResolve) - ds.ObjectiveSet(ioFoodReviewSheddable) - pool := &v1.InferencePool{ ObjectMeta: metav1.ObjectMeta{Name: "test-pool", Namespace: "default"}, Spec: v1.InferencePoolSpec{ @@ -137,10 +135,18 @@ func TestDirector_HandleRequest(t *testing.T) { }, } + // Datastore setup + pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second) + ds := datastore.NewDatastore(t.Context(), pmf, 0, datalayer.NewEndPointsPool(false, poolutil.ToGKNN(pool))) + ds.ObjectiveSet(ioFoodReview) + ds.ObjectiveSet(ioFoodReviewResolve) + ds.ObjectiveSet(ioFoodReviewSheddable) + scheme := runtime.NewScheme() _ = clientgoscheme.AddToScheme(scheme) fakeClient := fake.NewClientBuilder().WithScheme(scheme).Build() - if err := ds.PoolSet(ctx, fakeClient, pool); err != nil { + + if err := ds.PoolSet(ctx, fakeClient, poolutil.InferencePoolToEndPointsPool(pool)); err != nil { t.Fatalf("Error while setting inference pool: %v", err) } @@ -594,8 +600,31 @@ func TestGetRandomPod(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Millisecond) - ds := datastore.NewDatastore(t.Context(), pmf, 0) - err := ds.PoolSet(t.Context(), fakeClient, pool) + targetPorts := make([]int, 0, len(pool.Spec.TargetPorts)) + for _, p := range pool.Spec.TargetPorts { + targetPorts = append(targetPorts, int(p.Number)) + + } + selector := make(map[string]string, len(pool.Spec.Selector.MatchLabels)) + for k, v := range pool.Spec.Selector.MatchLabels { + selector[string(k)] = string(v) + } + gknn := common.GKNN{ + NamespacedName: types.NamespacedName{Namespace: pool.Namespace, Name: pool.Name}, + GroupKind: schema.GroupKind{Group: 
pool.GroupVersionKind().Group, Kind: pool.GroupVersionKind().Kind}, + } + endPoints := &datalayer.EndPoints{ + Selector: selector, + TargetPorts: targetPorts, + } + endPointsPool := &datalayer.EndPointsPool{ + EndPoints: endPoints, + StandaloneMode: false, + GKNN: gknn, + } + + ds := datastore.NewDatastore(t.Context(), pmf, 0, endPointsPool) + err := ds.PoolSet(t.Context(), fakeClient, endPointsPool) if err != nil { t.Errorf("unexpected error setting pool: %s", err) } @@ -619,7 +648,7 @@ func TestDirector_HandleResponseReceived(t *testing.T) { pr1 := newTestResponseReceived("pr1") ctx := logutil.NewTestLoggerIntoContext(context.Background()) - ds := datastore.NewDatastore(t.Context(), nil, 0) + ds := datastore.NewDatastore(t.Context(), nil, 0, datalayer.NewEndPointsPool(false, common.GKNN{})) mockSched := &mockScheduler{} director := NewDirectorWithConfig(ds, mockSched, &mockAdmissionController{}, NewConfig().WithResponseReceivedPlugins(pr1)) @@ -656,7 +685,7 @@ func TestDirector_HandleResponseStreaming(t *testing.T) { ps1 := newTestResponseStreaming("ps1") ctx := logutil.NewTestLoggerIntoContext(context.Background()) - ds := datastore.NewDatastore(t.Context(), nil, 0) + ds := datastore.NewDatastore(t.Context(), nil, 0, datalayer.NewEndPointsPool(false, common.GKNN{})) mockSched := &mockScheduler{} director := NewDirectorWithConfig(ds, mockSched, nil, NewConfig().WithResponseStreamingPlugins(ps1)) @@ -692,7 +721,7 @@ func TestDirector_HandleResponseComplete(t *testing.T) { pc1 := newTestResponseComplete("pc1") ctx := logutil.NewTestLoggerIntoContext(context.Background()) - ds := datastore.NewDatastore(t.Context(), nil, 0) + ds := datastore.NewDatastore(t.Context(), nil, 0, datalayer.NewEndPointsPool(false, common.GKNN{})) mockSched := &mockScheduler{} director := NewDirectorWithConfig(ds, mockSched, nil, NewConfig().WithResponseCompletePlugins(pc1)) diff --git a/pkg/epp/server/controller_manager.go b/pkg/epp/server/controller_manager.go index 47e4f12d4..4b5102f45 
100644 --- a/pkg/epp/server/controller_manager.go +++ b/pkg/epp/server/controller_manager.go @@ -30,10 +30,10 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/manager" metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer" v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" "sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2" - "sigs.k8s.io/gateway-api-inference-extension/pkg/common" ) var scheme = runtime.NewScheme() @@ -45,48 +45,48 @@ func init() { } // defaultManagerOptions returns the default options used to create the manager. -func defaultManagerOptions(gknn common.GKNN, metricsServerOptions metricsserver.Options) (ctrl.Options, error) { +func defaultManagerOptions(endPointsPool *datalayer.EndPointsPool, metricsServerOptions metricsserver.Options) (ctrl.Options, error) { opt := ctrl.Options{ Scheme: scheme, Cache: cache.Options{ ByObject: map[client.Object]cache.ByObject{ &corev1.Pod{}: { Namespaces: map[string]cache.Config{ - gknn.Namespace: {}, - }, - }, - &v1alpha2.InferenceObjective{}: { - Namespaces: map[string]cache.Config{ - gknn.Namespace: {}, + endPointsPool.GKNN.Namespace: {}, }, }, }, }, Metrics: metricsServerOptions, } - switch gknn.Group { - case v1alpha2.GroupName: - opt.Cache.ByObject[&v1alpha2.InferencePool{}] = cache.ByObject{ - Namespaces: map[string]cache.Config{gknn.Namespace: {FieldSelector: fields.SelectorFromSet(fields.Set{ - "metadata.name": gknn.Name, - })}}, - } - case v1.GroupName: - opt.Cache.ByObject[&v1.InferencePool{}] = cache.ByObject{ - Namespaces: map[string]cache.Config{gknn.Namespace: {FieldSelector: fields.SelectorFromSet(fields.Set{ - "metadata.name": gknn.Name, - })}}, + if !endPointsPool.StandaloneMode { + opt.Cache.ByObject[&v1alpha2.InferenceObjective{}] = cache.ByObject{Namespaces: map[string]cache.Config{ + endPointsPool.GKNN.Namespace: {}, + }} + switch endPointsPool.GKNN.Group { + 
case v1alpha2.GroupName: + opt.Cache.ByObject[&v1alpha2.InferencePool{}] = cache.ByObject{ + Namespaces: map[string]cache.Config{endPointsPool.GKNN.Namespace: {FieldSelector: fields.SelectorFromSet(fields.Set{ + "metadata.name": endPointsPool.GKNN.Name, + })}}, + } + case v1.GroupName: + opt.Cache.ByObject[&v1.InferencePool{}] = cache.ByObject{ + Namespaces: map[string]cache.Config{endPointsPool.GKNN.Namespace: {FieldSelector: fields.SelectorFromSet(fields.Set{ + "metadata.name": endPointsPool.GKNN.Name, + })}}, + } + default: + return ctrl.Options{}, fmt.Errorf("unknown group: %s", endPointsPool.GKNN.Group) } - default: - return ctrl.Options{}, fmt.Errorf("unknown group: %s", gknn.Group) } return opt, nil } // NewDefaultManager creates a new controller manager with default configuration. -func NewDefaultManager(gknn common.GKNN, restConfig *rest.Config, metricsServerOptions metricsserver.Options, leaderElectionEnabled bool) (ctrl.Manager, error) { - opt, err := defaultManagerOptions(gknn, metricsServerOptions) +func NewDefaultManager(endPointsPool *datalayer.EndPointsPool, restConfig *rest.Config, metricsServerOptions metricsserver.Options, leaderElectionEnabled bool) (ctrl.Manager, error) { + opt, err := defaultManagerOptions(endPointsPool, metricsServerOptions) if err != nil { return nil, fmt.Errorf("failed to create controller manager options: %v", err) } @@ -95,8 +95,8 @@ func NewDefaultManager(gknn common.GKNN, restConfig *rest.Config, metricsServerO opt.LeaderElection = true opt.LeaderElectionResourceLock = "leases" // The lease name needs to be unique per EPP deployment. 
- opt.LeaderElectionID = fmt.Sprintf("epp-%s-%s.gateway-api-inference-extension.sigs.k8s.io", gknn.Namespace, gknn.Name) - opt.LeaderElectionNamespace = gknn.Namespace + opt.LeaderElectionID = fmt.Sprintf("epp-%s-%s.gateway-api-inference-extension.sigs.k8s.io", endPointsPool.GKNN.Namespace, endPointsPool.GKNN.Name) + opt.LeaderElectionNamespace = endPointsPool.GKNN.Namespace opt.LeaderElectionReleaseOnCancel = true } diff --git a/pkg/epp/server/runserver.go b/pkg/epp/server/runserver.go index c3037175e..86ee780be 100644 --- a/pkg/epp/server/runserver.go +++ b/pkg/epp/server/runserver.go @@ -22,20 +22,19 @@ import ( "fmt" "time" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer" + extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" "github.com/go-logr/logr" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/health" healthgrpc "google.golang.org/grpc/health/grpc_health_v1" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/gateway-api-inference-extension/internal/runnable" tlsutil "sigs.k8s.io/gateway-api-inference-extension/internal/tls" - "sigs.k8s.io/gateway-api-inference-extension/pkg/common" backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/controller" dlmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer/metrics" @@ -48,8 +47,7 @@ import ( // ExtProcServerRunner provides methods to manage an external process server. type ExtProcServerRunner struct { GrpcPort int - PoolNamespacedName types.NamespacedName - PoolGKNN common.GKNN + EndPointsPool *datalayer.EndPointsPool Datastore datastore.Datastore SecureServing bool HealthChecking bool @@ -91,17 +89,8 @@ const ( // NewDefaultExtProcServerRunner creates a runner with default values. 
// Note: Dependencies like Datastore, Scheduler, SD need to be set separately. func NewDefaultExtProcServerRunner() *ExtProcServerRunner { - poolGKNN := common.GKNN{ - NamespacedName: types.NamespacedName{Name: DefaultPoolName, Namespace: DefaultPoolNamespace}, - GroupKind: schema.GroupKind{ - Group: DefaultPoolGroup, - Kind: "InferencePool", - }, - } return &ExtProcServerRunner{ GrpcPort: DefaultGrpcPort, - PoolNamespacedName: types.NamespacedName{Name: DefaultPoolName, Namespace: DefaultPoolNamespace}, - PoolGKNN: poolGKNN, SecureServing: DefaultSecureServing, HealthChecking: DefaultHealthChecking, RefreshPrometheusMetricsInterval: DefaultRefreshPrometheusMetricsInterval, @@ -113,20 +102,22 @@ func NewDefaultExtProcServerRunner() *ExtProcServerRunner { // SetupWithManager sets up the runner with the given manager. func (r *ExtProcServerRunner) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error { // Create the controllers and register them with the manager - if err := (&controller.InferencePoolReconciler{ - Datastore: r.Datastore, - Reader: mgr.GetClient(), - PoolGKNN: r.PoolGKNN, - }).SetupWithManager(mgr); err != nil { - return fmt.Errorf("failed setting up InferencePoolReconciler: %w", err) - } + if !r.EndPointsPool.StandaloneMode { + if err := (&controller.InferencePoolReconciler{ + Datastore: r.Datastore, + Reader: mgr.GetClient(), + PoolGKNN: r.EndPointsPool.GKNN, + }).SetupWithManager(mgr); err != nil { + return fmt.Errorf("failed setting up InferencePoolReconciler: %w", err) + } - if err := (&controller.InferenceObjectiveReconciler{ - Datastore: r.Datastore, - Reader: mgr.GetClient(), - PoolGKNN: r.PoolGKNN, - }).SetupWithManager(ctx, mgr); err != nil { - return fmt.Errorf("failed setting up InferenceObjectiveReconciler: %w", err) + if err := (&controller.InferenceObjectiveReconciler{ + Datastore: r.Datastore, + Reader: mgr.GetClient(), + PoolGKNN: r.EndPointsPool.GKNN, + }).SetupWithManager(ctx, mgr); err != nil { + return fmt.Errorf("failed 
setting up InferenceObjectiveReconciler: %w", err) + } } if err := (&controller.PodReconciler{ diff --git a/pkg/epp/util/pool/pool.go b/pkg/epp/util/pool/pool.go new file mode 100644 index 000000000..f5126face --- /dev/null +++ b/pkg/epp/util/pool/pool.go @@ -0,0 +1,121 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package pool + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" + v1alpha2 "sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2" + "sigs.k8s.io/gateway-api-inference-extension/pkg/common" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer" +) + +func InferencePoolToEndPointsPool(inferencePool *v1.InferencePool) *datalayer.EndPointsPool { + if inferencePool == nil { + return nil + } + targetPorts := make([]int, 0, len(inferencePool.Spec.TargetPorts)) + for _, p := range inferencePool.Spec.TargetPorts { + targetPorts = append(targetPorts, int(p.Number)) + + } + selector := make(map[string]string, len(inferencePool.Spec.Selector.MatchLabels)) + for k, v := range inferencePool.Spec.Selector.MatchLabels { + selector[string(k)] = string(v) + } + gknn := common.GKNN{ + NamespacedName: types.NamespacedName{Namespace: inferencePool.Namespace, Name: inferencePool.Name}, + GroupKind: schema.GroupKind{Group: "inference.networking.k8s.io", Kind: "InferencePool"}, + } + 
endPoints := &datalayer.EndPoints{ + Selector: selector, + TargetPorts: targetPorts, + } + endPointsPool := &datalayer.EndPointsPool{ + EndPoints: endPoints, + StandaloneMode: false, + GKNN: gknn, + } + return endPointsPool +} + +func AlphaInferencePoolToEndPointsPool(inferencePool *v1alpha2.InferencePool) *datalayer.EndPointsPool { + targetPorts := []int{int(inferencePool.Spec.TargetPortNumber)} + selector := make(map[string]string, len(inferencePool.Spec.Selector)) + for k, v := range inferencePool.Spec.Selector { + selector[string(k)] = string(v) + } + gknn := common.GKNN{ + NamespacedName: types.NamespacedName{Namespace: inferencePool.Namespace, Name: inferencePool.Name}, + GroupKind: schema.GroupKind{Group: "inference.networking.x-k8s.io", Kind: "InferencePool"}, + } + endPoints := &datalayer.EndPoints{ + Selector: selector, + TargetPorts: targetPorts, + } + endPointsPool := &datalayer.EndPointsPool{ + EndPoints: endPoints, + StandaloneMode: false, + GKNN: gknn, + } + return endPointsPool +} + +func EndPointsPoolToInferencePool(endPointsPool *datalayer.EndPointsPool) *v1.InferencePool { + targetPorts := make([]v1.Port, 0, len(endPointsPool.EndPoints.TargetPorts)) + for _, p := range endPointsPool.EndPoints.TargetPorts { + targetPorts = append(targetPorts, v1.Port{Number: v1.PortNumber(p)}) + } + labels := make(map[v1.LabelKey]v1.LabelValue, len(endPointsPool.EndPoints.Selector)) + for k, v := range endPointsPool.EndPoints.Selector { + labels[v1.LabelKey(k)] = v1.LabelValue(v) + } + + inferencePool := &v1.InferencePool{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "inference.networking.k8s.io/v1", + Kind: "InferencePool", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: endPointsPool.GKNN.Name, + Namespace: endPointsPool.GKNN.Namespace, + }, + Spec: v1.InferencePoolSpec{ + Selector: v1.LabelSelector{MatchLabels: labels}, + TargetPorts: targetPorts, + }, + } + return inferencePool +} + +func ToGKNN(ip *v1.InferencePool) common.GKNN { + if ip == nil { + return 
common.GKNN{} + } + return common.GKNN{ + NamespacedName: types.NamespacedName{ + Name: ip.Name, + Namespace: ip.Namespace, + }, + GroupKind: schema.GroupKind{ + Group: ip.GroupVersionKind().Group, + Kind: ip.GroupVersionKind().Kind, + }, + } +} diff --git a/pkg/epp/util/testing/wrappers.go b/pkg/epp/util/testing/wrappers.go index 7621bff96..2ad02c55d 100644 --- a/pkg/epp/util/testing/wrappers.go +++ b/pkg/epp/util/testing/wrappers.go @@ -19,8 +19,11 @@ package testing import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" "sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2" + "sigs.k8s.io/gateway-api-inference-extension/pkg/common" ) // PodWrapper wraps a Pod. @@ -219,6 +222,19 @@ func (m *InferencePoolWrapper) ObjRef() *v1.InferencePool { return &m.InferencePool } +func (m *InferencePoolWrapper) ToGKNN() common.GKNN { + return common.GKNN{ + NamespacedName: types.NamespacedName{ + Name: m.Name, + Namespace: m.ObjectMeta.Namespace, + }, + GroupKind: schema.GroupKind{ + Group: "inference.networking.k8s.io", + Kind: "InferencePool", + }, + } +} + // AlphaInferencePoolWrapper wraps an group "inference.networking.x-k8s.io" InferencePool. 
type AlphaInferencePoolWrapper struct { v1alpha2.InferencePool diff --git a/test/integration/epp/hermetic_test.go b/test/integration/epp/hermetic_test.go index 9ce4fec64..020beb59b 100644 --- a/test/integration/epp/hermetic_test.go +++ b/test/integration/epp/hermetic_test.go @@ -56,6 +56,7 @@ import ( crmetrics "sigs.k8s.io/controller-runtime/pkg/metrics" "sigs.k8s.io/controller-runtime/pkg/metrics/filters" metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer" "sigs.k8s.io/yaml" v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" @@ -1170,11 +1171,14 @@ func BeforeSuite() func() { serverRunner.TestPodMetricsClient = &backendmetrics.FakePodMetricsClient{} pmf := backendmetrics.NewPodMetricsFactory(serverRunner.TestPodMetricsClient, 10*time.Millisecond) // Adjust from defaults - serverRunner.PoolGKNN = common.GKNN{ + poolGKNN := common.GKNN{ NamespacedName: types.NamespacedName{Namespace: testNamespace, Name: testPoolName}, GroupKind: schema.GroupKind{Group: v1.GroupVersion.Group, Kind: "InferencePool"}, } - serverRunner.Datastore = datastore.NewDatastore(context.Background(), pmf, 0) + endPointsPool := datalayer.NewEndPointsPool(false, poolGKNN) + serverRunner.EndPointsPool = datalayer.NewEndPointsPool(false, poolGKNN) + + serverRunner.Datastore = datastore.NewDatastore(context.Background(), pmf, 0, endPointsPool) kvCacheUtilizationScorer := scorer.NewKVCacheUtilizationScorer() queueingScorer := scorer.NewQueueScorer() diff --git a/test/utils/server.go b/test/utils/server.go index 9cf907d29..5afc5ad05 100644 --- a/test/utils/server.go +++ b/test/utils/server.go @@ -29,9 +29,14 @@ import ( "google.golang.org/grpc/test/bufconn" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" clientgoscheme "k8s.io/client-go/kubernetes/scheme" "sigs.k8s.io/controller-runtime/pkg/client" 
"sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/gateway-api-inference-extension/pkg/common" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer" + pooltuil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/pool" v1 "sigs.k8s.io/gateway-api-inference-extension/api/v1" "sigs.k8s.io/gateway-api-inference-extension/apix/v1alpha2" @@ -50,7 +55,11 @@ func PrepareForTestStreamingServer(objectives []*v1alpha2.InferenceObjective, po pmc := &metrics.FakePodMetricsClient{} pmf := metrics.NewPodMetricsFactory(pmc, time.Second) - ds := datastore.NewDatastore(ctx, pmf, 0) + endPointsPool := datalayer.NewEndPointsPool(false, common.GKNN{ + NamespacedName: types.NamespacedName{Namespace: namespace, Name: poolName}, + GroupKind: schema.GroupKind{Group: "inference.networking.k8s.io", Kind: "InferencePool"}, + }) + ds := datastore.NewDatastore(ctx, pmf, 0, endPointsPool) initObjs := []client.Object{} for _, objective := range objectives { @@ -72,7 +81,7 @@ func PrepareForTestStreamingServer(objectives []*v1alpha2.InferenceObjective, po Build() pool := testutil.MakeInferencePool(poolName).Namespace(namespace).ObjRef() pool.Spec.TargetPorts = []v1.Port{{Number: v1.PortNumber(poolPort)}} - _ = ds.PoolSet(context.Background(), fakeClient, pool) + _ = ds.PoolSet(context.Background(), fakeClient, pooltuil.InferencePoolToEndPointsPool(pool)) return ctx, cancel, ds, pmc }