From 8c758f9d37224c793fde14003f261a776fe7aaa7 Mon Sep 17 00:00:00 2001 From: Daneyon Hansen Date: Fri, 24 Oct 2025 14:35:34 -0700 Subject: [PATCH 1/2] Conformance: Adds Data Parallelism Test Signed-off-by: Daneyon Hansen --- conformance/resources/base.yaml | 213 +++++++++++- .../tests/gateway_following_epp_routing_dp.go | 319 ++++++++++++++++++ .../gateway_following_epp_routing_dp.yaml | 23 ++ .../framework/plugins/test/consts.go | 21 +- .../plugins/test/filter/filter_test.go | 123 +++---- .../filter/request_header_based_filter.go | 75 +++- 6 files changed, 697 insertions(+), 77 deletions(-) create mode 100644 conformance/tests/gateway_following_epp_routing_dp.go create mode 100644 conformance/tests/gateway_following_epp_routing_dp.yaml diff --git a/conformance/resources/base.yaml b/conformance/resources/base.yaml index 2e1b378c3..e74535e74 100644 --- a/conformance/resources/base.yaml +++ b/conformance/resources/base.yaml @@ -200,7 +200,7 @@ spec: terminationGracePeriodSeconds: 130 containers: - name: epp - image: us-central1-docker.pkg.dev/k8s-staging-images/gateway-api-inference-extension/epp:v1.0.0 + image: us-central1-docker.pkg.dev/k8s-staging-images/gateway-api-inference-extension/epp:v20251105-cbb8928 imagePullPolicy: Always args: - --pool-name @@ -298,7 +298,7 @@ spec: terminationGracePeriodSeconds: 130 containers: - name: epp - image: us-central1-docker.pkg.dev/k8s-staging-images/gateway-api-inference-extension/epp:v1.0.0 + image: us-central1-docker.pkg.dev/k8s-staging-images/gateway-api-inference-extension/epp:v20251105-cbb8928 imagePullPolicy: Always args: - --pool-name @@ -340,6 +340,215 @@ spec: configMap: name: plugins-config --- +# -- Data Parallelism (DP) backend deployment: 3 pods, each listening on three ports to simulate ranks --- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: dp-inference-model-server-deployment + namespace: inference-conformance-app-backend + labels: + app: dp-inference-model-server +spec: + replicas: 3 + selector: + 
matchLabels: + app: dp-inference-model-server + template: + metadata: + labels: + app: dp-inference-model-server + spec: + containers: + - name: echoserver-3000 + image: gcr.io/k8s-staging-gateway-api/echo-basic:v20240412-v1.0.0-394-g40c666fd + ports: + - containerPort: 3000 + readinessProbe: + httpGet: + path: / + port: 3000 + initialDelaySeconds: 3 + periodSeconds: 5 + failureThreshold: 2 + env: + - name: HTTP_PORT # Default port for HTTP echo server + value: "3000" + - name: H2C_PORT # Default port for H2C echo server + value: "3001" + - name: INCLUDE_HTTP_PORT_HEADER + value: "true" + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: POD_IP + valueFrom: + fieldRef: + fieldPath: status.podIP + - name: echoserver-3002 + image: gcr.io/k8s-staging-gateway-api/echo-basic:v20240412-v1.0.0-394-g40c666fd + ports: + - containerPort: 3002 + readinessProbe: + httpGet: + path: / + port: 3002 + initialDelaySeconds: 3 + periodSeconds: 5 + failureThreshold: 2 + env: + - name: HTTP_PORT + value: "3002" + - name: H2C_PORT + value: "3003" + - name: INCLUDE_HTTP_PORT_HEADER + value: "true" + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: POD_IP + valueFrom: + fieldRef: + fieldPath: status.podIP + - name: echoserver-3004 + image: gcr.io/k8s-staging-gateway-api/echo-basic:v20240412-v1.0.0-394-g40c666fd + ports: + - containerPort: 3004 + readinessProbe: + httpGet: + path: / + port: 3004 + initialDelaySeconds: 3 + periodSeconds: 5 + failureThreshold: 2 + env: + - name: HTTP_PORT + value: "3004" + - name: H2C_PORT + value: "3005" + - name: INCLUDE_HTTP_PORT_HEADER + value: "true" + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: POD_IP + valueFrom: + fieldRef: + 
fieldPath: status.podIP +--- +# --- Data Parallelism (DP) InferencePool Definition --- +apiVersion: inference.networking.k8s.io/v1 +kind: InferencePool +metadata: + name: dp-inference-pool + namespace: inference-conformance-app-backend +spec: + selector: + matchLabels: + app: dp-inference-model-server + targetPorts: + - number: 3000 + - number: 3002 + - number: 3004 + endpointPickerRef: + name: dp-endpoint-picker-svc + port: + number: 9002 +--- +# --- Data Parallelism (DP) Conformance EPP service Definition --- +apiVersion: v1 +kind: Service +metadata: + name: dp-endpoint-picker-svc + namespace: inference-conformance-app-backend +spec: + selector: + app: dp-app-backend-epp + ports: + - protocol: TCP + port: 9002 + targetPort: 9002 + appProtocol: http2 + type: ClusterIP +--- +# --- Data Parallelism (DP) Conformance EPP Deployment --- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: dp-app-endpoint-picker + namespace: inference-conformance-app-backend + labels: + app: dp-app-backend-epp +spec: + replicas: 1 + selector: + matchLabels: + app: dp-app-backend-epp + template: + metadata: + labels: + app: dp-app-backend-epp + spec: + # Conservatively, this timeout should mirror the longest grace period of the pods within the pool + terminationGracePeriodSeconds: 130 + containers: + - name: epp + image: us-central1-docker.pkg.dev/k8s-staging-images/gateway-api-inference-extension/epp:v20251105-cbb8928 + imagePullPolicy: Always + args: + - --pool-name + - "dp-inference-pool" + - --pool-namespace + - "inference-conformance-app-backend" + - --v + - "4" + - --zap-encoder + - "json" + - --grpc-port + - "9002" + - --grpc-health-port + - "9003" + - "--config-file" + - "/config/conformance-plugins.yaml" + ports: + - containerPort: 9002 + - containerPort: 9003 + - name: metrics + containerPort: 9090 + livenessProbe: + grpc: + port: 9003 + service: inference-extension + initialDelaySeconds: 5 + periodSeconds: 10 + readinessProbe: + grpc: + port: 9003 + service: 
inference-extension + initialDelaySeconds: 5 + periodSeconds: 10 + volumeMounts: + - name: plugins-config-volume + mountPath: "/config" + volumes: + - name: plugins-config-volume + configMap: + name: plugins-config +--- apiVersion: v1 kind: ConfigMap metadata: diff --git a/conformance/tests/gateway_following_epp_routing_dp.go b/conformance/tests/gateway_following_epp_routing_dp.go new file mode 100644 index 000000000..d608beb21 --- /dev/null +++ b/conformance/tests/gateway_following_epp_routing_dp.go @@ -0,0 +1,319 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tests + +import ( + "errors" + "fmt" + "net/http" + "slices" + "strconv" + "strings" + "testing" + + "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + gwhttp "sigs.k8s.io/gateway-api/conformance/utils/http" + "sigs.k8s.io/gateway-api/conformance/utils/suite" + "sigs.k8s.io/gateway-api/pkg/features" + + "sigs.k8s.io/gateway-api-inference-extension/conformance/resources" + k8sutils "sigs.k8s.io/gateway-api-inference-extension/conformance/utils/kubernetes" + "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/test" +) + +// dpPorts are the data parallel ports exposed by the backend deployment. Update if the ports +// change in conformance/resources/base.yaml. 
+var dpPorts = portSet{"3000": {}, "3002": {}, "3004": {}} + +func init() { + ConformanceTests = append(ConformanceTests, GatewayFollowingEPPRoutingWithDataParallelism) +} + +// GatewayFollowingEPPRoutingWithDataParallelism verifies that with multiple targetPorts (ranks) +// the gateway still routes only to pods returned by EPP. +var GatewayFollowingEPPRoutingWithDataParallelism = suite.ConformanceTest{ + ShortName: "GatewayFollowingEPPRoutingWithDataParallelism", + Description: "Inference gateway should restrict traffic to EPP-selected pods while EPP balances across multiple targetPorts (DP ranks)", + Manifests: []string{"tests/gateway_following_epp_routing_dp.yaml"}, + Features: []features.FeatureName{ + features.FeatureName("SupportInferencePool"), + features.SupportGateway, + }, + Test: func(t *testing.T, s *suite.ConformanceTestSuite) { + const ( + hostname = "primary.example.com" + path = "/primary-gateway-dp-test" + appPodBackendPrefix = "dp-inference-model-server" // Must match the app label in the backend pods + ) + + httpRouteNN := types.NamespacedName{Name: "httproute-for-primary-gw-dp", Namespace: resources.AppBackendNamespace} + gatewayNN := resources.PrimaryGatewayNN + poolNN := types.NamespacedName{Name: "dp-inference-pool", Namespace: resources.AppBackendNamespace} + backendPodLabels := map[string]string{"app": "dp-inference-model-server"} + + t.Log("Verifying HTTPRoute and InferencePool are accepted and the Gateway has an address.") + k8sutils.HTTPRouteMustBeAcceptedAndResolved(t, s.Client, s.TimeoutConfig, httpRouteNN, gatewayNN) + k8sutils.InferencePoolMustBeAcceptedByParent(t, s.Client, poolNN, gatewayNN) + gwAddr := k8sutils.GetGatewayEndpoint(t, s.Client, s.TimeoutConfig, gatewayNN) + + t.Logf("Fetching backend pods with labels: %v", backendPodLabels) + pods, err := k8sutils.GetPodsWithLabel(t, s.Client, resources.AppBackendNamespace, backendPodLabels, s.TimeoutConfig) + require.NoError(t, err, "Failed to get backend pods") + require.Len(t, 
pods, 3, "Expected to find 3 backend pods, found %d", len(pods)) + + backends := toEndpoints(pods) + require.Len(t, backends, 3) + + podNameToIP := make(map[string]string, len(backends)) + for _, be := range backends { + podNameToIP[be.Name] = be.IP + } + + requestBody := `{ + "model": "conformance-fake-model", + "prompt": "Write as if you were a critic: San Francisco" + }` + + // Single-pod pin to ensure header filter works before main test cases. + gwhttp.MakeRequestAndExpectEventuallyConsistentResponse( + t, + s.RoundTripper, + s.TimeoutConfig, + gwAddr, + gwhttp.ExpectedResponse{ + Request: gwhttp.Request{ + Host: hostname, + Path: path, + Method: http.MethodPost, + Body: requestBody, + Headers: map[string]string{ + test.HeaderTestEppEndPointSelectionKey: backends[0].IP, + }, + }, + Response: gwhttp.Response{StatusCodes: []int{http.StatusOK}}, + Backend: backends[0].Name, + Namespace: resources.AppBackendNamespace, + }, + ) + + testCases := []struct { + name string + ipToAllowedPorts map[string]portSet + expectAllRequestsRoutedWithinPodNames []string + }{ + { + name: "DP routes only to one designated pod (any rank)", + ipToAllowedPorts: map[string]portSet{ + backends[1].IP: {}, // any of the DP ports for this IP + }, + expectAllRequestsRoutedWithinPodNames: []string{backends[1].Name}, + }, + { + name: "DP routes only to two designated pods; one has a fixed rank", + ipToAllowedPorts: map[string]portSet{ + backends[0].IP: {}, // any port + backends[2].IP: {"3002": {}}, // must be port 3002 for this IP + }, + expectAllRequestsRoutedWithinPodNames: []string{backends[0].Name, backends[2].Name}, + }, + { + name: "DP routes to all pods; one pod restricted to 3000,3004", + ipToAllowedPorts: map[string]portSet{ + backends[0].IP: {}, + backends[1].IP: {"3000": {}, "3004": {}}, + backends[2].IP: {}, + }, + expectAllRequestsRoutedWithinPodNames: []string{backends[0].Name, backends[1].Name, backends[2].Name}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, 
func(t *testing.T) { + // Build the EPP header from the endpoint map (stable order). + eppHeaderValue := buildEPPHeader(tc.ipToAllowedPorts) + + headers := map[string]string{ + test.HeaderTestEppEndPointSelectionKey: eppHeaderValue, + } + + assertTrafficOnlyReachesToExpectedPodsDP( + t, s, gwAddr, + gwhttp.ExpectedResponse{ + Request: gwhttp.Request{ + Host: hostname, + Path: path, + Method: http.MethodPost, + Body: requestBody, + Headers: headers, + }, + Response: gwhttp.Response{StatusCode: http.StatusOK}, + Backend: appPodBackendPrefix, + Namespace: resources.AppBackendNamespace, + }, + tc.expectAllRequestsRoutedWithinPodNames, + podNameToIP, + tc.ipToAllowedPorts, + ) + }) + } + }, +} + +func assertTrafficOnlyReachesToExpectedPodsDP( + t *testing.T, + suite *suite.ConformanceTestSuite, + gwAddr string, + expected gwhttp.ExpectedResponse, + expectedPodNames []string, + podNameToIP map[string]string, + ipToAllowedPorts map[string]portSet, +) { + t.Helper() + const ( + concurrency = 10 + totalRequests = 100 + ) + + var ( + rt = suite.RoundTripper + g errgroup.Group + r = gwhttp.MakeRequest(t, &expected, gwAddr, "HTTP", "http") + ) + g.SetLimit(concurrency) + + for i := 0; i < totalRequests; i++ { + g.Go(func() error { + cReq, cRes, err := rt.CaptureRoundTrip(r) + if err != nil { + return fmt.Errorf("failed roundtrip: %w", err) + } + // Baseline response checks (status/namespace/backend, etc.) + if err := gwhttp.CompareRoundTrip(t, &r, cReq, cRes, expected); err != nil { + return fmt.Errorf("expectation failed: %w", err) + } + // Enforce no leakage to non-selected pods (ports/ranks are internal). + if !slices.Contains(expectedPodNames, cReq.Pod) { + return fmt.Errorf("unexpected pod %q (expected one of %v)", cReq.Pod, expectedPodNames) + } + + // Validate X-Echo-HTTP-Port vs EPP intent for the pod's IP. 
+ portHdr := getHeaderValue(cRes.Headers, "X-Echo-HTTP-Port") // Header set by backend echo server + if portHdr == "" { + return errors.New("missing X-Echo-HTTP-Port response header") + } + ip := podNameToIP[cReq.Pod] + allowed, ok := ipToAllowedPorts[ip] + if !ok { + return fmt.Errorf("pod %q (IP %s) not present in EPP selection", cReq.Pod, ip) + } + if len(allowed) > 0 { + if _, ok := allowed[portHdr]; !ok { + return fmt.Errorf("unexpected X-Echo-HTTP-Port %q for IP %s (allowed: %v)", portHdr, ip, keys(allowed)) + } + } else { + if _, ok := dpPorts[portHdr]; !ok { + return fmt.Errorf("unexpected X-Echo-HTTP-Port %q for IP %s (expected one of DP ports %v)", portHdr, ip, keys(dpPorts)) + } + } + + return nil + }) + } + if err := g.Wait(); err != nil { + t.Fatalf("Requests were not confined to expected pods (DP) or failed port-header checks: %v", err) + } + t.Logf("DP traffic restricted to %v and port header validated against EPP selection", expectedPodNames) +} + +type portSet map[string]struct{} + +type PodEndpoint struct { + Name string + IP string + Ports []string +} + +func keys(m portSet) []string { + out := make([]string, 0, len(m)) + for k := range m { + out = append(out, k) + } + return out +} + +// getHeaderValue is a case-insensitive header lookup on headers using the key. +func getHeaderValue(headers map[string][]string, key string) string { + for k, v := range headers { + if strings.EqualFold(k, key) && len(v) > 0 { + return v[0] + } + } + return "" +} + +// buildEPPHeader builds the test EPP header (HeaderTestEppEndPointSelectionKey) from ip->ports. +// Empty portSet => emit just "IP". Non-empty => emit "IP:port" for each port. +// Sorted for determinism. 
+func buildEPPHeader(ipToPorts map[string]portSet) string { + ips := make([]string, 0, len(ipToPorts)) + for ip := range ipToPorts { + ips = append(ips, ip) + } + slices.Sort(ips) + + var tokens []string + for _, ip := range ips { + ports := ipToPorts[ip] + if len(ports) == 0 { + tokens = append(tokens, ip) + continue + } + ps := keys(ports) + slices.Sort(ps) + for _, p := range ps { + tokens = append(tokens, fmt.Sprintf("%s:%s", ip, p)) + } + } + return strings.Join(tokens, ",") +} + +// toEndpoints extracts name, IP, and unique containerPort values per pod. +func toEndpoints(pods []corev1.Pod) []PodEndpoint { + out := make([]PodEndpoint, 0, len(pods)) + for _, p := range pods { + seen := map[int32]struct{}{} + var ports []string + for _, c := range p.Spec.Containers { + for _, cp := range c.Ports { + if _, ok := seen[cp.ContainerPort]; ok { + continue + } + seen[cp.ContainerPort] = struct{}{} + ports = append(ports, strconv.Itoa(int(cp.ContainerPort))) + } + } + out = append(out, PodEndpoint{ + Name: p.Name, + IP: p.Status.PodIP, + Ports: ports, + }) + } + return out +} diff --git a/conformance/tests/gateway_following_epp_routing_dp.yaml b/conformance/tests/gateway_following_epp_routing_dp.yaml new file mode 100644 index 000000000..02f849c2f --- /dev/null +++ b/conformance/tests/gateway_following_epp_routing_dp.yaml @@ -0,0 +1,23 @@ +apiVersion: gateway.networking.k8s.io/v1 +kind: HTTPRoute +metadata: + name: httproute-for-primary-gw-dp + namespace: inference-conformance-app-backend +spec: + parentRefs: + - group: gateway.networking.k8s.io + kind: Gateway + name: conformance-primary + namespace: inference-conformance-infra + sectionName: http + hostnames: + - "primary.example.com" + rules: + - backendRefs: + - group: inference.networking.k8s.io + kind: InferencePool + name: dp-inference-pool + matches: + - path: + type: PathPrefix + value: /primary-gateway-dp-test diff --git a/pkg/epp/scheduling/framework/plugins/test/consts.go 
b/pkg/epp/scheduling/framework/plugins/test/consts.go index 81c05790c..166dba2e6 100644 --- a/pkg/epp/scheduling/framework/plugins/test/consts.go +++ b/pkg/epp/scheduling/framework/plugins/test/consts.go @@ -17,9 +17,22 @@ limitations under the License. package test const ( - // HeaderTestEppEndPointSelectionKey is the header used for testing purposes to make EPP behavior controllable. - // The header value should be a comma-separated list of endpoint IP addresses. - // E.g., "test-epp-endpoint-selection": "10.0.0.7,10.0.0.8" - // The returned order is the same as the order provided in the header. + // HeaderTestEppEndPointSelectionKey is the request header used in tests to control + // Endpoint Picker (EPP) behavior deterministically. + // + // The header value is a comma-separated list of endpoint identifiers. Each entry + // may be in one of the following formats: + // + // - "IP" — selects all pods whose IP address matches the given value. + // - "IP:port" — selects only pods whose IP and port both match exactly. + // Ports correspond to data-parallel ranks or specific targetPorts. + // + // IPv6 addresses are supported, with or without brackets (e.g. "fd00::1" or "[fd00::1]:3002"). + // The returned order matches the order of endpoints specified in the header, and duplicates + // are ignored. 
+ // + // Examples: + // "test-epp-endpoint-selection": "10.0.0.7,10.0.0.8:3002" + // "test-epp-endpoint-selection": "[fd00::1]:3000,fd00::2" HeaderTestEppEndPointSelectionKey = "test-epp-endpoint-selection" ) diff --git a/pkg/epp/scheduling/framework/plugins/test/filter/filter_test.go b/pkg/epp/scheduling/framework/plugins/test/filter/filter_test.go index d5615bc90..3cd8a6197 100644 --- a/pkg/epp/scheduling/framework/plugins/test/filter/filter_test.go +++ b/pkg/epp/scheduling/framework/plugins/test/filter/filter_test.go @@ -35,88 +35,93 @@ func TestFilter(t *testing.T) { output []types.Pod }{ { - name: "TestHeaderBasedFilter, header endpoint unset in request", - req: &types.LLMRequest{}, // Delieverately unset the header. + name: "header unset in request", + req: &types.LLMRequest{}, // Deliberately unset input: []types.Pod{ - &types.PodMetrics{ - Pod: &backend.Pod{ - Address: "test-endpoint", - }, - }, + &types.PodMetrics{Pod: &backend.Pod{Address: "10.0.0.1", Port: "3000"}}, }, output: []types.Pod{}, }, { - name: "TestHeaderBasedFilter, header endpoint set in request but no match", - req: &types.LLMRequest{Headers: map[string]string{test.HeaderTestEppEndPointSelectionKey: "test-endpoint"}}, + name: "header set but no IP match", + req: &types.LLMRequest{Headers: map[string]string{test.HeaderTestEppEndPointSelectionKey: "10.0.0.99"}}, input: []types.Pod{ - &types.PodMetrics{ - Pod: &backend.Pod{ - Address: "test-endpoint-unmatch", - }, - }, + &types.PodMetrics{Pod: &backend.Pod{Address: "10.0.0.1", Port: "3000"}}, }, output: []types.Pod{}, }, { - name: "TestHeaderBasedFilter, header endpoint set", - req: &types.LLMRequest{Headers: map[string]string{test.HeaderTestEppEndPointSelectionKey: "test-endpoint"}}, + name: "IP-only header matches pod (port-agnostic)", + req: &types.LLMRequest{Headers: map[string]string{test.HeaderTestEppEndPointSelectionKey: "10.0.0.1"}}, input: []types.Pod{ - &types.PodMetrics{ - Pod: &backend.Pod{ - Address: "test-endpoint", - }, - }, + 
&types.PodMetrics{Pod: &backend.Pod{Address: "10.0.0.1", Port: "3002"}}, }, output: []types.Pod{ - &types.PodMetrics{ - Pod: &backend.Pod{ - Address: "test-endpoint", - }, - }, + &types.PodMetrics{Pod: &backend.Pod{Address: "10.0.0.1", Port: "3002"}}, }, }, { - name: "TestHeaderBasedFilter, multiple header endpoints set and multiple matches", - req: &types.LLMRequest{Headers: map[string]string{test.HeaderTestEppEndPointSelectionKey: "test-endpoint3,test-endpoint2"}}, + name: "IP:port header matches exact port", + req: &types.LLMRequest{Headers: map[string]string{test.HeaderTestEppEndPointSelectionKey: "10.0.0.1:3002"}}, input: []types.Pod{ - &types.PodMetrics{ - Pod: &backend.Pod{ - Address: "test-endpoint1", - }, - }, - &types.PodMetrics{ - Pod: &backend.Pod{ - Address: "test-endpoint2", - }, - }, - &types.PodMetrics{ - Pod: &backend.Pod{ - Address: "test-endpoint3", - }, - }, + &types.PodMetrics{Pod: &backend.Pod{Address: "10.0.0.1", Port: "3000"}}, + &types.PodMetrics{Pod: &backend.Pod{Address: "10.0.0.1", Port: "3002"}}, + &types.PodMetrics{Pod: &backend.Pod{Address: "10.0.0.2", Port: "3002"}}, }, output: []types.Pod{ - &types.PodMetrics{ - Pod: &backend.Pod{ - Address: "test-endpoint3", - }, - }, - &types.PodMetrics{ - Pod: &backend.Pod{ - Address: "test-endpoint2", - }, - }, + &types.PodMetrics{Pod: &backend.Pod{Address: "10.0.0.1", Port: "3002"}}, + }, + }, + { + name: "IP:port header with non-matching port produces no match", + req: &types.LLMRequest{Headers: map[string]string{test.HeaderTestEppEndPointSelectionKey: "10.0.0.1:9999"}}, + input: []types.Pod{ + &types.PodMetrics{Pod: &backend.Pod{Address: "10.0.0.1", Port: "3002"}}, + }, + output: []types.Pod{}, + }, + { + name: "multiple header values (IP and IP:port) produce multiple matches in order and deduped", + req: &types.LLMRequest{Headers: map[string]string{test.HeaderTestEppEndPointSelectionKey: "10.0.0.3:3004, 10.0.0.2, 10.0.0.3"}}, + input: []types.Pod{ + &types.PodMetrics{Pod: 
&backend.Pod{Address: "10.0.0.1", Port: "3000"}}, + &types.PodMetrics{Pod: &backend.Pod{Address: "10.0.0.2", Port: "3002"}}, + &types.PodMetrics{Pod: &backend.Pod{Address: "10.0.0.3", Port: "3004"}}, + }, + output: []types.Pod{ + &types.PodMetrics{Pod: &backend.Pod{Address: "10.0.0.3", Port: "3004"}}, + &types.PodMetrics{Pod: &backend.Pod{Address: "10.0.0.2", Port: "3002"}}, + }, + }, + { + name: "IPv6 with brackets and port", + req: &types.LLMRequest{Headers: map[string]string{test.HeaderTestEppEndPointSelectionKey: "[fd00::1]:3002"}}, + input: []types.Pod{ + &types.PodMetrics{Pod: &backend.Pod{Address: "fd00::1", Port: "3002"}}, + &types.PodMetrics{Pod: &backend.Pod{Address: "fd00::2", Port: "3002"}}, + }, + output: []types.Pod{ + &types.PodMetrics{Pod: &backend.Pod{Address: "fd00::1", Port: "3002"}}, + }, + }, + { + name: "IPv6 bare address (no port)", + req: &types.LLMRequest{Headers: map[string]string{test.HeaderTestEppEndPointSelectionKey: "fd00::2"}}, + input: []types.Pod{ + &types.PodMetrics{Pod: &backend.Pod{Address: "fd00::1", Port: "3002"}}, + &types.PodMetrics{Pod: &backend.Pod{Address: "fd00::2", Port: "3004"}}, + }, + output: []types.Pod{ + &types.PodMetrics{Pod: &backend.Pod{Address: "fd00::2", Port: "3004"}}, }, }, } - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - got := NewHeaderBasedTestingFilter().Filter(context.Background(), types.NewCycleState(), test.req, test.input) - - if diff := cmp.Diff(test.output, got); diff != "" { - t.Errorf("Unexpected output (-want +got): %v", diff) + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + got := NewHeaderBasedTestingFilter().Filter(context.Background(), types.NewCycleState(), tc.req, tc.input) + if diff := cmp.Diff(tc.output, got); diff != "" { + t.Fatalf("Unexpected output (-want +got): %s", diff) } }) } diff --git a/pkg/epp/scheduling/framework/plugins/test/filter/request_header_based_filter.go 
b/pkg/epp/scheduling/framework/plugins/test/filter/request_header_based_filter.go index 2836755d4..bce3be11a 100644 --- a/pkg/epp/scheduling/framework/plugins/test/filter/request_header_based_filter.go +++ b/pkg/epp/scheduling/framework/plugins/test/filter/request_header_based_filter.go @@ -19,6 +19,7 @@ package filter import ( "context" "encoding/json" + "net" "strings" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins" @@ -64,24 +65,74 @@ func (f *HeaderBasedTestingFilter) WithName(name string) *HeaderBasedTestingFilt return f } -// Filter selects pods that match the IP addresses specified in the request header. +// Filter selects pods whose IP or IP:port matches any value in the +// "test-epp-endpoint-selection" header. Values may be "IP" or "IP:port". +// If a port is provided, only an exact IP:port match is accepted. func (f *HeaderBasedTestingFilter) Filter(_ context.Context, _ *types.CycleState, request *types.LLMRequest, pods []types.Pod) []types.Pod { - headerValue, ok := request.Headers[test.HeaderTestEppEndPointSelectionKey] - if !ok || headerValue == "" { + hv, ok := request.Headers[test.HeaderTestEppEndPointSelectionKey] + if !ok || strings.TrimSpace(hv) == "" { return []types.Pod{} } - podAddressMap := make(map[string]types.Pod, len(pods)) - for _, pod := range pods { - podAddressMap[pod.GetPod().GetIPAddress()] = pod + normalizeIP := func(s string) string { return strings.Trim(s, "[]") } + + // Build lookup maps: + // ip -> pod + // ip:port -> pod (only when pod GetPort() is non-empty) + ipToPod := make(map[string]types.Pod, len(pods)) + hpToPod := make(map[string]types.Pod, len(pods)) + for _, p := range pods { + if p == nil || p.GetPod() == nil { + continue + } + ip := normalizeIP(strings.TrimSpace(p.GetPod().GetIPAddress())) + if ip == "" { + continue + } + ipToPod[ip] = p + if port := strings.TrimSpace(p.GetPod().GetPort()); port != "" { + hpToPod[ip+":"+port] = p + } } - endpoints := strings.Split(headerValue, ",") - filteredPods := 
make([]types.Pod, 0, len(endpoints)) - for _, endpoint := range endpoints { - trimmedEndpoint := strings.TrimSpace(endpoint) - if pod, found := podAddressMap[trimmedEndpoint]; found { - filteredPods = append(filteredPods, pod) + headerVals := strings.Split(hv, ",") + filteredPods := make([]types.Pod, 0, len(headerVals)) + seen := make(map[string]struct{}, len(headerVals)) // de-dupe by pod IP + + for _, raw := range headerVals { + item := strings.TrimSpace(raw) + if item == "" { + continue + } + + host := item + port := "" + if h, pt, err := net.SplitHostPort(item); err == nil { + host, port = h, pt + } else { + host = normalizeIP(host) // bare IP, possibly bracketed IPv6 + } + host = normalizeIP(host) + + var pod types.Pod + if port != "" { + // Require an exact ip:port match + if p, ok := hpToPod[host+":"+port]; ok { + pod = p + } + } else { + // IP-only selection + if p, ok := ipToPod[host]; ok { + pod = p + } + } + + if pod != nil { + ip := normalizeIP(pod.GetPod().GetIPAddress()) + if _, dup := seen[ip]; !dup { + seen[ip] = struct{}{} + filteredPods = append(filteredPods, pod) + } } } return filteredPods From 53bfda78aab800423aba5a31b1ba936904aebd00 Mon Sep 17 00:00:00 2001 From: Daneyon Hansen Date: Wed, 5 Nov 2025 15:48:12 -0800 Subject: [PATCH 2/2] Use port from json body and go.mod replace Signed-off-by: Daneyon Hansen --- conformance/resources/base.yaml | 10 +-- .../tests/gateway_following_epp_routing_dp.go | 74 ++++++++----------- go.mod | 9 ++- go.sum | 12 +-- 4 files changed, 48 insertions(+), 57 deletions(-) diff --git a/conformance/resources/base.yaml b/conformance/resources/base.yaml index e74535e74..3b4933caf 100644 --- a/conformance/resources/base.yaml +++ b/conformance/resources/base.yaml @@ -77,7 +77,7 @@ spec: spec: containers: - name: echoserver - image: gcr.io/k8s-staging-gateway-api/echo-basic:v20240412-v1.0.0-394-g40c666fd + image: gcr.io/k8s-staging-gateway-api/echo-basic:v20251106-v1.3.0-263-g47c3435c ports: - containerPort: 3000 
readinessProbe: @@ -121,7 +121,7 @@ spec: spec: containers: - name: echoserver - image: gcr.io/k8s-staging-gateway-api/echo-basic:v20240412-v1.0.0-394-g40c666fd + image: gcr.io/k8s-staging-gateway-api/echo-basic:v20251106-v1.3.0-263-g47c3435c ports: - containerPort: 3000 readinessProbe: @@ -360,7 +360,7 @@ spec: spec: containers: - name: echoserver-3000 - image: gcr.io/k8s-staging-gateway-api/echo-basic:v20240412-v1.0.0-394-g40c666fd + image: gcr.io/k8s-staging-gateway-api/echo-basic:v20251106-v1.3.0-263-g47c3435c ports: - containerPort: 3000 readinessProbe: @@ -390,7 +390,7 @@ spec: fieldRef: fieldPath: status.podIP - name: echoserver-3002 - image: gcr.io/k8s-staging-gateway-api/echo-basic:v20240412-v1.0.0-394-g40c666fd + image: gcr.io/k8s-staging-gateway-api/echo-basic:v20251106-v1.3.0-263-g47c3435c ports: - containerPort: 3002 readinessProbe: @@ -420,7 +420,7 @@ spec: fieldRef: fieldPath: status.podIP - name: echoserver-3004 - image: gcr.io/k8s-staging-gateway-api/echo-basic:v20240412-v1.0.0-394-g40c666fd + image: gcr.io/k8s-staging-gateway-api/echo-basic:v20251106-v1.3.0-263-g47c3435c ports: - containerPort: 3004 readinessProbe: diff --git a/conformance/tests/gateway_following_epp_routing_dp.go b/conformance/tests/gateway_following_epp_routing_dp.go index d608beb21..0067095ad 100644 --- a/conformance/tests/gateway_following_epp_routing_dp.go +++ b/conformance/tests/gateway_following_epp_routing_dp.go @@ -92,26 +92,28 @@ var GatewayFollowingEPPRoutingWithDataParallelism = suite.ConformanceTest{ }` // Single-pod pin to ensure header filter works before main test cases. 
- gwhttp.MakeRequestAndExpectEventuallyConsistentResponse( - t, - s.RoundTripper, - s.TimeoutConfig, - gwAddr, - gwhttp.ExpectedResponse{ - Request: gwhttp.Request{ - Host: hostname, - Path: path, - Method: http.MethodPost, - Body: requestBody, - Headers: map[string]string{ - test.HeaderTestEppEndPointSelectionKey: backends[0].IP, + for _, backend := range backends { + gwhttp.MakeRequestAndExpectEventuallyConsistentResponse( + t, + s.RoundTripper, + s.TimeoutConfig, + gwAddr, + gwhttp.ExpectedResponse{ + Request: gwhttp.Request{ + Host: hostname, + Path: path, + Method: http.MethodPost, + Body: requestBody, + Headers: map[string]string{ + test.HeaderTestEppEndPointSelectionKey: backend.IP, + }, }, + Response: gwhttp.Response{StatusCodes: []int{http.StatusOK}}, + Backend: backend.Name, + Namespace: resources.AppBackendNamespace, }, - Response: gwhttp.Response{StatusCodes: []int{http.StatusOK}}, - Backend: backends[0].Name, - Namespace: resources.AppBackendNamespace, - }, - ) + ) + } testCases := []struct { name string @@ -208,15 +210,14 @@ func assertTrafficOnlyReachesToExpectedPodsDP( if err := gwhttp.CompareRoundTrip(t, &r, cReq, cRes, expected); err != nil { return fmt.Errorf("expectation failed: %w", err) } - // Enforce no leakage to non-selected pods (ports/ranks are internal). + // Enforce no leakage to non-selected pods. if !slices.Contains(expectedPodNames, cReq.Pod) { return fmt.Errorf("unexpected pod %q (expected one of %v)", cReq.Pod, expectedPodNames) } - // Validate X-Echo-HTTP-Port vs EPP intent for the pod's IP. - portHdr := getHeaderValue(cRes.Headers, "X-Echo-HTTP-Port") // Header set by backend echo server - if portHdr == "" { - return errors.New("missing X-Echo-HTTP-Port response header") + // Validate httpPort from JSON response body vs EPP intent. 
+ if cReq.HTTPPort == "" { + return errors.New("missing httpPort in echo JSON body response") } ip := podNameToIP[cReq.Pod] allowed, ok := ipToAllowedPorts[ip] @@ -224,12 +225,12 @@ func assertTrafficOnlyReachesToExpectedPodsDP( return fmt.Errorf("pod %q (IP %s) not present in EPP selection", cReq.Pod, ip) } if len(allowed) > 0 { - if _, ok := allowed[portHdr]; !ok { - return fmt.Errorf("unexpected X-Echo-HTTP-Port %q for IP %s (allowed: %v)", portHdr, ip, keys(allowed)) + if _, ok := allowed[cReq.HTTPPort]; !ok { + return fmt.Errorf("unexpected httpPort %q for IP %s (allowed: %v)", cReq.HTTPPort, ip, keys(allowed)) } } else { - if _, ok := dpPorts[portHdr]; !ok { - return fmt.Errorf("unexpected X-Echo-HTTP-Port %q for IP %s (expected one of DP ports %v)", portHdr, ip, keys(dpPorts)) + if _, ok := dpPorts[cReq.HTTPPort]; !ok { + return fmt.Errorf("unexpected httpPort %q for IP %s (expected one of ports %v)", cReq.HTTPPort, ip, keys(dpPorts)) } } @@ -237,9 +238,9 @@ func assertTrafficOnlyReachesToExpectedPodsDP( }) } if err := g.Wait(); err != nil { - t.Fatalf("Requests were not confined to expected pods (DP) or failed port-header checks: %v", err) + t.Fatalf("Requests were not confined to expected pods or failed port checks: %v", err) } - t.Logf("DP traffic restricted to %v and port header validated against EPP selection", expectedPodNames) + t.Logf("Traffic restricted to %v and httpPort validated against EPP selection", expectedPodNames) } type portSet map[string]struct{} @@ -258,19 +259,8 @@ func keys(m portSet) []string { return out } -// getHeaderValue is a case-insensitive header lookup on headers using the key. -func getHeaderValue(headers map[string][]string, key string) string { - for k, v := range headers { - if strings.EqualFold(k, key) && len(v) > 0 { - return v[0] - } - } - return "" -} - -// buildEPPHeader builds the test EPP header (HeaderTestEppEndPointSelectionKey) from ip->ports. -// Empty portSet => emit just "IP". 
Non-empty => emit "IP:port" for each port. -// Sorted for determinism. +// buildEPPHeader builds the test EPP header from ip->ports. An empty portSet => emit just "IP". +// A non-empty portSet => emit "IP:port" for each port. Sorted for determinism. func buildEPPHeader(ipToPorts map[string]portSet) string { ips := make([]string, 0, len(ipToPorts)) for ip := range ipToPorts { diff --git a/go.mod b/go.mod index ab54e64f3..43460dd05 100644 --- a/go.mod +++ b/go.mod @@ -36,11 +36,13 @@ require ( sigs.k8s.io/controller-runtime v0.22.4 // Update the CONTROLLER_TOOLS_VERSION in Makefile when bumping controller-tools. sigs.k8s.io/controller-tools v0.19.0 - sigs.k8s.io/gateway-api v1.4.0 + sigs.k8s.io/gateway-api v1.3.1-0.20251106052652-079e4774d76b sigs.k8s.io/structured-merge-diff/v6 v6.3.0 sigs.k8s.io/yaml v1.6.0 ) +require github.com/go-logr/zapr v1.3.0 + require ( cel.dev/expr v0.24.0 // indirect github.com/Masterminds/goutils v1.1.1 // indirect @@ -61,7 +63,6 @@ require ( github.com/fsnotify/fsnotify v1.9.0 // indirect github.com/fxamacker/cbor/v2 v2.9.0 // indirect github.com/go-logr/stdr v1.2.2 // indirect - github.com/go-logr/zapr v1.3.0 // indirect github.com/go-openapi/jsonpointer v0.21.2 // indirect github.com/go-openapi/jsonreference v0.21.0 // indirect github.com/go-openapi/swag v0.23.1 // indirect @@ -108,14 +109,14 @@ require ( go.yaml.in/yaml/v3 v3.0.4 // indirect golang.org/x/crypto v0.43.0 // indirect golang.org/x/exp v0.0.0-20250808145144-a408d31f581a // indirect - golang.org/x/mod v0.28.0 // indirect + golang.org/x/mod v0.29.0 // indirect golang.org/x/net v0.46.0 // indirect golang.org/x/oauth2 v0.32.0 // indirect golang.org/x/sys v0.37.0 // indirect golang.org/x/term v0.36.0 // indirect golang.org/x/text v0.30.0 // indirect golang.org/x/time v0.13.0 // indirect - golang.org/x/tools v0.37.0 // indirect + golang.org/x/tools v0.38.0 // indirect gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect google.golang.org/genproto/googleapis/api
v0.0.0-20250929231259-57b25ae835d4 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20250922171735-9219d122eba9 // indirect diff --git a/go.sum b/go.sum index c330b550d..c8020467a 100644 --- a/go.sum +++ b/go.sum @@ -311,8 +311,8 @@ golang.org/x/exp v0.0.0-20250808145144-a408d31f581a h1:Y+7uR/b1Mw2iSXZ3G//1haIiS golang.org/x/exp v0.0.0-20250808145144-a408d31f581a/go.mod h1:rT6SFzZ7oxADUDx58pcaKFTcZ+inxAa9fTrYx/uVYwg= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= -golang.org/x/mod v0.28.0 h1:gQBtGhjxykdjY9YhZpSlZIsbnaE2+PgjfLWUQTnoZ1U= -golang.org/x/mod v0.28.0/go.mod h1:yfB/L0NOf/kmEbXjzCPOx1iK1fRutOydrCMsqRhEBxI= +golang.org/x/mod v0.29.0 h1:HV8lRxZC4l2cr3Zq1LvtOsi/ThTgWnUk/y64QSs8GwA= +golang.org/x/mod v0.29.0/go.mod h1:NyhrlYXJ2H4eJiRy/WDBO6HMqZQ6q9nk4JzS3NuCK+w= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= @@ -343,8 +343,8 @@ golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGm golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= -golang.org/x/tools v0.37.0 h1:DVSRzp7FwePZW356yEAChSdNcQo6Nsp+fex1SUW09lE= -golang.org/x/tools v0.37.0/go.mod h1:MBN5QPQtLMHVdvsbtarmTNukZDdgwdwlO5qGacAzF0w= +golang.org/x/tools v0.38.0 h1:Hx2Xv8hISq8Lm16jvBZ2VQf+RLmbd7wVUsALibYI/IQ= +golang.org/x/tools v0.38.0/go.mod h1:yEsQ/d/YK8cjh0L6rZlY8tgtlKiBNTL14pGDJPJpYQs= golang.org/x/tools/go/expect 
v0.1.1-deprecated h1:jpBZDwmgPhXsKZC6WhL20P4b/wmnpsEAGHaNy0n/rJM= golang.org/x/tools/go/expect v0.1.1-deprecated/go.mod h1:eihoPOH+FgIqa3FpoTwguz/bVUSGBlGQU67vpBeOrBY= golang.org/x/tools/go/packages/packagestest v0.1.1-deprecated h1:1h2MnaIAIXISqTFKdENegdpAgUXz6NrPEsbIeWaBRvM= @@ -409,8 +409,8 @@ sigs.k8s.io/controller-runtime v0.22.4 h1:GEjV7KV3TY8e+tJ2LCTxUTanW4z/FmNB7l327U sigs.k8s.io/controller-runtime v0.22.4/go.mod h1:+QX1XUpTXN4mLoblf4tqr5CQcyHPAki2HLXqQMY6vh8= sigs.k8s.io/controller-tools v0.19.0 h1:OU7jrPPiZusryu6YK0jYSjPqg8Vhf8cAzluP9XGI5uk= sigs.k8s.io/controller-tools v0.19.0/go.mod h1:y5HY/iNDFkmFla2CfQoVb2AQXMsBk4ad84iR1PLANB0= -sigs.k8s.io/gateway-api v1.4.0 h1:ZwlNM6zOHq0h3WUX2gfByPs2yAEsy/EenYJB78jpQfQ= -sigs.k8s.io/gateway-api v1.4.0/go.mod h1:AR5RSqciWP98OPckEjOjh2XJhAe2Na4LHyXD2FUY7Qk= +sigs.k8s.io/gateway-api v1.3.1-0.20251106052652-079e4774d76b h1:CoVExRHGK0xoewqKEtip+CXa/PPIxiEKeNJaKdEz5o0= +sigs.k8s.io/gateway-api v1.3.1-0.20251106052652-079e4774d76b/go.mod h1:eEYVpDGr0WPqR/35ZTBIWWpwKL7uUzOqlT92mmv3fus= sigs.k8s.io/json v0.0.0-20250730193827-2d320260d730 h1:IpInykpT6ceI+QxKBbEflcR5EXP7sU1kvOlxwZh5txg= sigs.k8s.io/json v0.0.0-20250730193827-2d320260d730/go.mod h1:mdzfpAEoE6DHQEN0uh9ZbOCuHbLK5wOm7dK4ctXE9Tg= sigs.k8s.io/randfill v1.0.0 h1:JfjMILfT8A6RbawdsK2JXGBR5AQVfd+9TbzrlneTyrU=