Commit b3480df

admission: compute cpu time token refill rates
This commit introduces a linear model that computes CPU time token refill
rates. cpuTimeTokenFiller uses this model to determine how many tokens to add
per second to the buckets in cpuTimeTokenGranter.

Fixes: #154471

Release note: None
1 parent 02c7429 commit b3480df
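For intuition, here is a minimal sketch of the refill-rate arithmetic the new
model performs, per the formula documented in the diff below. The standalone
refillRate helper, the use of math.Round, and the concrete numbers are
illustrative assumptions for this example, not the committed API:

package main

import (
	"fmt"
	"math"
	"time"
)

// refillRate sketches the per-interval (1s) computation: one token
// corresponds to one nanosecond of CPU time, and the rate is scaled down by
// the linear correction term the model fits from observed usage. Sketch
// only; the committed code computes this inside updateRefillRates.
func refillRate(vCPUs, targetUtil, correction float64) time.Duration {
	return time.Duration(math.Round(vCPUs * float64(time.Second) * targetUtil / correction))
}

func main() {
	// 8 vCPU machine, admission.cpu_time_tokens.target_util = 0.8,
	// correction term 1 (tokens map 1:1 to CPU time):
	fmt.Println(refillRate(8, 0.80, 1)) // 6.4s of CPU time tokens per second
	// If the model learns that ~2ns of process CPU time are burned per
	// token (e.g. due to compactions), the refill rate is halved:
	fmt.Println(refillRate(8, 0.80, 2)) // 3.2s
	// A higher priority bucket with a 5% higher target utilization:
	fmt.Println(refillRate(8, 0.85, 1)) // 6.8s
}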

File tree

7 files changed (+536, -47 lines)


pkg/util/admission/cpu_time_token_filler.go

Lines changed: 232 additions & 25 deletions
@@ -8,18 +8,25 @@ package admission
 import (
 	"time"
 
+	"github.com/cockroachdb/cockroach/pkg/settings"
+	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/errors"
 )
 
+var KVCPUTimeUtilGoal = settings.RegisterFloatSetting(
+	settings.SystemOnly,
+	"admission.cpu_time_tokens.target_util",
+	"the target CPU utilization for the KV CPU time token system", 0.8)
+
 // timePerTick is how frequently cpuTimeTokenFiller ticks its time.Ticker & adds
 // tokens to the buckets. Must be < 1s. Must divide 1s evenly.
 const timePerTick = 1 * time.Millisecond
 
 // cpuTimeTokenFiller starts a goroutine which periodically calls
 // cpuTimeTokenAllocator to add tokens to a cpuTimeTokenGranter. For example, on
 // an 8 vCPU machine, we may want to allow burstable tier-0 work to use 6 seconds
-// of CPU time per second. Then cpuTimeTokenAllocator.rates[tier0][canBurst] would
+// of CPU time per second. Then the refill rates for tier0 burstable work would
 // equal 6 seconds per second, and cpuTimeTokenFiller would add 6 seconds of token
 // every second, but smoothly -- 1ms at a time. See cpuTimeTokenGranter for details
 // on the multi-dimensional token buckets owned by cpuTimeTokenGranter; the TLDR is
@@ -55,13 +62,16 @@ type cpuTimeTokenFiller struct {
 	tickCh *chan struct{}
 }
 
-// tokenAllocator abstracts cpuTimeTokenAllocator for testing.
-type tokenAllocator interface {
-	allocateTokens(remainingTicksInInInterval int64)
-	resetInterval()
+func newCPUTimeTokenFiller() *cpuTimeTokenFiller {
+	return &cpuTimeTokenFiller{}
 }
 
 func (f *cpuTimeTokenFiller) start() {
+	// The token buckets start full.
+	f.allocator.init()
+	f.allocator.allocateTokens(1)
+	f.allocator.resetInterval(true /* skipFittingLinearModel */)
+
 	ticker := f.timeSource.NewTicker(timePerTick)
 	intervalStart := f.timeSource.Now()
 	// Every 1s a new interval starts. every timePerTick time token allocation
@@ -85,7 +95,7 @@ func (f *cpuTimeTokenFiller) start() {
 				f.allocator.allocateTokens(1)
 			}
 			intervalStart = t
-			f.allocator.resetInterval()
+			f.allocator.resetInterval(false /* skipFittingLinearModel */)
 			remainingTicks = int64(time.Second / timePerTick)
 		} else {
 			remainingSinceIntervalStart := time.Second - elapsedSinceIntervalStart
@@ -111,31 +121,36 @@ func (f *cpuTimeTokenFiller) start() {
 	}()
 }
 
+// tokenAllocator abstracts cpuTimeTokenAllocator for testing.
+type tokenAllocator interface {
+	init()
+	allocateTokens(remainingTicksInInterval int64)
+	resetInterval(skipFittingLinearModel bool)
+}
+
 // cpuTimeTokenAllocator allocates tokens to a cpuTimeTokenGranter. See the comment
 // above cpuTimeTokenFiller for a high level picture. The responsibility of
-// cpuTimeTokenAllocator is to gradually allocate rates tokens every interval,
-// while respecting bucketCapacity. We have split up the ticking & token allocation
-// logic, in order to improve clarity & testability.
+// cpuTimeTokenAllocator is to gradually allocate tokens every interval,
+// while respecting bucketCapacity. The computation of the rate of tokens to add
+// every interval is left to cpuTimeTokenLinearModel.
 type cpuTimeTokenAllocator struct {
 	granter *cpuTimeTokenGranter
+	model   cpuTimeModel
 
-	// Mutable fields. No mutex, since only a single goroutine will call the
-	// cpuTimeTokenAllocator.
-
-	// rates stores the number of token added to each bucket every interval.
-	rates [numResourceTiers][numBurstQualifications]int64
-	// bucketCapacity stores the maximum number of tokens that can be in each bucket.
-	// That is, if a bucket is already at capacity, no more tokens will be added.
-	bucketCapacity [numResourceTiers][numBurstQualifications]int64
 	// allocated stores the number of tokens added to each bucket in the current
-	// interval.
+	// interval. No mutex, since only a single goroutine will call the
+	// cpuTimeTokenAllocator.
 	allocated [numResourceTiers][numBurstQualifications]int64
 }
 
 var _ tokenAllocator = &cpuTimeTokenAllocator{}
 
+func (a *cpuTimeTokenAllocator) init() {
+	a.model.init()
+}
+
 // allocateTokens allocates tokens to a cpuTimeTokenGranter. allocateTokens
-// adds rates tokens every interval, while respecting bucketCapacity.
+// adds the desired number of tokens every interval, while respecting bucketCapacity.
 // allocateTokens adds tokens evenly among the expected remaining ticks in
 // the interval.
 // INVARIANT: remainingTicks >= 1.
@@ -156,24 +171,216 @@ func (a *cpuTimeTokenAllocator) allocateTokens(expectedRemainingTicksInInterval
 		return toAllocate
 	}
 
+	rates := a.model.getRefillRates()
 	var delta [numResourceTiers][numBurstQualifications]int64
-	for wc := range a.rates {
-		for kind := range a.rates[wc] {
+	for wc := range rates {
+		for kind := range rates[wc] {
 			toAllocateTokens := allocateFunc(
-				a.rates[wc][kind], a.allocated[wc][kind], expectedRemainingTicksInInterval)
+				rates[wc][kind], a.allocated[wc][kind], expectedRemainingTicksInInterval)
 			a.allocated[wc][kind] += toAllocateTokens
 			delta[wc][kind] = toAllocateTokens
 		}
 	}
-	a.granter.refill(delta, a.bucketCapacity)
+	a.granter.refill(delta, rates)
 }
 
 // resetInterval is called to signal the beginning of a new interval. allocateTokens
-// adds rates tokens every interval.
-func (a *cpuTimeTokenAllocator) resetInterval() {
+// adds the desired number of tokens every interval.
+func (a *cpuTimeTokenAllocator) resetInterval(skipFittingLinearModel bool) {
+	if !skipFittingLinearModel {
+		// delta is the difference in tokens to add per interval (1s) from the
+		// previous call to fit to this one. We add it immediately to the bucket.
+		// The model itself handles filtering.
+		delta := a.model.fit()
+		a.granter.refill(delta, a.model.getRefillRates())
+	}
 	for wc := range a.allocated {
 		for kind := range a.allocated[wc] {
 			a.allocated[wc][kind] = 0
 		}
 	}
 }
+
+// cpuTimeModel abstracts cpuTimeTokenLinearModel for testing.
+type cpuTimeModel interface {
+	init()
+	fit() [numResourceTiers][numBurstQualifications]int64
+	getRefillRates() [numResourceTiers][numBurstQualifications]int64
+}
+
+// cpuTimeTokenLinearModel computes the number of CPU time tokens to add
+// to each bucket in the cpuTimeTokenGranter, per interval (per 1s). As is
+// discussed in the cpuTimeTokenGranter docs, the buckets are arranged in a
+// priority hierarchy; some buckets always have more tokens added per second
+// than others.
+//
+// Consider the refill rate of the lowest priority bucket first. In that case:
+// refillRate = 1s * vCPU count * targetUtilization / linearCorrectionTerm
+//
+// This formula is intuitive if linearCorrectionTerm equals 1. If CPU time
+// tokens corresponded exactly to actual CPU time, e.g. one token corresponds
+// to one nanosecond of CPU time, it would imply the token bucket will limit
+// the CPU used by requests subject to it to targetUtilization. Note that
+// targetUtilization is controllable via the admission.cpu_time_tokens.target_util
+// cluster setting.
+//
+// Example:
+// 8 vCPU machine. admission.cpu_time_tokens.target_util = 0.8 (80%)
+// RefillRate = 8 * 1s * .8 = 6.4 seconds of CPU time per second
+//
+// linearCorrectionTerm is a correction term derived from a linear model (hence
+// the name of the struct). There is work happening outside the kvserver BatchRequest
+// evaluation path, such as compaction. AC continuously fits a linear model:
+// total-cpu-time = linearCorrectionTerm * reported-cpu-time, where
+// linearCorrectionTerm is forced to be in the interval [1, 20].
+//
+// The higher priority buckets have higher target utilizations than the lowest
+// priority one. The delta between the target utilizations is fixed, e.g.
+// burstable tenant work has a 5% higher target utilization than non-burstable.
+type cpuTimeTokenLinearModel struct {
+	granter           tokenUsageTracker
+	settings          *cluster.Settings
+	cpuMetricProvider CPUMetricProvider
+	timeSource        timeutil.TimeSource
+
+	// The time that fit was called last.
+	lastFitTime time.Time
+	// The cumulative user/sys CPU time used since process start in
+	// milliseconds.
+	totalCPUTimeMillis int64
+	// The CPU capacity measured in vCPUs.
+	cpuCapacity float64
+	// The linear correction term; see the docs above cpuTimeTokenLinearModel.
+	tokenToCPUTimeMultiplier float64
+
+	// The number of CPU time tokens to add to each bucket per interval (1s).
+	rates [numResourceTiers][numBurstQualifications]int64
+}
+
+type tokenUsageTracker interface {
+	getTokensUsedInInterval() int64
+}
+
+type CPUMetricProvider interface {
+	// GetCPUInfo returns the cumulative user/sys CPU time used since process
+	// start in milliseconds, and the cpuCapacity measured in vCPUs.
+	GetCPUInfo() (totalCPUTimeMillis int64, cpuCapacity float64)
+}
+
+// init sets tokenToCPUTimeMultiplier to 1 & computes refill rates.
+func (m *cpuTimeTokenLinearModel) init() {
+	_, cpuCapacity := m.cpuMetricProvider.GetCPUInfo()
+	m.cpuCapacity = cpuCapacity
+	m.tokenToCPUTimeMultiplier = 1
+	_ = m.updateRefillRates()
+}
+
+// fit adjusts tokenToCPUTimeMultiplier based on CPU usage & token usage. fit
+// computes refill rates from tokenToCPUTimeMultiplier & the
+// admission.cpu_time_tokens.target_util cluster setting. fit returns the delta
+// refill rates. That is, fit returns the difference in tokens to add per
+// interval (1s) from the previous call to fit to this one.
+func (m *cpuTimeTokenLinearModel) fit() [numResourceTiers][numBurstQualifications]int64 {
+	now := m.timeSource.Now()
+	elapsedSinceLastFit := now.Sub(m.lastFitTime)
+	m.lastFitTime = now
+
+	// Get used CPU tokens.
+	tokensUsed := m.granter.getTokensUsedInInterval()
+	// At admission time, an estimate of CPU time is deducted. After
+	// the request is done processing, a correction based on a measurement
+	// from grunning is deducted. Thus it is theoretically possible for net
+	// tokens used to be <= 0. In this case, we set tokensUsed to 1, so that
+	// the computation of tokenToCPUTimeMultiplier is well-behaved.
+	if tokensUsed <= 0 {
+		tokensUsed = 1
+	}
+
+	// Get used CPU time.
+	totalCPUTimeMillis, _ := m.cpuMetricProvider.GetCPUInfo()
+	intCPUTimeMillis := totalCPUTimeMillis - m.totalCPUTimeMillis
+	// totalCPUTimeMillis is not necessarily monotonic in all environments,
+	// e.g. in case of VM live migration on a public cloud provider.
+	if intCPUTimeMillis < 0 {
+		intCPUTimeMillis = 0
+	}
+	m.totalCPUTimeMillis = totalCPUTimeMillis
+	intCPUTimeNanos := intCPUTimeMillis * 1e6
+
+	// Update multiplier.
+	const lowCPUUtilFrac = 0.25
+	isLowCPUUtil := intCPUTimeNanos < int64(float64(elapsedSinceLastFit)*m.cpuCapacity*lowCPUUtilFrac)
+	if isLowCPUUtil {
+		// Ensure that low CPU utilization is not due to a flawed tokenToCPUTimeMultiplier
+		// by multiplicatively lowering it until we are below the upperBound. If we are
+		// already below upperBound, we make no adjustment.
+		const upperBound = (1 / lowCPUUtilFrac) * 0.9 // 3.6
+		if m.tokenToCPUTimeMultiplier > upperBound {
+			m.tokenToCPUTimeMultiplier /= 1.5
+			if m.tokenToCPUTimeMultiplier < upperBound {
+				m.tokenToCPUTimeMultiplier = upperBound
+			}
+		}
+	} else {
+		tokenToCPUTimeMultiplier :=
+			float64(intCPUTimeNanos) / float64(tokensUsed)
+		// Multiplier is forced into the interval [1, 20].
+		if tokenToCPUTimeMultiplier > 20 {
+			// Cap the multiplier.
+			tokenToCPUTimeMultiplier = 20
+		} else if tokenToCPUTimeMultiplier < 1 {
+			// Likely because work is queued up in the goroutine scheduler.
+			tokenToCPUTimeMultiplier = 1
+		}
+		// Decrease faster than increase. Giving out too many tokens can
+		// lead to goroutine scheduling latency.
+		alpha := 0.5
+		if tokenToCPUTimeMultiplier < m.tokenToCPUTimeMultiplier {
+			alpha = 0.8
+		}
+
+		// Exponentially filter changes to the multiplier. 1s of data is noisy,
+		// so filtering is necessary.
+		m.tokenToCPUTimeMultiplier =
+			alpha*tokenToCPUTimeMultiplier + (1-alpha)*m.tokenToCPUTimeMultiplier
+	}
+
+	return m.updateRefillRates()
+}
+
+// updateRefillRates computes refill rates from tokenToCPUTimeMultiplier &
+// the admission.cpu_time_tokens.target_util cluster setting. updateRefillRates
+// returns the delta refill rates. That is, updateRefillRates returns the
+// difference in tokens to add per interval (1s) from the previous call to fit
+// to this one.
+func (m *cpuTimeTokenLinearModel) updateRefillRates() [numResourceTiers][numBurstQualifications]int64 {
+	// Compute goals from cluster setting. Algorithmically, it is okay if some of
+	// the below goalUtils are greater than 1. This would mean greater risk of
+	// goroutine scheduling latency, but there is no immediate problem -- the
+	// greater some goalUtil is, the more CPU time tokens will be in the
+	// corresponding bucket.
+	var goalUtils [numResourceTiers][numBurstQualifications]float64
+	util := KVCPUTimeUtilGoal.Get(&m.settings.SV)
+	var iter float64
+	for tier := int(numResourceTiers - 1); tier >= 0; tier-- {
+		for qual := int(numBurstQualifications - 1); qual >= 0; qual-- {
+			goalUtils[tier][qual] = util + 0.05*iter
+			iter++
+		}
+	}
+
+	// Update refill rates. Return change in rates via delta.
+	var delta [numResourceTiers][numBurstQualifications]int64
+	for tier := range goalUtils {
+		for qual := range goalUtils[tier] {
+			newRate :=
+				int64(m.cpuCapacity * float64(time.Second) * goalUtils[tier][qual] / m.tokenToCPUTimeMultiplier)
+			delta[tier][qual] = newRate - m.rates[tier][qual]
+			m.rates[tier][qual] = newRate
+		}
+	}
+	return delta
+}
+
+// getRefillRates returns the number of CPU time tokens to add to each bucket
+// per interval (1s).
+func (m *cpuTimeTokenLinearModel) getRefillRates() [numResourceTiers][numBurstQualifications]int64 {
+	return m.rates
+}

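For intuition on the fit step in the diff above, here is a small,
self-contained sketch of the asymmetric exponential filter applied to the
multiplier. The ewmaFit helper and its inputs are illustrative assumptions,
not the committed API:

package main

import "fmt"

// ewmaFit is a sketch of the filtering in fit() (not the committed API): the
// raw sample (measured CPU time / tokens used) is clamped to [1, 20], then
// blended into the previous multiplier. A larger alpha is used when the
// sample is below the current multiplier, so the multiplier decreases faster
// than it increases, mirroring the "decrease faster than increase" comment
// in the diff.
func ewmaFit(prev, cpuTimeNanos, tokensUsed float64) float64 {
	sample := cpuTimeNanos / tokensUsed
	if sample > 20 {
		sample = 20
	} else if sample < 1 {
		sample = 1
	}
	alpha := 0.5
	if sample < prev {
		alpha = 0.8
	}
	return alpha*sample + (1-alpha)*prev
}

func main() {
	m := 1.0
	// Twice as much CPU time was measured as tokens were deducted: the
	// sample is 2.0, and the multiplier moves halfway toward it.
	m = ewmaFit(m, 2e9, 1e9)
	fmt.Printf("%.2f\n", m) // 1.50
	// Background work subsides (sample 1.0): the decrease uses alpha = 0.8,
	// so the multiplier drops most of the way back in one interval.
	m = ewmaFit(m, 1e9, 1e9)
	fmt.Printf("%.2f\n", m) // 1.10
}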