Commit ae2f070

admission: compute cpu time token refill rates

This commit introduces a linear model that computes CPU time token refill
rates. cpuTimeTokenFiller uses this model to determine how many tokens to add
per second to the buckets in cpuTimeTokenGranter.

Fixes: #154471

Release note: None
1 parent 6ecd28b commit ae2f070
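To make the commit message concrete, here is a minimal, self-contained sketch (not the production code) of the arithmetic the linear model implies: the fitted token-to-CPU-time multiplier converts a per-bucket utilization goal into a per-second token refill rate. The function name and signature are illustrative assumptions; the constants match the worked numbers in the test diff below.

package main

import (
	"fmt"
	"time"
)

// refillRate sketches how a per-second refill rate can fall out of the
// model: capacity (vCPUs) times the utilization goal gives a CPU-time
// budget per second, which the fitted multiplier converts into tokens.
func refillRate(cpuCapacity, goalUtil, tokenToCPUTimeMultiplier float64) int64 {
	budget := cpuCapacity * goalUtil * float64(time.Second.Nanoseconds())
	return int64(budget / tokenToCPUTimeMultiplier)
}

func main() {
	// 10 vCPUs, 95% goal, 3.6x multiplier -> ~2.639s of tokens per second,
	// matching the 2638888888 expected in the updated test below.
	fmt.Println(refillRate(10, 0.95, 3.6))
}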

File tree

6 files changed: +205 -97 lines changed

pkg/util/admission/cpu_time_token_filler.go

Lines changed: 18 additions & 18 deletions
@@ -272,7 +272,7 @@ func (m *cpuTimeTokenLinearModel) init() {
 	_, cpuCapacity := m.cpuMetricProvider.GetCPUInfo()
 	m.cpuCapacity = cpuCapacity
 	m.tokenToCPUTimeMultiplier = 1
-	_ = m.calculateRefillRates()
+	_ = m.updateRefillRates()
 }
 
 // fit adjusts tokenToCPUTimeMultiplier based on CPU usage & token usage. fit
@@ -344,28 +344,28 @@ func (m *cpuTimeTokenLinearModel) fit() [numResourceTiers][numBurstQualifications]int64 {
 			alpha*tokenToCPUTimeMultiplier + (1-alpha)*m.tokenToCPUTimeMultiplier
 	}
 
-	return m.calculateRefillRates()
+	return m.updateRefillRates()
 }
 
-// calculateRefillRates computes refill rates from tokenToCPUTimeMultiplier &
-// the admission.cpu_time_tokens.target_util cluster setting. calculateRefillRates
-// returns the delta refill rates. That is calculateRefillRates returns the difference
+// updateRefillRates computes refill rates from tokenToCPUTimeMultiplier &
+// the admission.cpu_time_tokens.target_util cluster setting. updateRefillRates
+// returns the delta refill rates. That is updateRefillRates returns the difference
 // in tokens to add per interval (1s) from previous call to fit to this one.
-func (m *cpuTimeTokenLinearModel) calculateRefillRates() [numResourceTiers][numBurstQualifications]int64 {
-	// Compute goals from cluster setting.
+func (m *cpuTimeTokenLinearModel) updateRefillRates() [numResourceTiers][numBurstQualifications]int64 {
+	// Compute goals from cluster setting. Algorithmically, it is okay if some of
+	// the below goalUtils are greater than 1. This would mean greater risk of
+	// goroutine scheduling latency, but there is no immediate problem -- the
+	// greater some goalUtil is, the more CPU time tokens will be in the corresponding
+	// bucket.
 	var goalUtils [numResourceTiers][numBurstQualifications]float64
 	util := KVCPUTimeUtilGoal.Get(&m.settings.SV)
-	goalUtils[appTenant][noBurst] = util
-	// Algorithmically, it is okay if some of the below goalUtils are greater
-	// than 1. This would mean greater risk of goroutine scheduling latency, but
-	// there is no immediate problem -- the greater some goalUtil is, the more CPU
-	// time tokens will be in the corresponding bucket.
-	goalUtils[appTenant][canBurst] = util + 0.05
-	goalUtils[systemTenant][noBurst] = util + 0.1
-	// The system tenant will never set canBurst = true, so we do not fill
-	// that bucket with tokens.
-	// TODO(): Check that this is fine.
-	goalUtils[systemTenant][canBurst] = 0
+	var iter float64
+	for tier := int(numResourceTiers - 1); tier >= 0; tier-- {
+		for qual := int(numBurstQualifications - 1); qual >= 0; qual-- {
+			goalUtils[tier][qual] = util + 0.05*iter
+			iter++
+		}
+	}
 
 	// Update refill rates. Return change in rates via delta.
 	var delta [numResourceTiers][numBurstQualifications]int64
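For reference, the goal-utilization ladder introduced above can be exercised in isolation. The sketch below assumes systemTenant/appTenant and canBurst/noBurst enumerate to 0/1 (their values are not shown in this diff); with util = 0.8 it reproduces the 0.80/0.85/0.90/0.95 goals asserted in the test further down.

package main

import "fmt"

const (
	numResourceTiers       = 2 // assumed: systemTenant = 0, appTenant = 1
	numBurstQualifications = 2 // assumed: canBurst = 0, noBurst = 1
)

func main() {
	util := 0.8
	var goalUtils [numResourceTiers][numBurstQualifications]float64
	var iter float64
	// Later tiers/qualifications get the base goal; earlier ones get
	// progressively higher goals in 5% steps.
	for tier := numResourceTiers - 1; tier >= 0; tier-- {
		for qual := numBurstQualifications - 1; qual >= 0; qual-- {
			goalUtils[tier][qual] = util + 0.05*iter
			iter++
		}
	}
	fmt.Printf("%.2f\n", goalUtils) // [[0.95 0.90] [0.85 0.80]]
}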

pkg/util/admission/cpu_time_token_filler_test.go

Lines changed: 110 additions & 77 deletions
@@ -17,6 +17,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/datadriven"
+	"github.com/stretchr/testify/require"
 )
 
 func TestCPUTimeTokenFiller(t *testing.T) {
@@ -81,13 +82,13 @@ func (a *testTokenAllocator) allocateTokens(remainingTicks int64) {
 
 type testModel struct {
 	rates [numResourceTiers][numBurstQualifications]int64
+	delta [numResourceTiers][numBurstQualifications]int64
 }
 
 func (m *testModel) init() {}
 
 func (m *testModel) fit() [numResourceTiers][numBurstQualifications]int64 {
-	// TODO(): Test ret value.
-	return [numResourceTiers][numBurstQualifications]int64{}
+	return m.delta
 }
 
 func (m *testModel) getRefillRates() [numResourceTiers][numBurstQualifications]int64 {
@@ -130,10 +131,8 @@ func TestCPUTimeTokenAllocator(t *testing.T) {
 	}
 
 	var buf strings.Builder
-	flushAndReset := func(printGranter bool) string {
-		if printGranter {
-			fmt.Fprint(&buf, granter.String())
-		}
+	flushAndReset := func() string {
+		fmt.Fprint(&buf, granter.String())
 		str := buf.String()
 		buf.Reset()
 		return str
@@ -142,27 +141,40 @@ func TestCPUTimeTokenAllocator(t *testing.T) {
 	datadriven.RunTest(t, datapathutils.TestDataPath(t, "cpu_time_token_allocator"), func(t *testing.T, d *datadriven.TestData) string {
 		switch d.Cmd {
 		case "resetInterval":
-			// TODO(): Test when true also.
-			allocator.resetInterval(false)
-			return flushAndReset(false /* printGranter */)
+			var delta int64
+			d.MaybeScanArgs(t, "delta", &delta)
+			if d.MaybeScanArgs(t, "delta", &delta) {
+				for tier := range model.delta {
+					for qual := range model.delta[tier] {
+						model.delta[tier][qual] = delta
+					}
+				}
+			}
+			skipFit := d.HasArg("skipfit")
+			allocator.resetInterval(skipFit /* skipFittingLinearModel */)
+			for tier := range model.delta {
+				for qual := range model.delta[tier] {
+					model.delta[tier][qual] = 0
+				}
+			}
+			return flushAndReset()
 		case "allocate":
 			var remainingTicks int64
 			d.ScanArgs(t, "remaining", &remainingTicks)
 			allocator.allocateTokens(remainingTicks)
-			return flushAndReset(true /* printGranter */)
+			return flushAndReset()
 		case "clear":
 			granter.mu.buckets[testTier0][canBurst].tokens = 0
 			granter.mu.buckets[testTier0][noBurst].tokens = 0
 			granter.mu.buckets[testTier1][canBurst].tokens = 0
 			granter.mu.buckets[testTier1][noBurst].tokens = 0
-			return flushAndReset(true /* printGranter */)
+			return flushAndReset()
 		default:
 			return fmt.Sprintf("unknown command: %s", d.Cmd)
 		}
 	})
 }
 
-// TODO(): Complete.
 func TestCPUTimeTokenLinearModel(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
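With the reworked command, a datadriven case might look like the following sketch (the argument names match the code above; the expected output is whatever granter.String() prints, which this diff does not show, so it is elided here):

resetInterval delta=100 skipfit
----
<granter state from granter.String()>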
@@ -176,122 +188,143 @@ func TestCPUTimeTokenLinearModel(t *testing.T) {
 		totalCPUTimeMillis:       0,
 		tokenToCPUTimeMultiplier: 1,
 	}
-	dur := 5 * time.Second
-	ttt := &tt{}
-	model.granter = ttt
-	m := &mm{
+	tokenCPUTime := &testTokenUsageTracker{}
+	model.granter = tokenCPUTime
+	actualCPUTime := &testCPUMetricProvider{
 		capacity: 10,
 	}
-	model.cpuMetricProvider = m
-
-	// Up & down.
-	ttt.append(dur.Nanoseconds()/2, 200)
-	ttt.append(dur.Nanoseconds()*2, 100)
-	m.append(dur.Milliseconds(), 100)
-	m.append(dur.Milliseconds()*2, 100)
-	m.append(dur.Milliseconds()*2, 100)
-	for i := 0; i < len(ttt.tokensUsed); i++ {
+	model.cpuMetricProvider = actualCPUTime
+
+	dur := 5 * time.Second
+	actualCPUTime.append(dur.Nanoseconds(), 1) // appended value ignored by init
+	model.init()
+
+	// 2x.
+	tokenCPUTime.append(dur.Nanoseconds()/2, 100)
+	actualCPUTime.append(dur.Milliseconds(), 100)
+	for i := 0; i < 100; i++ {
 		testTime.Advance(time.Second)
 		model.fit()
-		if (i+1)%100 == 0 {
-			fmt.Printf("%f\n", model.tokenToCPUTimeMultiplier)
-		}
 	}
-	// TODO(): Check rates.
-	//rates := model.getRates()
-	//want := 4 * time.Second
-	//require.Equal(t, want.Nanoseconds(), rates[0][canBurst])
-
-	// Cap 20x
-	ttt.append(dur.Nanoseconds(), 100)
-	m.append(dur.Milliseconds()*40, 100)
+	tolerance := 0.01
+	require.InDelta(t, 2, model.tokenToCPUTimeMultiplier, tolerance)
+
+	// 4x.
+	tokenCPUTime.append(dur.Nanoseconds()/2, 100)
+	actualCPUTime.append(dur.Milliseconds()*2, 100)
 	for i := 0; i < 100; i++ {
 		testTime.Advance(time.Second)
 		model.fit()
-		if i == 99 {
-			fmt.Printf("%f\n", model.tokenToCPUTimeMultiplier)
-		}
 	}
+	require.InDelta(t, 4, model.tokenToCPUTimeMultiplier, tolerance)
 
-	// Cap 1x
-	ttt.append(dur.Nanoseconds()*2, 100)
-	m.append(dur.Milliseconds(), 100)
+	// 1x.
+	tokenCPUTime.append(dur.Nanoseconds()*2, 100)
+	actualCPUTime.append(dur.Milliseconds()*2, 100)
 	for i := 0; i < 100; i++ {
 		testTime.Advance(time.Second)
 		model.fit()
-		if i == 99 {
-			fmt.Printf("%f\n", model.tokenToCPUTimeMultiplier)
-		}
 	}
+	require.InDelta(t, 1, model.tokenToCPUTimeMultiplier, tolerance)
 
-	// Low CPU mode
-	ttt = &tt{}
-	model.granter = ttt
-	m = &mm{
-		capacity: 10,
+	// Cap at 20x.
+	tokenCPUTime.append(dur.Nanoseconds(), 100)
+	actualCPUTime.append(dur.Milliseconds()*40, 100)
+	for i := 0; i < 100; i++ {
+		testTime.Advance(time.Second)
+		model.fit()
 	}
-	model.cpuMetricProvider = m
-
-	// Leave 2x as is
-	ttt.append(dur.Nanoseconds(), 100)
-	m.append(dur.Milliseconds()*2, 100)
-	ttt.append(dur.Nanoseconds()/5, 100)
-	m.append(dur.Milliseconds()/5, 100)
-	for i := 0; i < 200; i++ {
+	require.InDelta(t, 20, model.tokenToCPUTimeMultiplier, tolerance)
+
+	// Cap at 1x.
+	tokenCPUTime.append(dur.Nanoseconds()*2, 100)
+	actualCPUTime.append(dur.Milliseconds(), 100)
+	for i := 0; i < 100; i++ {
 		testTime.Advance(time.Second)
 		model.fit()
-		if i == 199 {
-			fmt.Printf("%f\n", model.tokenToCPUTimeMultiplier)
-		}
 	}
+	require.InDelta(t, 1, model.tokenToCPUTimeMultiplier, tolerance)
 
-	// Cut 10x, too high
-	ttt.append(dur.Nanoseconds(), 100)
-	m.append(dur.Milliseconds()*100, 100)
-	ttt.append(dur.Nanoseconds()/5, 100)
-	m.append(dur.Milliseconds()/5, 100)
-	for i := 0; i < 200; i++ {
+	// 2x.
+	tokenCPUTime.append(dur.Nanoseconds(), 100)
+	actualCPUTime.append(dur.Milliseconds()*2, 100)
+	for i := 0; i < 100; i++ {
+		testTime.Advance(time.Second)
+		model.fit()
+	}
+	require.InDelta(t, 2, model.tokenToCPUTimeMultiplier, tolerance)
+
+	// Leave 2x as is, even though low CPU mode, since multiplier is already low.
+	tokenCPUTime.append(dur.Nanoseconds()/5, 100)
+	actualCPUTime.append(dur.Milliseconds()/5, 100)
+	for i := 0; i < 100; i++ {
+		testTime.Advance(time.Second)
+		model.fit()
+	}
+	require.InDelta(t, 2, model.tokenToCPUTimeMultiplier, tolerance)
+
+	// 20x.
+	tokenCPUTime.append(dur.Nanoseconds(), 100)
+	actualCPUTime.append(dur.Milliseconds()*100, 100)
+	for i := 0; i < 100; i++ {
+		testTime.Advance(time.Second)
+		model.fit()
+	}
+	require.InDelta(t, 20, model.tokenToCPUTimeMultiplier, tolerance)
+
+	// Reduce to 3.6x, since low CPU mode, and multiplier is high.
+	tokenCPUTime.append(dur.Nanoseconds()/5, 100)
+	actualCPUTime.append(dur.Milliseconds()/5, 100)
+	for i := 0; i < 100; i++ {
 		testTime.Advance(time.Second)
 		model.fit()
-		if i == 199 {
-			fmt.Printf("%f\n", model.tokenToCPUTimeMultiplier)
-		}
 	}
+	require.InDelta(t, 3.6, model.tokenToCPUTimeMultiplier, tolerance)
+
+	rates := model.getRefillRates()
+	// Previously hard-coded to be 0.
+	// 95% -> 10 vCPUs * .95 * 1s = 9.5s, 9.5s / 3.6 ~= 2.63888889s
+	require.Equal(t, int64(2638888888), rates[systemTenant][canBurst])
+	// 90% -> 10 vCPUs * .9 * 1s = 9s, 9s / 3.6 ~= 2.5s
+	require.Equal(t, int64(2500000000), rates[systemTenant][noBurst])
+	// 85% -> 10 vCPUs * .85 * 1s = 8.5s, 8.5s / 3.6 ~= 2.36111111s
+	require.Equal(t, int64(2361111111), rates[appTenant][canBurst])
+	// 80% -> 10 vCPUs * .8 * 1s = 8s, 8s / 3.6 ~= 2.22222222s
+	require.Equal(t, int64(2222222222), rates[appTenant][noBurst])
 }
 
-type tt struct {
+type testTokenUsageTracker struct {
 	i          int
 	tokensUsed []int64
 }
 
-func (t *tt) append(tokens int64, count int) {
-	// TODO(): Modernize.
+func (t *testTokenUsageTracker) append(tokens int64, count int) {
 	for i := 0; i < count; i++ {
 		t.tokensUsed = append(t.tokensUsed, tokens)
 	}
 }
 
-func (t *tt) getTokensUsedInInterval() int64 {
+func (t *testTokenUsageTracker) getTokensUsedInInterval() int64 {
 	ret := t.tokensUsed[t.i]
 	t.i++
 	return ret
 }
 
-type mm struct {
+type testCPUMetricProvider struct {
 	i        int
 	cum      int64
 	millis   []int64
 	capacity float64
 }
 
-func (m *mm) GetCPUInfo() (int64, float64) {
+func (m *testCPUMetricProvider) GetCPUInfo() (int64, float64) {
 	cycle := m.millis[m.i]
 	m.i++
 	m.cum += cycle
 	return m.cum, m.capacity
 }
 
-func (t *mm) append(millis int64, count int) {
+func (t *testCPUMetricProvider) append(millis int64, count int) {
 	for i := 0; i < count; i++ {
 		t.millis = append(t.millis, millis)
 	}
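The test expectations above imply an exponentially-weighted update with a 1x floor and a 20x cap on the multiplier. The following is a minimal sketch of that shape, not the production fit(); alpha and the convergence horizon are assumptions chosen so that 100 iterations land within the test's 0.01 tolerance.

package main

import "fmt"

const (
	alpha         = 0.05 // assumed smoothing factor
	minMultiplier = 1.0  // the test expects a 1x floor
	maxMultiplier = 20.0 // the test expects a 20x cap
)

// fitOnce blends the observed CPU-time-per-token ratio into the current
// multiplier, clamping the observation to [1x, 20x].
func fitOnce(current, cpuTimeUsed, tokensUsed float64) float64 {
	observed := cpuTimeUsed / tokensUsed
	if observed < minMultiplier {
		observed = minMultiplier
	}
	if observed > maxMultiplier {
		observed = maxMultiplier
	}
	return alpha*observed + (1-alpha)*current
}

func main() {
	m := 1.0
	for i := 0; i < 100; i++ {
		m = fitOnce(m, 10e9, 5e9) // observed ratio is 2x
	}
	fmt.Printf("%.3f\n", m) // 1.994: within 0.01 of 2, like the test's InDelta
}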

pkg/util/admission/cpu_time_token_granter.go

Lines changed: 0 additions & 1 deletion
@@ -253,7 +253,6 @@ func (stg *cpuTimeTokenGranter) tryGrantLocked() bool {
 
 // getTokensUsedInInterval returns the net number of tokens deducted from the
 // buckets, since the last call to getTokensUsedInInterval.
-// TODO(): Test.
 func (stg *cpuTimeTokenGranter) getTokensUsedInInterval() int64 {
	stg.mu.Lock()
	defer stg.mu.Unlock()
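A standalone sketch of the net-usage bookkeeping described in the comment above (the type and fields here are hypothetical; the production granter does this under stg.mu):

package main

import "fmt"

// usageTracker accumulates net token deductions; reading resets the
// counter, so each call reports one interval's worth of usage.
type usageTracker struct {
	used int64
}

func (u *usageTracker) deduct(n int64) { u.used += n }
func (u *usageTracker) refund(n int64) { u.used -= n }

func (u *usageTracker) getTokensUsedInInterval() int64 {
	ret := u.used
	u.used = 0
	return ret
}

func main() {
	var u usageTracker
	u.deduct(100)
	u.refund(30)
	fmt.Println(u.getTokensUsedInInterval()) // 70 (net usage this interval)
	fmt.Println(u.getTokensUsedInInterval()) // 0 (counter was reset)
}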

pkg/util/admission/cpu_time_token_granter_test.go

Lines changed: 5 additions & 0 deletions
@@ -167,6 +167,11 @@ func TestCPUTimeTokenGranter(t *testing.T) {
 			fmt.Fprintf(&buf, "refill(%v %v)\n", delta, bucketCapacity)
 			return flushAndReset(false /* init */)
 
+		case "get-tokens-used":
+			used := granter.getTokensUsedInInterval()
+			fmt.Fprintf(&buf, "get-tokens-used-in-interval() returned %d\n", used)
+			return flushAndReset(false /* init */)
+
 		// For cpuTimeTokenChildGranter, this is a NOP. Still, it will be
 		// called in production. So best to test it doesn't panic, or similar.
 		case "continue-grant-chain":
