Skip to content

Commit d788a2c

Browse files
authored
Add Produces and Consumes methods to Plugin (#1754)
* Add consumes and produces method to plugin with stub implementations * Refactor: Move metrics constants to a separate package * Add a specific Consumer Plugin * Add descriptive comments and cleanup code * Remove empty Consumes implementations from tests
1 parent e2f14ff commit d788a2c

File tree

7 files changed

+63
-0
lines changed

7 files changed

+63
-0
lines changed

pkg/epp/datalayer/metrics/extractor.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
dto "github.com/prometheus/client_model/go"
2929

3030
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
31+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
3132
)
3233

3334
const (
@@ -47,6 +48,17 @@ type Extractor struct {
4748
mapping *Mapping
4849
}
4950

51+
func Produces() map[string]any {
52+
return map[string]any{
53+
metrics.WaitingQueueSizeKey: int(0),
54+
metrics.KVCacheUsagePercentKey: float64(0),
55+
metrics.ActiveModelsKey: map[string]int{},
56+
metrics.WaitingModelsKey: map[string]int{},
57+
metrics.MaxActiveModelsKey: int(0),
58+
metrics.UpdateTimeKey: time.Time{},
59+
}
60+
}
61+
5062
// NewExtractor returns a new model server protocol (MSP) metrics extractor,
5163
// configured with the given metrics' specifications.
5264
// These are mandatory metrics per the MSP specification, and are used

pkg/epp/metrics/metrics.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,13 @@ const (
3434
InferenceObjectiveComponent = "inference_objective"
3535
InferencePoolComponent = "inference_pool"
3636
InferenceExtension = "inference_extension"
37+
38+
KVCacheUsagePercentKey = "KVCacheUsagePercent"
39+
WaitingQueueSizeKey = "WaitingQueueSize"
40+
MaxActiveModelsKey = "MaxActiveModels"
41+
ActiveModelsKey = "ActiveModels"
42+
WaitingModelsKey = "WaitingModels"
43+
UpdateTimeKey = "UpdateTime"
3744
)
3845

3946
var (

pkg/epp/plugins/plugins.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,3 +22,21 @@ type Plugin interface {
2222
// TypedName returns the type and name tuple of this plugin instance.
2323
TypedName() TypedName
2424
}
25+
26+
// ConsumerPlugin defines the interface for a consumer.
27+
type ConsumerPlugin interface {
28+
Plugin
29+
// Consumes returns data consumed by the plugin.
30+
// This is a map from data key (string) consumed to
31+
// the data type of the key (represented as the type's default value cast to any).
32+
Consumes() map[string]any
33+
}
34+
35+
// ProducerPlugin defines the interface for a producer.
36+
type ProducerPlugin interface {
37+
Plugin
38+
// Produces returns data produced by the producer.
39+
// This is a map from data key (string) produced to
40+
// the data type of the key (represented as the type's default value cast to any).
41+
Produces() map[string]any
42+
}

pkg/epp/scheduling/framework/plugins/scorer/kvcache_utilization.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"context"
2121
"encoding/json"
2222

23+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
2324
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
2425
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework"
2526
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
@@ -54,6 +55,13 @@ func (s *KVCacheUtilizationScorer) TypedName() plugins.TypedName {
5455
return s.typedName
5556
}
5657

58+
// Consumes returns the data consumed by the plugin, as a map from data key to a zero value of the expected type.
59+
func (s *KVCacheUtilizationScorer) Consumes() map[string]any {
60+
return map[string]any{
61+
metrics.KVCacheUsagePercentKey: float64(0),
62+
}
63+
}
64+
5765
// WithName sets the name of the scorer.
5866
func (s *KVCacheUtilizationScorer) WithName(name string) *KVCacheUtilizationScorer {
5967
s.typedName.Name = name

pkg/epp/scheduling/framework/plugins/scorer/lora_affinity.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"context"
2121
"encoding/json"
2222

23+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
2324
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
2425
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework"
2526
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
@@ -54,6 +55,14 @@ func (s *LoraAffinityScorer) TypedName() plugins.TypedName {
5455
return s.tn
5556
}
5657

58+
// Consumes returns the data consumed by the plugin, as a map from data key to a zero value of the expected type.
59+
func (s *LoraAffinityScorer) Consumes() map[string]any {
60+
return map[string]any{
61+
metrics.ActiveModelsKey: map[string]int{},
62+
metrics.WaitingModelsKey: map[string]int{},
63+
}
64+
}
65+
5766
// WithName sets the name of the scorer.
5867
func (s *LoraAffinityScorer) WithName(name string) *LoraAffinityScorer {
5968
s.tn.Name = name

pkg/epp/scheduling/framework/plugins/scorer/queue.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"encoding/json"
2222
"math"
2323

24+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
2425
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
2526
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework"
2627
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
@@ -56,6 +57,13 @@ func (s *QueueScorer) TypedName() plugins.TypedName {
5657
return s.typedName
5758
}
5859

60+
// Consumes returns the data consumed by the plugin, as a map from data key to a zero value of the expected type.
61+
func (s *QueueScorer) Consumes() map[string]any {
62+
return map[string]any{
63+
metrics.WaitingQueueSizeKey: int(0),
64+
}
65+
}
66+
5967
// WithName sets the name of the scorer.
6068
func (s *QueueScorer) WithName(name string) *QueueScorer {
6169
s.typedName.Name = name

pkg/epp/scheduling/types/cycle_state.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ func NewCycleState() *CycleState {
3333
// CycleState does not provide any data protection, as all plugins are assumed to be
3434
// trusted.
3535
// Note: CycleState uses a sync.Map to back the storage because it is thread safe. It aims to optimize for the "write once and read many times" scenarios.
36+
// TODO: Consider deprecating CycleState once the datalayer producer-consumer changes are made.
3637
type CycleState struct {
3738
// key: StateKey, value: StateData
3839
storage sync.Map

0 commit comments

Comments
 (0)