@@ -8,18 +8,25 @@ package admission
88import (
99 "time"
1010
11+ "github.com/cockroachdb/cockroach/pkg/settings"
12+ "github.com/cockroachdb/cockroach/pkg/settings/cluster"
1113 "github.com/cockroachdb/cockroach/pkg/util/timeutil"
1214 "github.com/cockroachdb/errors"
1315)
1416
// KVCPUTimeUtilGoal is the admission.cpu_time_tokens.target_util cluster setting
// (default 0.8): the target CPU utilization for the KV CPU time token system.
// cpuTimeTokenLinearModel.updateRefillRates reads it as the base goal utilization
// from which the per-bucket goal utilizations are derived.
17+ var KVCPUTimeUtilGoal = settings .RegisterFloatSetting (
18+ settings .SystemOnly ,
19+ "admission.cpu_time_tokens.target_util" ,
20+ "the target CPU utilization for the KV CPU time token system" , 0.8 )
21+
1522// timePerTick is how frequently cpuTimeTokenFiller ticks its time.Ticker & adds
1623// tokens to the buckets. Must be < 1s. Must divide 1s evenly, since the number of
// ticks in a 1s interval is computed as time.Second / timePerTick.
1724const timePerTick = 1 * time .Millisecond
1825
1926// cpuTimeTokenFiller starts a goroutine which periodically calls
2027// cpuTimeTokenAllocator to add tokens to a cpuTimeTokenGranter. For example, on
2128// an 8 vCPU machine, we may want to allow burstable tier-0 work to use 6 seconds
22- // of CPU time per second. Then cpuTimeTokenAllocator. rates[ tier0][canBurst] would
29+ // of CPU time per second. Then the refill rates for tier0 burstable work would
2330// equal 6 seconds per second, and cpuTimeTokenFiller would add 6 seconds of token
2431// every second, but smoothly -- 1ms at a time. See cpuTimeTokenGranter for details
2532// on the multi-dimensional token buckets owned by cpuTimeTokenGranter; the TLDR is
@@ -55,13 +62,12 @@ type cpuTimeTokenFiller struct {
5562 tickCh * chan struct {}
5663}
5764
58- // tokenAllocator abstracts cpuTimeTokenAllocator for testing.
59- type tokenAllocator interface {
60- allocateTokens (remainingTicksInInInterval int64 )
61- resetInterval ()
62- }
63-
6465func (f * cpuTimeTokenFiller ) start () {
66+ // The token buckets start full.
67+ f .allocator .init ()
68+ f .allocator .allocateTokens (1 )
69+ f .allocator .resetInterval (true /* skipFittingLinearModel */ )
70+
6571 ticker := f .timeSource .NewTicker (timePerTick )
6672 intervalStart := f .timeSource .Now ()
6773 // Every 1s a new interval starts. every timePerTick time token allocation
@@ -85,7 +91,7 @@ func (f *cpuTimeTokenFiller) start() {
8591 f .allocator .allocateTokens (1 )
8692 }
8793 intervalStart = t
88- f .allocator .resetInterval ()
94+ f .allocator .resetInterval (false /* skipFittingLinearModel */ )
8995 remainingTicks = int64 (time .Second / timePerTick )
9096 } else {
9197 remainingSinceIntervalStart := time .Second - elapsedSinceIntervalStart
@@ -111,31 +117,36 @@ func (f *cpuTimeTokenFiller) start() {
111117 }()
112118}
113119
120+ // tokenAllocator abstracts cpuTimeTokenAllocator for testing.
// cpuTimeTokenFiller.start calls init once up front and then drives
// allocateTokens and resetInterval.
121+ type tokenAllocator interface {
// init performs one-time setup before any tokens are allocated.
122+ init ()
// allocateTokens adds tokens to the buckets, spreading what is left to add in
// the current interval evenly over the expected remaining ticks.
123+ allocateTokens (remainingTicksInInInterval int64 )
// resetInterval signals the beginning of a new interval. start passes
// skipFittingLinearModel=true for the initial fill and false when a new 1s
// interval begins.
124+ resetInterval (skipFittingLinearModel bool )
125+ }
126+
114127// cpuTimeTokenAllocator allocates tokens to a cpuTimeTokenGranter. See the comment
115128// above cpuTimeTokenFiller for a high level picture. The responsibility of
116- // cpuTimeTokenAllocator is to gradually allocate rates tokens every interval,
117- // while respecting bucketCapacity. We have split up the ticking & token allocation
118- // logic, in order to improve clarity & testability .
129+ // cpuTimeTokenAllocator is to gradually allocate tokens every interval,
130+ // while respecting bucketCapacity. The computation of the rate of tokens to add
131+ // every interval is left to cpuTimeTokenLinearModel.
119132type cpuTimeTokenAllocator struct {
120133 granter * cpuTimeTokenGranter
134+ model cpuTimeModel
121135
122- // Mutable fields. No mutex, since only a single goroutine will call the
123- // cpuTimeTokenAllocator.
124-
125- // rates stores the number of token added to each bucket every interval.
126- rates [numResourceTiers ][numBurstQualifications ]int64
127- // bucketCapacity stores the maximum number of tokens that can be in each bucket.
128- // That is, if a bucket is already at capacity, no more tokens will be added.
129- bucketCapacity [numResourceTiers ][numBurstQualifications ]int64
130136 // allocated stores the number of tokens added to each bucket in the current
131- // interval.
137+ // interval. No mutex, since only a single goroutine will call
138+ // the cpuTimeTokenAllocator.
132139 allocated [numResourceTiers ][numBurstQualifications ]int64
133140}
134141
135142var _ tokenAllocator = & cpuTimeTokenAllocator {}
136143
// init initializes the allocator by delegating to the model's init.
144+ func (a * cpuTimeTokenAllocator ) init () {
145+ a .model .init ()
146+ }
147+
137148// allocateTokens allocates tokens to a cpuTimeTokenGranter. allocateTokens
138- // adds rates tokens every interval, while respecting bucketCapacity.
149+ // adds the desired number of tokens every interval, while respecting bucketCapacity.
139150// allocateTokens adds tokens evenly among the expected remaining ticks in
140151// the interval.
141152// INVARIANT: remainingTicks >= 1.
@@ -156,24 +167,217 @@ func (a *cpuTimeTokenAllocator) allocateTokens(expectedRemainingTicksInInterval
156167 return toAllocate
157168 }
158169
170+ rates := a .model .getRefillRates ()
159171 var delta [numResourceTiers ][numBurstQualifications ]int64
160- for wc := range a . rates {
161- for kind := range a . rates [wc ] {
172+ for wc := range rates {
173+ for kind := range rates [wc ] {
162174 toAllocateTokens := allocateFunc (
163- a. rates [wc ][kind ], a.allocated [wc ][kind ], expectedRemainingTicksInInterval )
175+ rates [wc ][kind ], a.allocated [wc ][kind ], expectedRemainingTicksInInterval )
164176 a.allocated [wc ][kind ] += toAllocateTokens
165177 delta [wc ][kind ] = toAllocateTokens
166178 }
167179 }
168- a .granter .refill (delta , a . bucketCapacity )
180+ a .granter .refill (delta , rates )
169181}
170182
171183// resetInterval is called to signal the beginning of a new interval. allocateTokens
172- // adds rates tokens every interval.
173- func (a * cpuTimeTokenAllocator ) resetInterval () {
184+ // adds the desired number of tokens every interval.
// Unless skipFittingLinearModel is true, resetInterval first re-fits the model and
// refills the granter with the resulting change in per-interval refill rates. In
// all cases it then zeroes the per-bucket allocated counters for the new interval.
185+ func (a * cpuTimeTokenAllocator ) resetInterval (skipFittingLinearModel bool ) {
186+ if ! skipFittingLinearModel {
187+ // delta is the difference in tokens to add per interval (1s) from the previous
188+ // call to fit to this one. We add it immediately to the bucket. The model itself
189+ // handles filtering.
190+ delta := a .model .fit ()
191+ a .granter .refill (delta , a .model .getRefillRates ())
192+ }
174193 for wc := range a .allocated {
175194 for kind := range a .allocated [wc ] {
176195 a.allocated [wc ][kind ] = 0
177196 }
178197 }
179198}
199+
200+ // cpuTimeModel abstracts cpuTimeTokenLinearModel for testing.
201+ type cpuTimeModel interface {
// init performs one-time setup of the model.
202+ init ()
// fit updates the model and returns, per bucket, the change in the number of
// tokens to add per interval relative to the previously computed rates.
203+ fit () [numResourceTiers ][numBurstQualifications ]int64
// getRefillRates returns, per bucket, the number of tokens to add per interval.
204+ getRefillRates () [numResourceTiers ][numBurstQualifications ]int64
205+ }
206+
207+ // cpuTimeTokenLinearModel computes the number of CPU time tokens to add
208+ // to each bucket in the cpuTimeTokenGranter, per interval (per 1s). As is
209+ // discussed in the cpuTimeTokenGranter docs, the buckets are arranged in a
210+ // priority hierarchy; some buckets always have more tokens added per second
211+ // than others.
212+ //
213+ // Consider the refill rate of the lowest priority bucket first. In that case:
214+ // refillRate = 1s * vCPU count * targetUtilization / linearCorrectionTerm
215+ //
216+ // This formula is intuitive if linearCorrectionTerm equals 1. If CPU time
217+ // tokens corresponded exactly to actual CPU time, e.g. one token corresponds
218+ // to one nanosecond of CPU time, it would imply the token bucket will limit
219+ // the CPU used by requests subject to it to targetUtilization. Note that
220+ // targetUtilization is controllable via the admission.cpu_time_tokens.target_util
221+ // cluster setting.
222+ //
223+ // Example:
224+ // 8 vCPU machine. admission.cpu_time_tokens.target_util = 0.8 (80%)
225+ // RefillRate = 8 * 1s * .8 = 6.4 seconds of CPU time per second
226+ //
227+ // linearCorrectionTerm (the tokenToCPUTimeMultiplier field) is a correction term
228+ // derived from a linear model (hence the name of the struct). There is work happening
229+ // outside the kvserver BatchRequest evaluation path, such as compaction. AC continuously
230+ // fits a linear model: total-cpu-time = linearCorrectionTerm * reported-cpu-time, where
231+ // linearCorrectionTerm is forced to be in the interval [1, 20].
232+ //
233+ // The higher priority buckets have higher target utilizations than the lowest
234+ // priority one. The delta between the target utilizations is fixed, e.g.
235+ // burstable tenant work has a 5% higher target utilization than non-burstable.
236+ type cpuTimeTokenLinearModel struct {
237+ granter tokenUsageTracker
238+ settings * cluster.Settings
239+ cpuMetricProvider CPUMetricProvider
240+ timeSource timeutil.TimeSource
241+
242+ // The time that fit was called last (or that init was called, before the first fit).
243+ lastFitTime time.Time
244+ // The cumulative user/sys CPU time used since process start in
245+ // milliseconds, as observed by the most recent call to fit.
246+ totalCPUTimeMillis int64
247+ // The CPU capacity measured in vCPUs.
248+ cpuCapacity float64
249+ // The linear correction term, see the docs above cpuTimeTokenLinearModel.
250+ tokenToCPUTimeMultiplier float64
251+
252+ // The number of CPU time tokens to add to each bucket per interval (1s).
253+ rates [numResourceTiers ][numBurstQualifications ]int64
254+ }
255+
// tokenUsageTracker reports CPU time token usage to cpuTimeTokenLinearModel.
// fit calls getTokensUsedInInterval once per invocation and treats a
// non-positive result as 1.
256+ type tokenUsageTracker interface {
257+ getTokensUsedInInterval () int64
258+ }
259+
// CPUMetricProvider supplies the CPU usage and capacity measurements consumed
// by cpuTimeTokenLinearModel.
260+ type CPUMetricProvider interface {
261+ // GetCPUInfo returns the cumulative user/sys CPU time used since process
262+ // start in milliseconds, and the cpuCapacity measured in vCPUs.
263+ GetCPUInfo () (totalCPUTimeMillis int64 , cpuCapacity float64 )
264+ }
265+
266+ // init sets tokenToCPUTimeMultiplier to 1 & computes refill rates.
// It also records the current time as lastFitTime and reads cpuCapacity.
// NOTE(review): the totalCPUTimeMillis returned by GetCPUInfo is discarded here,
// so the first call to fit subtracts the zero value of m.totalCPUTimeMillis and
// attributes all CPU time since process start to the first interval -- confirm
// this is intended.
267+ func (m * cpuTimeTokenLinearModel ) init () {
268+ m .lastFitTime = m .timeSource .Now ()
269+ _ , cpuCapacity := m .cpuMetricProvider .GetCPUInfo ()
270+ m .cpuCapacity = cpuCapacity
271+ m .tokenToCPUTimeMultiplier = 1
// The returned delta is ignored; this call is made to populate m.rates.
272+ _ = m .updateRefillRates ()
273+ }
274+
275+ // fit adjusts tokenToCPUTimeMultiplier based on CPU usage & token usage. fit
276+ // computes refill rates from tokenToCPUTimeMultiplier & the admission.cpu_time_tokens.target_util
277+ // cluster setting. fit returns the delta refill rates. That is, fit returns the difference in tokens
278+ // to add per interval (1s) from the previous call to fit to this one.
279+ func (m * cpuTimeTokenLinearModel ) fit () [numResourceTiers ][numBurstQualifications ]int64 {
280+ now := m .timeSource .Now ()
281+ elapsedSinceLastFit := now .Sub (m .lastFitTime )
282+ m .lastFitTime = now
283+
284+ // Get used CPU tokens.
285+ tokensUsed := m .granter .getTokensUsedInInterval ()
286+ // At admission time, an estimate of CPU time is deducted. After
287+ // the request is done processing, a correction based on a measurement
288+ // from grunning is deducted. Thus it is theoretically possible for net
289+ // tokens used to be <=0. In this case, we set tokensUsed to 1, so that
290+ // the computation of tokenToCPUTimeMultiplier is well-behaved.
291+ if tokensUsed <= 0 {
292+ tokensUsed = 1
293+ }
294+
295+ // Get used CPU time.
// The cpuCapacity returned here is ignored; the value read in init is used.
296+ totalCPUTimeMillis , _ := m .cpuMetricProvider .GetCPUInfo ()
297+ intCPUTimeMillis := totalCPUTimeMillis - m .totalCPUTimeMillis
298+ // totalCPUTimeMillis is not necessarily monotonic in all environments,
299+ // e.g. in case of VM live migration on a public cloud provider.
300+ if intCPUTimeMillis < 0 {
301+ intCPUTimeMillis = 0
302+ }
303+ m .totalCPUTimeMillis = totalCPUTimeMillis
304+ intCPUTimeNanos := intCPUTimeMillis * 1e6
305+
306+ // Update multiplier.
307+ const lowCPUUtilFrac = 0.25
308+ isLowCPUUtil := intCPUTimeNanos < int64 (float64 (elapsedSinceLastFit )* m .cpuCapacity * lowCPUUtilFrac )
309+ if isLowCPUUtil {
310+ // Ensure that low CPU utilization is not due to a flawed tokenToCPUTimeMultiplier
311+ // by multiplicatively lowering it until we are at or below the upperBound. If we are
312+ // already at or below upperBound, we make no adjustment.
313+ const upperBound = (1 / lowCPUUtilFrac ) * 0.9 // 3.6
314+ if m .tokenToCPUTimeMultiplier > upperBound {
315+ m .tokenToCPUTimeMultiplier /= 1.5
316+ if m .tokenToCPUTimeMultiplier < upperBound {
317+ m .tokenToCPUTimeMultiplier = upperBound
318+ }
319+ }
320+ } else {
321+ tokenToCPUTimeMultiplier :=
322+ float64 (intCPUTimeNanos ) / float64 (tokensUsed )
323+ // Multiplier is forced into the interval [1, 20].
324+ if tokenToCPUTimeMultiplier > 20 {
325+ // Cap the multiplier.
326+ tokenToCPUTimeMultiplier = 20
327+ } else if tokenToCPUTimeMultiplier < 1 {
328+ // Likely because work is queued up in the goroutine scheduler.
329+ tokenToCPUTimeMultiplier = 1
330+ }
331+ // Decrease faster than increase. Giving out too many tokens can
332+ // lead to goroutine scheduling latency.
333+ alpha := 0.5
334+ if tokenToCPUTimeMultiplier < m .tokenToCPUTimeMultiplier {
335+ alpha = 0.8
336+ }
337+
338+ // Exponentially filter changes to the multiplier. 1s of data is noisy,
339+ // so filtering is necessary.
340+ m .tokenToCPUTimeMultiplier =
341+ alpha * tokenToCPUTimeMultiplier + (1 - alpha )* m .tokenToCPUTimeMultiplier
342+ }
343+
344+ return m .updateRefillRates ()
345+ }
346+
347+ // updateRefillRates computes refill rates from tokenToCPUTimeMultiplier &
348+ // the admission.cpu_time_tokens.target_util cluster setting. updateRefillRates
349+ // returns the delta refill rates. That is, updateRefillRates returns the difference
350+ // in tokens to add per interval (1s) from the previous call to updateRefillRates to this one.
351+ func (m * cpuTimeTokenLinearModel ) updateRefillRates () [numResourceTiers ][numBurstQualifications ]int64 {
352+ // Compute goals from cluster setting. Algorithmically, it is okay if some of
353+ // the below goalUtils are greater than 1. This would mean greater risk of
354+ // goroutine scheduling latency, but there is no immediate problem -- the
355+ // greater some goalUtil is, the more CPU time tokens will be in the corresponding
356+ // bucket.
357+ var goalUtils [numResourceTiers ][numBurstQualifications ]float64
358+ util := KVCPUTimeUtilGoal .Get (& m .settings .SV )
359+ var iter float64
// Walk from the last (tier, qual) index to the first: the last bucket gets
// exactly util, and each preceding bucket gets a goal 0.05 higher than the next.
360+ for tier := int (numResourceTiers - 1 ); tier >= 0 ; tier -- {
361+ for qual := int (numBurstQualifications - 1 ); qual >= 0 ; qual -- {
362+ goalUtils [tier ][qual ] = util + 0.05 * iter
363+ iter ++
364+ }
365+ }
366+
367+ // Update refill rates. Return change in rates via delta.
368+ var delta [numResourceTiers ][numBurstQualifications ]int64
369+ for tier := range goalUtils {
370+ for qual := range goalUtils [tier ] {
371+ newRate :=
372+ int64 (m .cpuCapacity * float64 (time .Second ) * goalUtils [tier ][qual ] / m .tokenToCPUTimeMultiplier )
373+ delta [tier ][qual ] = newRate - m.rates [tier ][qual ]
374+ m.rates [tier ][qual ] = newRate
375+ }
376+ }
377+ return delta
378+ }
379+
380+ // getRefillRates returns the number of CPU time tokens to add to each bucket per interval (1s).
// The rates array is returned by value, so the caller receives a copy.
381+ func (m * cpuTimeTokenLinearModel ) getRefillRates () [numResourceTiers ][numBurstQualifications ]int64 {
382+ return m .rates
383+ }
0 commit comments