Skip to content

Commit 5bc7425

Browse files
authored
Multi cycle scheduler (kubernetes-sigs#862)
* code review Signed-off-by: Nir Rozenbaum <nirro@il.ibm.com> * minor change Signed-off-by: Nir Rozenbaum <nirro@il.ibm.com> * add support for multi cycle scheduling Signed-off-by: Nir Rozenbaum <nirro@il.ibm.com> * minor change Signed-off-by: Nir Rozenbaum <nirro@il.ibm.com> * moved plugins under plugins dir Signed-off-by: Nir Rozenbaum <nirro@il.ibm.com> * few more changes Signed-off-by: Nir Rozenbaum <nirro@il.ibm.com> * moved RunCycle logic into SchedulerProfile Signed-off-by: Nir Rozenbaum <nirro@il.ibm.com> * minor changes Signed-off-by: Nir Rozenbaum <nirro@il.ibm.com> * linter Signed-off-by: Nir Rozenbaum <nirro@il.ibm.com> * minor change in unit-test Signed-off-by: Nir Rozenbaum <nirro@il.ibm.com> --------- Signed-off-by: Nir Rozenbaum <nirro@il.ibm.com>
1 parent 28229bf commit 5bc7425

30 files changed

+752
-612
lines changed

cmd/epp/main.go

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,12 @@ import (
4141
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
4242
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics/collectors"
4343
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling"
44-
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/filter"
45-
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/multi/prefix"
46-
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/picker"
47-
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins/scorer"
44+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework"
45+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/filter"
46+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/multi/prefix"
47+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/picker"
48+
profilepicker "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/profile-picker"
49+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/scorer"
4850
runserver "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/server"
4951
envutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/env"
5052
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
@@ -202,20 +204,21 @@ func run() error {
202204
queueScorerWeight := envutil.GetEnvInt("QUEUE_SCORE_WEIGHT", scorer.DefaultQueueScorerWeight, setupLog)
203205
kvCacheScorerWeight := envutil.GetEnvInt("KV_CACHE_SCORE_WEIGHT", scorer.DefaultKVCacheScorerWeight, setupLog)
204206

205-
schedulerConfig := scheduling.NewSchedulerConfig().
207+
schedulerProfile := framework.NewSchedulerProfile().
206208
WithFilters(filter.NewSheddableCapacityFilter()).
207-
WithScorers(scorer.NewWeightedScorer(&scorer.QueueScorer{}, queueScorerWeight),
208-
scorer.NewWeightedScorer(&scorer.KVCacheScorer{}, kvCacheScorerWeight)).
209+
WithScorers(framework.NewWeightedScorer(&scorer.QueueScorer{}, queueScorerWeight),
210+
framework.NewWeightedScorer(&scorer.KVCacheScorer{}, kvCacheScorerWeight)).
209211
WithPicker(picker.NewMaxScorePicker())
210212

211213
if prefixCacheScheduling == "true" {
212214
prefixScorerWeight := envutil.GetEnvInt("PREFIX_CACHE_SCORE_WEIGHT", prefix.DefaultScorerWeight, setupLog)
213-
if err := schedulerConfig.AddPlugins(scorer.NewWeightedScorer(prefix.New(loadPrefixCacheConfig()), prefixScorerWeight)); err != nil {
215+
if err := schedulerProfile.AddPlugins(framework.NewWeightedScorer(prefix.New(loadPrefixCacheConfig()), prefixScorerWeight)); err != nil {
214216
setupLog.Error(err, "Failed to register scheduler plugins")
215217
return err
216218
}
217219
}
218220

221+
schedulerConfig := scheduling.NewSchedulerConfig(profilepicker.NewAllProfilesPicker(), map[string]*framework.SchedulerProfile{"schedulerv2": schedulerProfile})
219222
scheduler = scheduling.NewSchedulerWithConfig(datastore, schedulerConfig)
220223
}
221224
serverRunner := &runserver.ExtProcServerRunner{

pkg/epp/requestcontrol/director.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ import (
3535
)
3636

3737
type Scheduler interface {
38-
Schedule(ctx context.Context, b *schedulingtypes.LLMRequest) (result *schedulingtypes.Result, err error)
38+
Schedule(ctx context.Context, b *schedulingtypes.LLMRequest) (result map[string]*schedulingtypes.Result, err error)
3939
OnResponse(ctx context.Context, resp *schedulingtypes.LLMResponse, targetPodName string)
4040
}
4141

@@ -108,23 +108,27 @@ func (d *Director) HandleRequest(ctx context.Context, reqCtx *handlers.RequestCo
108108
}
109109

110110
// Dispatch runs one or many scheduling cycles.
111-
func (d *Director) Dispatch(ctx context.Context, llmReq *schedulingtypes.LLMRequest) ([]*schedulingtypes.Result, error) {
111+
func (d *Director) Dispatch(ctx context.Context, llmReq *schedulingtypes.LLMRequest) (map[string]*schedulingtypes.Result, error) {
112112
var err error
113113
res, err := d.scheduler.Schedule(ctx, llmReq)
114114
if err != nil {
115115
return nil, errutil.Error{Code: errutil.InferencePoolResourceExhausted, Msg: fmt.Errorf("failed to find target pod: %w", err).Error()}
116116
}
117117

118-
return []*schedulingtypes.Result{res}, nil
118+
return res, nil // TODO handle multi cycle result after defining the PostDispatch extension point
119119
}
120120

121-
func (d *Director) PostDispatch(ctx context.Context, reqCtx *handlers.RequestContext, results []*schedulingtypes.Result) (*handlers.RequestContext, error) {
121+
func (d *Director) PostDispatch(ctx context.Context, reqCtx *handlers.RequestContext, results map[string]*schedulingtypes.Result) (*handlers.RequestContext, error) {
122122
logger := log.FromContext(ctx)
123123
// currently only get a single result. Will refactor to pluggably implement the PostSchedule
124124
if len(results) == 0 {
125125
return reqCtx, errutil.Error{Code: errutil.Internal, Msg: "results must be greater than zero"}
126126
}
127-
targetPod := results[0].TargetPod.GetPod()
127+
var targetPod *backend.Pod
128+
// TODO should handle multi cycle results, this should be pluggable logic
129+
for _, result := range results {
130+
targetPod = result.TargetPod.GetPod()
131+
}
128132

129133
pool, err := d.datastore.PoolGet()
130134
if err != nil {

pkg/epp/scheduling/plugins/plugins.go renamed to pkg/epp/scheduling/framework/plugins.go

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,18 +14,19 @@ See the License for the specific language governing permissions and
1414
limitations under the License.
1515
*/
1616

17-
package plugins
17+
package framework
1818

1919
import (
2020
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
2121
)
2222

2323
const (
24-
PreSchedulerPluginType = "PreSchedule"
24+
ProfilePickerType = "ProfilePicker"
25+
PreCyclePluginType = "PreCycle"
2526
FilterPluginType = "Filter"
2627
ScorerPluginType = "Scorer"
27-
PostSchedulePluginType = "PostSchedule"
2828
PickerPluginType = "Picker"
29+
PostCyclePluginType = "PostCycle"
2930
PostResponsePluginType = "PostResponse"
3031
)
3132

@@ -36,11 +37,18 @@ type Plugin interface {
3637
Name() string
3738
}
3839

39-
// PreSchedule is called when the scheduler receives a new request. It can be used for various
40-
// initialization work.
41-
type PreSchedule interface {
40+
// ProfilePicker selects the SchedulingProfiles to run from a list of candidate profiles, while taking into consideration the request properties
41+
// and the previously executed SchedluderProfile cycles along with their results.
42+
type ProfilePicker interface {
4243
Plugin
43-
PreSchedule(ctx *types.SchedulingContext)
44+
Pick(request *types.LLMRequest, profiles map[string]*SchedulerProfile, executionResults map[string]*types.Result) map[string]*SchedulerProfile
45+
}
46+
47+
// PreCycle is called when the scheduler receives a new request and invokes a SchedulerProfile cycle.
48+
// It can be used for various initialization work.
49+
type PreCycle interface {
50+
Plugin
51+
PreCycle(ctx *types.SchedulingContext)
4452
}
4553

4654
// Filter defines the interface for filtering a list of pods based on context.
@@ -62,10 +70,10 @@ type Picker interface {
6270
Pick(ctx *types.SchedulingContext, scoredPods []*types.ScoredPod) *types.Result
6371
}
6472

65-
// PostSchedule is called by the scheduler after it selects a targetPod for the request.
66-
type PostSchedule interface {
73+
// PostCycle is called by the scheduler after it selects a targetPod for the request in the SchedulerProfile cycle.
74+
type PostCycle interface {
6775
Plugin
68-
PostSchedule(ctx *types.SchedulingContext, res *types.Result)
76+
PostCycle(ctx *types.SchedulingContext, res *types.Result)
6977
}
7078

7179
// PostResponse is called by the scheduler after a successful response was sent.
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
# Scheduling Plugins
2+
3+
This package contains the scheduling plugin implementations.
4+
5+
Plugins are organized by the following rule. Follow this rule when adding a new
6+
plugin.
7+
8+
```
9+
plugins/
10+
|__ filter/(Plugins that implement the Filter interface only.)
11+
|__ scorer/ (Plugins that implement the Scorer interface only.)
12+
|__ picker/(Plugins that implement the Picker interface only.)
13+
|__ multi/ (Plugins that implement multiple plugin interfaces.)
14+
|____prefix/ (Prefix cache aware scheduling plugin.)
15+
```

pkg/epp/scheduling/plugins/filter/decision_tree_filter.go renamed to pkg/epp/scheduling/framework/plugins/filter/decision_tree_filter.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,31 +17,31 @@ limitations under the License.
1717
package filter
1818

1919
import (
20-
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins"
20+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework"
2121
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
2222
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
2323
)
2424

25-
// compile-time type validation
26-
var _ plugins.Filter = &DecisionTreeFilter{}
25+
// compile-time type assertion
26+
var _ framework.Filter = &DecisionTreeFilter{}
2727

2828
// DecisionTreeFilter applies current fitler, and then recursively applies next filters
2929
// depending success or failure of the current filter.
3030
// It can be used to construct a flow chart algorithm.
3131
type DecisionTreeFilter struct {
32-
Current plugins.Filter
32+
Current framework.Filter
3333
// NextOnSuccess filter will be applied after successfully applying the current filter.
3434
// The filtered results will be passed to the next filter.
35-
NextOnSuccess plugins.Filter
35+
NextOnSuccess framework.Filter
3636
// NextOnFailure filter will be applied if current filter results in no pods.
3737
// The original input will be passed to the next filter.
38-
NextOnFailure plugins.Filter
38+
NextOnFailure framework.Filter
3939
// NextOnSuccessOrFailure is a convenience field to configure the next filter regardless of the
4040
// success or failure of the current filter.
4141
// NOTE: When using NextOnSuccessOrFailure, both nextOnSuccess and nextOnFailure SHOULD be nil.
4242
// However if that's not the case, nextOnSuccess and nextOnFailure will be used, instead of
4343
// NextOnSuccessOrFailure, in the success and failure scenarios, respectively.
44-
NextOnSuccessOrFailure plugins.Filter
44+
NextOnSuccessOrFailure framework.Filter
4545
}
4646

4747
// Name returns the name of the filter.

pkg/epp/scheduling/plugins/filter/filter_test.go renamed to pkg/epp/scheduling/framework/plugins/filter/filter_test.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,13 @@ import (
2626
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend"
2727
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
2828
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/config"
29-
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins"
29+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework"
3030
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
3131
)
3232

33+
// compile-time type assertion
34+
var _ framework.Filter = &filterAll{}
35+
3336
type filterAll struct{}
3437

3538
func (f *filterAll) Name() string {
@@ -44,7 +47,7 @@ func TestFilter(t *testing.T) {
4447
tests := []struct {
4548
name string
4649
req *types.LLMRequest
47-
filter plugins.Filter
50+
filter framework.Filter
4851
input []types.Pod
4952
output []types.Pod
5053
}{

pkg/epp/scheduling/plugins/filter/least_kvcache_filter.go renamed to pkg/epp/scheduling/framework/plugins/filter/least_kvcache_filter.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,12 @@ package filter
1919
import (
2020
"math"
2121

22-
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins"
22+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework"
2323
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
2424
)
2525

26-
// compile-time type validation
27-
var _ plugins.Filter = &LeastKVCacheFilter{}
26+
// compile-time type assertion
27+
var _ framework.Filter = &LeastKVCacheFilter{}
2828

2929
// NewLeastKVCacheFilter initializes a new LeastKVCacheFilter and returns its pointer.
3030
func NewLeastKVCacheFilter() *LeastKVCacheFilter {

pkg/epp/scheduling/plugins/filter/least_queue_filter.go renamed to pkg/epp/scheduling/framework/plugins/filter/least_queue_filter.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,12 @@ package filter
1919
import (
2020
"math"
2121

22-
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins"
22+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework"
2323
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
2424
)
2525

26-
// compile-time type validation
27-
var _ plugins.Filter = &LeastQueueFilter{}
26+
// compile-time type assertion
27+
var _ framework.Filter = &LeastQueueFilter{}
2828

2929
// NewLeastQueueFilter initializes a new LeastQueueFilter and returns its pointer.
3030
func NewLeastQueueFilter() *LeastQueueFilter {

pkg/epp/scheduling/plugins/filter/lora_affinity_filter.go renamed to pkg/epp/scheduling/framework/plugins/filter/lora_affinity_filter.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,12 @@ import (
2121
"time"
2222

2323
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/config"
24-
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins"
24+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework"
2525
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
2626
)
2727

28-
// compile-time type validation
29-
var _ plugins.Filter = &LoraAffinityFilter{}
28+
// compile-time type assertion
29+
var _ framework.Filter = &LoraAffinityFilter{}
3030

3131
// NewLoraAffinityFilter initializes a new LoraAffinityFilter and returns its pointer.
3232
func NewLoraAffinityFilter() *LoraAffinityFilter {

pkg/epp/scheduling/plugins/filter/low_queue_filter.go renamed to pkg/epp/scheduling/framework/plugins/filter/low_queue_filter.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,12 @@ package filter
1818

1919
import (
2020
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/config"
21-
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/plugins"
21+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework"
2222
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
2323
)
2424

25-
// compile-time type validation
26-
var _ plugins.Filter = &LowQueueFilter{}
25+
// compile-time type assertion
26+
var _ framework.Filter = &LowQueueFilter{}
2727

2828
// NewLowQueueFilter initializes a new LowQueueFilter and returns its pointer.
2929
func NewLowQueueFilter() *LowQueueFilter {

0 commit comments

Comments
 (0)