diff --git a/pkg/util/admission/cpu_time_token_filler.go b/pkg/util/admission/cpu_time_token_filler.go
index be4993a50dba..fd7acc3624df 100644
--- a/pkg/util/admission/cpu_time_token_filler.go
+++ b/pkg/util/admission/cpu_time_token_filler.go
@@ -8,10 +8,17 @@ package admission
 import (
 	"time"
 
+	"github.com/cockroachdb/cockroach/pkg/settings"
+	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/errors"
 )
 
+var KVCPUTimeUtilGoal = settings.RegisterFloatSetting(
+	settings.SystemOnly,
+	"admission.cpu_time_tokens.target_util",
+	"the target CPU utilization for the KV CPU time token system", 0.8)
+
 // timePerTick is how frequently cpuTimeTokenFiller ticks its time.Ticker & adds
 // tokens to the buckets. Must be < 1s. Must divide 1s evenly.
 const timePerTick = 1 * time.Millisecond
@@ -19,7 +26,7 @@ const timePerTick = 1 * time.Millisecond
 // cpuTimeTokenFiller starts a goroutine which periodically calls
 // cpuTimeTokenAllocator to add tokens to a cpuTimeTokenGranter. For example, on
 // an 8 vCPU machine, we may want to allow burstable tier-0 work to use 6 seconds
-// of CPU time per second. Then cpuTimeTokenAllocator.rates[tier0][canBurst] would
+// of CPU time per second. Then the refill rates for tier0 burstable work would
 // equal 6 seconds per second, and cpuTimeTokenFiller would add 6 seconds of token
 // every second, but smoothly -- 1ms at a time. See cpuTimeTokenGranter for details
 // on the multi-dimensional token buckets owned by cpuTimeTokenGranter; the TLDR is
@@ -55,13 +62,12 @@ type cpuTimeTokenFiller struct {
 	tickCh *chan struct{}
 }
 
-// tokenAllocator abstracts cpuTimeTokenAllocator for testing.
-type tokenAllocator interface {
-	allocateTokens(remainingTicksInInInterval int64)
-	resetInterval()
-}
-
 func (f *cpuTimeTokenFiller) start() {
+	// The token buckets start full.
+	f.allocator.init()
+	f.allocator.allocateTokens(1)
+	f.allocator.resetInterval(true /* skipFittingLinearModel */)
+
 	ticker := f.timeSource.NewTicker(timePerTick)
 	intervalStart := f.timeSource.Now()
 	// Every 1s a new interval starts. every timePerTick time token allocation
@@ -85,7 +91,7 @@ func (f *cpuTimeTokenFiller) start() {
 				f.allocator.allocateTokens(1)
 			}
 			intervalStart = t
-			f.allocator.resetInterval()
+			f.allocator.resetInterval(false /* skipFittingLinearModel */)
 			remainingTicks = int64(time.Second / timePerTick)
 		} else {
 			remainingSinceIntervalStart := time.Second - elapsedSinceIntervalStart
@@ -111,31 +117,36 @@ func (f *cpuTimeTokenFiller) start() {
 	}()
 }
 
+// tokenAllocator abstracts cpuTimeTokenAllocator for testing.
+type tokenAllocator interface {
+	init()
+	allocateTokens(remainingTicksInInterval int64)
+	resetInterval(skipFittingLinearModel bool)
+}
+
 // cpuTimeTokenAllocator allocates tokens to a cpuTimeTokenGranter. See the comment
 // above cpuTimeTokenFiller for a high level picture. The responsibility of
-// cpuTimeTokenAllocator is to gradually allocate rates tokens every interval,
-// while respecting bucketCapacity. We have split up the ticking & token allocation
-// logic, in order to improve clarity & testability.
+// cpuTimeTokenAllocator is to gradually allocate tokens every interval,
+// while respecting bucketCapacity. The computation of the rate of tokens to add
+// every interval is left to cpuTimeTokenLinearModel.
 type cpuTimeTokenAllocator struct {
 	granter *cpuTimeTokenGranter
+	model   cpuTimeModel
 
-	// Mutable fields. No mutex, since only a single goroutine will call the
-	// cpuTimeTokenAllocator.
-
-	// rates stores the number of token added to each bucket every interval.
-	rates [numResourceTiers][numBurstQualifications]int64
-	// bucketCapacity stores the maximum number of tokens that can be in each bucket.
-	// That is, if a bucket is already at capacity, no more tokens will be added.
-	bucketCapacity [numResourceTiers][numBurstQualifications]int64
 	// allocated stores the number of tokens added to each bucket in the current
-	// interval.
+	// interval. No mutex, since only a single goroutine will call the
+	// cpuTimeTokenAllocator.
 	allocated [numResourceTiers][numBurstQualifications]int64
 }
 
 var _ tokenAllocator = &cpuTimeTokenAllocator{}
 
+func (a *cpuTimeTokenAllocator) init() {
+	a.model.init()
+}
+
 // allocateTokens allocates tokens to a cpuTimeTokenGranter. allocateTokens
-// adds rates tokens every interval, while respecting bucketCapacity.
+// adds the desired number of tokens every interval, while respecting bucketCapacity.
 // allocateTokens adds tokens evenly among the expected remaining ticks in
 // the interval.
 // INVARIANT: remainingTicks >= 1.
@@ -156,24 +167,219 @@ func (a *cpuTimeTokenAllocator) allocateTokens(expectedRemainingTicksInInterval
 		return toAllocate
 	}
 
+	rates := a.model.getRefillRates()
 	var delta [numResourceTiers][numBurstQualifications]int64
-	for wc := range a.rates {
-		for kind := range a.rates[wc] {
+	for wc := range rates {
+		for kind := range rates[wc] {
 			toAllocateTokens := allocateFunc(
-				a.rates[wc][kind], a.allocated[wc][kind], expectedRemainingTicksInInterval)
+				rates[wc][kind], a.allocated[wc][kind], expectedRemainingTicksInInterval)
 			a.allocated[wc][kind] += toAllocateTokens
 			delta[wc][kind] = toAllocateTokens
 		}
 	}
-	a.granter.refill(delta, a.bucketCapacity)
+	a.granter.refill(delta, rates)
 }
 
 // resetInterval is called to signal the beginning of a new interval. allocateTokens
-// adds rates tokens every interval.
-func (a *cpuTimeTokenAllocator) resetInterval() {
+// adds the desired number of tokens every interval.
+func (a *cpuTimeTokenAllocator) resetInterval(skipFittingLinearModel bool) {
+	if !skipFittingLinearModel {
+		// delta is the difference in tokens to add per interval (1s) from the
+		// previous call to fit to this one. We add it to the buckets
+		// immediately; the model itself handles filtering.
+		delta := a.model.fit()
+		a.granter.refill(delta, a.model.getRefillRates())
+	}
 	for wc := range a.allocated {
 		for kind := range a.allocated[wc] {
 			a.allocated[wc][kind] = 0
 		}
 	}
 }
+
+// cpuTimeModel abstracts cpuTimeTokenLinearModel for testing.
+type cpuTimeModel interface {
+	init()
+	fit() [numResourceTiers][numBurstQualifications]int64
+	getRefillRates() [numResourceTiers][numBurstQualifications]int64
+}
+
+// cpuTimeTokenLinearModel computes the number of CPU time tokens to add
+// to each bucket in the cpuTimeTokenGranter, per interval (per 1s). As is
+// discussed in the cpuTimeTokenGranter docs, the buckets are arranged in a
+// priority hierarchy; some buckets always have more tokens added per second
+// than others.
+//
+// Consider the refill rate of the lowest priority bucket first. In that case:
+// refillRate = 1s * vCPU count * targetUtilization / linearCorrectionTerm
+//
+// This formula is intuitive if linearCorrectionTerm equals 1. If CPU time
+// tokens corresponded exactly to actual CPU time, e.g. one token corresponding
+// to one nanosecond of CPU time, the token bucket would limit
+// the CPU used by requests subject to it to targetUtilization. Note that
+// targetUtilization is controllable via the admission.cpu_time_tokens.target_util
+// cluster setting.
+//
+// Example:
+// 8 vCPU machine. admission.cpu_time_tokens.target_util = 0.8 (80%)
+// RefillRate = 8 * 1s * .8 = 6.4 seconds of CPU time per second
+//
+// linearCorrectionTerm is a correction term derived from a linear model (hence
+// the name of the struct). There is work happening outside the kvserver BatchRequest
+// evaluation path, such as compaction. AC continuously fits a linear model:
+// total-cpu-time = linearCorrectionTerm * reported-cpu-time, where
+// linearCorrectionTerm is forced to be in the interval [1, 20].
+//
+// The higher priority buckets have higher target utilizations than the lowest
+// priority one. The delta between the target utilizations is fixed, e.g.
+// burstable tenant work has a 5% higher target utilization than non-burstable.
+type cpuTimeTokenLinearModel struct {
+	granter           tokenUsageTracker
+	settings          *cluster.Settings
+	cpuMetricProvider CPUMetricProvider
+	timeSource        timeutil.TimeSource
+
+	// The time at which fit was last called.
+	lastFitTime time.Time
+	// The cumulative user/sys CPU time used since process start in
+	// milliseconds.
+	totalCPUTimeMillis int64
+	// The CPU capacity measured in vCPUs.
+	cpuCapacity float64
+	// The linear correction term, see the docs above cpuTimeTokenLinearModel.
+	tokenToCPUTimeMultiplier float64
+
+	// The number of CPU time tokens to add to each bucket per interval (1s).
+	rates [numResourceTiers][numBurstQualifications]int64
+}
+
+type tokenUsageTracker interface {
+	getTokensUsedInInterval() int64
+}
+
+type CPUMetricProvider interface {
+	// GetCPUInfo returns the cumulative user/sys CPU time used since process
+	// start in milliseconds, and the cpuCapacity measured in vCPUs.
+	GetCPUInfo() (totalCPUTimeMillis int64, cpuCapacity float64)
+}
+
+// init sets tokenToCPUTimeMultiplier to 1 & computes refill rates.
+func (m *cpuTimeTokenLinearModel) init() {
+	m.lastFitTime = m.timeSource.Now()
+	_, cpuCapacity := m.cpuMetricProvider.GetCPUInfo()
+	m.cpuCapacity = cpuCapacity
+	m.tokenToCPUTimeMultiplier = 1
+	_ = m.updateRefillRates()
+}
+
+// fit adjusts tokenToCPUTimeMultiplier based on CPU usage & token usage. fit
+// computes refill rates from tokenToCPUTimeMultiplier & the admission.cpu_time_tokens.target_util
+// cluster setting. fit returns the delta refill rates. That is, fit returns the difference in tokens
+// to add per interval (1s) from the previous call to fit to this one.
+func (m *cpuTimeTokenLinearModel) fit() [numResourceTiers][numBurstQualifications]int64 {
+	now := m.timeSource.Now()
+	elapsedSinceLastFit := now.Sub(m.lastFitTime)
+	m.lastFitTime = now
+
+	// Get used CPU tokens.
+	tokensUsed := m.granter.getTokensUsedInInterval()
+	// At admission time, an estimate of CPU time is deducted. After
+	// the request is done processing, a correction based on a measurement
+	// from grunning is deducted. Thus it is theoretically possible for net
+	// tokens used to be <=0. In this case, we set tokensUsed to 1, so that
+	// the computation of tokenToCPUTimeMultiplier is well-behaved.
+	if tokensUsed <= 0 {
+		tokensUsed = 1
+	}
+
+	// Get used CPU time.
+	totalCPUTimeMillis, _ := m.cpuMetricProvider.GetCPUInfo()
+	intCPUTimeMillis := totalCPUTimeMillis - m.totalCPUTimeMillis
+	// totalCPUTimeMillis is not necessarily monotonic in all environments,
+	// e.g. in case of VM live migration on a public cloud provider.
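+	// If the counter jumps backward, treat the interval as having used no CPU
+	// time, rather than feeding a negative delta into the model.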
+	if intCPUTimeMillis < 0 {
+		intCPUTimeMillis = 0
+	}
+	m.totalCPUTimeMillis = totalCPUTimeMillis
+	intCPUTimeNanos := intCPUTimeMillis * 1e6
+
+	// Update multiplier.
+	const lowCPUUtilFrac = 0.25
+	isLowCPUUtil := intCPUTimeNanos < int64(float64(elapsedSinceLastFit)*m.cpuCapacity*lowCPUUtilFrac)
+	if isLowCPUUtil {
+		// Ensure that low CPU utilization is not due to a flawed tokenToCPUTimeMultiplier
+		// by multiplicatively lowering it until we are below the upperBound. If we are already
+		// below upperBound, we make no adjustment.
+		const upperBound = (1 / lowCPUUtilFrac) * 0.9 // 3.6
+		if m.tokenToCPUTimeMultiplier > upperBound {
+			m.tokenToCPUTimeMultiplier /= 1.5
+			if m.tokenToCPUTimeMultiplier < upperBound {
+				m.tokenToCPUTimeMultiplier = upperBound
+			}
+		}
+	} else {
+		tokenToCPUTimeMultiplier :=
+			float64(intCPUTimeNanos) / float64(tokensUsed)
+		// Multiplier is forced into the interval [1, 20].
+		if tokenToCPUTimeMultiplier > 20 {
+			// Cap the multiplier.
+			tokenToCPUTimeMultiplier = 20
+		} else if tokenToCPUTimeMultiplier < 1 {
+			// Likely because work is queued up in the goroutine scheduler.
+			tokenToCPUTimeMultiplier = 1
+		}
+		// Decrease faster than increase. Giving out too many tokens can
+		// lead to goroutine scheduling latency.
+		alpha := 0.5
+		if tokenToCPUTimeMultiplier < m.tokenToCPUTimeMultiplier {
+			alpha = 0.8
+		}
+
+		// Exponentially filter changes to the multiplier. 1s of data is noisy,
+		// so filtering is necessary.
+		m.tokenToCPUTimeMultiplier =
+			alpha*tokenToCPUTimeMultiplier + (1-alpha)*m.tokenToCPUTimeMultiplier
+	}
+
+	return m.updateRefillRates()
+}
+
+// updateRefillRates computes refill rates from tokenToCPUTimeMultiplier &
+// the admission.cpu_time_tokens.target_util cluster setting. updateRefillRates
+// returns the delta refill rates. That is, updateRefillRates returns the
+// difference in tokens to add per interval (1s) since the previous update.
+func (m *cpuTimeTokenLinearModel) updateRefillRates() [numResourceTiers][numBurstQualifications]int64 {
+	// Compute goals from cluster setting. Algorithmically, it is okay if some of
+	// the below goalUtils are greater than 1. This would mean greater risk of
+	// goroutine scheduling latency, but there is no immediate problem -- the
+	// greater some goalUtil is, the more CPU time tokens will be in the corresponding
+	// bucket.
+	var goalUtils [numResourceTiers][numBurstQualifications]float64
+	util := KVCPUTimeUtilGoal.Get(&m.settings.SV)
+	var iter float64
+	for tier := int(numResourceTiers - 1); tier >= 0; tier-- {
+		for qual := int(numBurstQualifications - 1); qual >= 0; qual-- {
+			goalUtils[tier][qual] = util + 0.05*iter
+			iter++
+		}
+	}
+
+	// Update refill rates. Return change in rates via delta.
+	var delta [numResourceTiers][numBurstQualifications]int64
+	for tier := range goalUtils {
+		for qual := range goalUtils[tier] {
+			newRate :=
+				int64(m.cpuCapacity * float64(time.Second) * goalUtils[tier][qual] / m.tokenToCPUTimeMultiplier)
+			delta[tier][qual] = newRate - m.rates[tier][qual]
+			m.rates[tier][qual] = newRate
+		}
+	}
+	return delta
+}
+
+// getRefillRates returns the number of CPU time tokens to add to each bucket per interval (1s).
+func (m *cpuTimeTokenLinearModel) getRefillRates() [numResourceTiers][numBurstQualifications]int64 {
+	return m.rates
+}
diff --git a/pkg/util/admission/cpu_time_token_filler_test.go b/pkg/util/admission/cpu_time_token_filler_test.go
index d1f45227dbc4..aa46293ef2d2 100644
--- a/pkg/util/admission/cpu_time_token_filler_test.go
+++ b/pkg/util/admission/cpu_time_token_filler_test.go
@@ -11,11 +11,13 @@ import (
 	"testing"
 	"time"
 
+	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/testutils/datapathutils"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/datadriven"
+	"github.com/stretchr/testify/require"
 )
 
 func TestCPUTimeTokenFiller(t *testing.T) {
@@ -68,14 +70,31 @@ type testTokenAllocator struct {
 	buf *strings.Builder
 }
 
-func (a *testTokenAllocator) resetInterval() {
-	fmt.Fprintf(a.buf, "resetInterval()\n")
+func (a *testTokenAllocator) init() {}
+
+func (a *testTokenAllocator) resetInterval(skipFittingLinearModel bool) {
+	fmt.Fprintf(a.buf, "resetInterval(%t)\n", skipFittingLinearModel)
 }
 
 func (a *testTokenAllocator) allocateTokens(remainingTicks int64) {
	fmt.Fprintf(a.buf, "allocateTokens(%d)\n", remainingTicks)
 }
 
+type testModel struct {
+	rates [numResourceTiers][numBurstQualifications]int64
+	delta [numResourceTiers][numBurstQualifications]int64
+}
+
+func (m *testModel) init() {}
+
+func (m *testModel) fit() [numResourceTiers][numBurstQualifications]int64 {
+	return m.delta
+}
+
+func (m *testModel) getRefillRates() [numResourceTiers][numBurstQualifications]int64 {
+	return m.rates
+}
+
 func TestCPUTimeTokenAllocator(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
@@ -101,20 +120,19 @@ func TestCPUTimeTokenAllocator(t *testing.T) {
 	granter.requester[testTier0] = requesters[testTier0]
 	granter.requester[testTier1] = requesters[testTier1]
 
+	model := &testModel{}
+	model.rates[testTier0][canBurst] = 5
+	model.rates[testTier0][noBurst] = 4
+	model.rates[testTier1][canBurst] = 3
+	model.rates[testTier1][noBurst] = 2
 	allocator := cpuTimeTokenAllocator{
 		granter: granter,
+		model:   model,
 	}
-	allocator.rates[testTier0][canBurst] = 5
-	allocator.rates[testTier0][noBurst] = 4
-	allocator.rates[testTier1][canBurst] = 3
-	allocator.rates[testTier1][noBurst] = 2
-	allocator.bucketCapacity = allocator.rates
 
 	var buf strings.Builder
-	flushAndReset := func(printGranter bool) string {
-		if printGranter {
-			fmt.Fprint(&buf, granter.String())
-		}
+	flushAndReset := func() string {
+		fmt.Fprint(&buf, granter.String())
 		str := buf.String()
 		buf.Reset()
 		return str
@@ -123,21 +141,193 @@ func TestCPUTimeTokenAllocator(t *testing.T) {
 	datadriven.RunTest(t, datapathutils.TestDataPath(t, "cpu_time_token_allocator"),
 		func(t *testing.T, d *datadriven.TestData) string {
 			switch d.Cmd {
 			case "resetInterval":
-				allocator.resetInterval()
-				return flushAndReset(false /* printGranter */)
+				var delta int64
+				if d.MaybeScanArgs(t, "delta", &delta) {
+					for tier := range model.delta {
+						for qual := range model.delta[tier] {
+							model.delta[tier][qual] = delta
+						}
+					}
+				}
+				skipFit := d.HasArg("skipfit")
+				allocator.resetInterval(skipFit /* skipFittingLinearModel */)
+				for tier := range model.delta {
+					for qual := range model.delta[tier] {
+						model.delta[tier][qual] = 0
+					}
+				}
+				return flushAndReset()
 			case "allocate":
 				var remainingTicks int64
 				d.ScanArgs(t, "remaining", &remainingTicks)
 				allocator.allocateTokens(remainingTicks)
-				return flushAndReset(true /* printGranter */)
+				return flushAndReset()
 			case "clear":
 				granter.mu.buckets[testTier0][canBurst].tokens = 0
 				granter.mu.buckets[testTier0][noBurst].tokens = 0
 				granter.mu.buckets[testTier1][canBurst].tokens = 0
 				granter.mu.buckets[testTier1][noBurst].tokens = 0
-				return flushAndReset(true /* printGranter */)
+				return flushAndReset()
 			default:
 				return fmt.Sprintf("unknown command: %s", d.Cmd)
 			}
 		})
 }
+
+func TestCPUTimeTokenLinearModel(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	unixNanos := int64(1758938600000000000) // 2025-09-27T02:03:20Z
+	testTime := timeutil.NewManualTime(time.Unix(0, unixNanos).UTC())
+	model := cpuTimeTokenLinearModel{
+		settings:                 cluster.MakeClusterSettings(),
+		timeSource:               testTime,
+		lastFitTime:              testTime.Now(),
+		totalCPUTimeMillis:       0,
+		tokenToCPUTimeMultiplier: 1,
+	}
+	tokenCPUTime := &testTokenUsageTracker{}
+	model.granter = tokenCPUTime
+	actualCPUTime := &testCPUMetricProvider{
+		capacity: 10,
+	}
+	model.cpuMetricProvider = actualCPUTime
+
+	dur := 5 * time.Second
+	actualCPUTime.append(dur.Nanoseconds(), 1) // appended value ignored by init
+	model.init()
+
+	// 2x.
+	tokenCPUTime.append(dur.Nanoseconds()/2, 100)
+	actualCPUTime.append(dur.Milliseconds(), 100)
+	for i := 0; i < 100; i++ {
+		testTime.Advance(time.Second)
+		model.fit()
+	}
+	tolerance := 0.01
+	require.InDelta(t, 2, model.tokenToCPUTimeMultiplier, tolerance)
+
+	// 4x.
+	tokenCPUTime.append(dur.Nanoseconds()/2, 100)
+	actualCPUTime.append(dur.Milliseconds()*2, 100)
+	for i := 0; i < 100; i++ {
+		testTime.Advance(time.Second)
+		model.fit()
+	}
+	require.InDelta(t, 4, model.tokenToCPUTimeMultiplier, tolerance)
+
+	// 1x.
+	tokenCPUTime.append(dur.Nanoseconds()*2, 100)
+	actualCPUTime.append(dur.Milliseconds()*2, 100)
+	for i := 0; i < 100; i++ {
+		testTime.Advance(time.Second)
+		model.fit()
+	}
+	require.InDelta(t, 1, model.tokenToCPUTimeMultiplier, tolerance)
+
+	// Cap at 20x.
+	tokenCPUTime.append(dur.Nanoseconds(), 100)
+	actualCPUTime.append(dur.Milliseconds()*40, 100)
+	for i := 0; i < 100; i++ {
+		testTime.Advance(time.Second)
+		model.fit()
+	}
+	require.InDelta(t, 20, model.tokenToCPUTimeMultiplier, tolerance)
+
+	// Floor at 1x.
+	tokenCPUTime.append(dur.Nanoseconds()*2, 100)
+	actualCPUTime.append(dur.Milliseconds(), 100)
+	for i := 0; i < 100; i++ {
+		testTime.Advance(time.Second)
+		model.fit()
+	}
+	require.InDelta(t, 1, model.tokenToCPUTimeMultiplier, tolerance)
+
+	// 2x.
+	tokenCPUTime.append(dur.Nanoseconds(), 100)
+	actualCPUTime.append(dur.Milliseconds()*2, 100)
+	for i := 0; i < 100; i++ {
+		testTime.Advance(time.Second)
+		model.fit()
+	}
+	require.InDelta(t, 2, model.tokenToCPUTimeMultiplier, tolerance)
+
+	// Leave 2x as is, even though in low CPU mode, since multiplier is already low.
+	tokenCPUTime.append(dur.Nanoseconds()/5, 100)
+	actualCPUTime.append(dur.Milliseconds()/5, 100)
+	for i := 0; i < 100; i++ {
+		testTime.Advance(time.Second)
+		model.fit()
+	}
+	require.InDelta(t, 2, model.tokenToCPUTimeMultiplier, tolerance)
+
+	// 20x.
+	tokenCPUTime.append(dur.Nanoseconds(), 100)
+	actualCPUTime.append(dur.Milliseconds()*100, 100)
+	for i := 0; i < 100; i++ {
+		testTime.Advance(time.Second)
+		model.fit()
+	}
+	require.InDelta(t, 20, model.tokenToCPUTimeMultiplier, tolerance)
+
+	// Reduce to 3.6x, since low CPU mode, and multiplier is high.
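+	// upperBound = (1/lowCPUUtilFrac) * 0.9 = (1/0.25) * 0.9 = 3.6. Each
+	// low-utilization fit divides the multiplier by 1.5 and clamps it at the
+	// bound, so repeated fits settle at exactly 3.6.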
+	tokenCPUTime.append(dur.Nanoseconds()/5, 100)
+	actualCPUTime.append(dur.Milliseconds()/5, 100)
+	for i := 0; i < 100; i++ {
+		testTime.Advance(time.Second)
+		model.fit()
+	}
+	require.InDelta(t, 3.6, model.tokenToCPUTimeMultiplier, tolerance)
+
+	rates := model.getRefillRates()
+	// Expected rates: 10 vCPUs, base target util of 80% stepping up by 5% per
+	// bucket, divided by the 3.6x multiplier.
+	// 95% -> 10 vCPUs * .95 * 1s = 9.5s, 9.5s / 3.6 ~= 2.63888889s
+	require.Equal(t, int64(2638888888), rates[testTier0][canBurst])
+	// 90% -> 10 vCPUs * .9 * 1s = 9s, 9s / 3.6 = 2.5s
+	require.Equal(t, int64(2500000000), rates[testTier0][noBurst])
+	// 85% -> 10 vCPUs * .85 * 1s = 8.5s, 8.5s / 3.6 ~= 2.36111111s
+	require.Equal(t, int64(2361111111), rates[testTier1][canBurst])
+	// 80% -> 10 vCPUs * .8 * 1s = 8s, 8s / 3.6 ~= 2.22222222s
+	require.Equal(t, int64(2222222222), rates[testTier1][noBurst])
+}
+
+type testTokenUsageTracker struct {
+	i          int
+	tokensUsed []int64
+}
+
+func (t *testTokenUsageTracker) append(tokens int64, count int) {
+	for i := 0; i < count; i++ {
+		t.tokensUsed = append(t.tokensUsed, tokens)
+	}
+}
+
+func (t *testTokenUsageTracker) getTokensUsedInInterval() int64 {
+	ret := t.tokensUsed[t.i]
+	t.i++
+	return ret
+}
+
+type testCPUMetricProvider struct {
+	i        int
+	cum      int64
+	millis   []int64
+	capacity float64
+}
+
+func (m *testCPUMetricProvider) GetCPUInfo() (int64, float64) {
+	cycle := m.millis[m.i]
+	m.i++
+	m.cum += cycle
+	return m.cum, m.capacity
+}
+
+func (m *testCPUMetricProvider) append(millis int64, count int) {
+	for i := 0; i < count; i++ {
+		m.millis = append(m.millis, millis)
+	}
+}
diff --git a/pkg/util/admission/cpu_time_token_granter.go b/pkg/util/admission/cpu_time_token_granter.go
index 832ed6d3cae2..38986c4d4a7f 100644
--- a/pkg/util/admission/cpu_time_token_granter.go
+++ b/pkg/util/admission/cpu_time_token_granter.go
@@ -117,7 +117,10 @@ type cpuTimeTokenGranter struct {
 		// Since admission deducts from all buckets, these invariants are true, so long as token bucket
 		// replenishing respects it also. Token bucket replenishing is not yet implemented. See
 		// tryGrantLocked for a situation where invariant #1 is relied on.
-		buckets [numResourceTiers][numBurstQualifications]tokenBucket
+		buckets [numResourceTiers][numBurstQualifications]tokenBucket
+		// tokensUsed is the net number of tokens deducted from the buckets
+		// since the last call to getTokensUsedInInterval.
+		tokensUsed int64
 	}
 }
@@ -190,6 +193,7 @@ func (stg *cpuTimeTokenGranter) tookWithoutPermission(count int64) {
 }
 
 func (stg *cpuTimeTokenGranter) tookWithoutPermissionLocked(count int64) {
+	stg.mu.tokensUsed += count
 	for tier := range stg.mu.buckets {
 		for qual := range stg.mu.buckets[tier] {
 			stg.mu.buckets[tier][qual].tokens -= count
@@ -245,6 +249,16 @@ func (stg *cpuTimeTokenGranter) tryGrantLocked() bool {
 	return false
 }
 
+// getTokensUsedInInterval returns the net number of tokens deducted from the
+// buckets since the last call to getTokensUsedInInterval.
+func (stg *cpuTimeTokenGranter) getTokensUsedInInterval() int64 {
+	stg.mu.Lock()
+	defer stg.mu.Unlock()
+	tokensUsed := stg.mu.tokensUsed
+	stg.mu.tokensUsed = 0
+	return tokensUsed
+}
+
 // refill adds delta tokens to the corresponding buckets, while respecting
 // the capacity info stored in bucketCapacity. That is, if a bucket is already
 // at capacity, no more tokens will be added. delta is always positive,
diff --git a/pkg/util/admission/cpu_time_token_granter_test.go b/pkg/util/admission/cpu_time_token_granter_test.go
index dc3d17ff9ce9..7cd321f4ed9f 100644
--- a/pkg/util/admission/cpu_time_token_granter_test.go
+++ b/pkg/util/admission/cpu_time_token_granter_test.go
@@ -167,6 +167,11 @@ func TestCPUTimeTokenGranter(t *testing.T) {
 			fmt.Fprintf(&buf, "refill(%v %v)\n", delta, bucketCapacity)
 			return flushAndReset(false /* init */)
 
+		case "get-tokens-used":
+			used := granter.getTokensUsedInInterval()
+			fmt.Fprintf(&buf, "get-tokens-used-in-interval() returned %d\n", used)
+			return flushAndReset(false /* init */)
+
 		// For cpuTimeTokenChildGranter, this is a NOP. Still, it will be
 		// called in production. So best to test it doesn't panic, or similar.
 		case "continue-grant-chain":
diff --git a/pkg/util/admission/testdata/cpu_time_token_allocator b/pkg/util/admission/testdata/cpu_time_token_allocator
index c81a4540c0a0..0623c34a2f81 100644
--- a/pkg/util/admission/testdata/cpu_time_token_allocator
+++ b/pkg/util/admission/testdata/cpu_time_token_allocator
@@ -30,6 +30,9 @@ tier1 3 2
 
 resetInterval
 ----
+cpuTTG canBurst noBurst
+tier0 5 4
+tier1 3 2
 
 # No more tokens added due to burst budget.
 allocate remaining=5
@@ -53,8 +56,27 @@ cpuTTG canBurst noBurst
 tier0 4 3
 tier1 2 1
 
-resetInterval
+# Test that no tokens are added to the buckets, since the call to fit
+# is skipped.
+resetInterval delta=1 skipfit
+----
+cpuTTG canBurst noBurst
+tier0 4 3
+tier1 2 1
+
+# Test that tokens are added to the buckets, in case of a refill rate change.
+resetInterval delta=1
+----
+cpuTTG canBurst noBurst
+tier0 5 4
+tier1 3 2
+
+# Test that the burst budget will be respected, in case of a refill rate change.
+resetInterval delta=1
 ----
+cpuTTG canBurst noBurst
+tier0 5 4
+tier1 3 2
 
 clear
 ----
@@ -72,6 +94,9 @@ tier1 3 2
 
 resetInterval
 ----
+cpuTTG canBurst noBurst
+tier0 5 4
+tier1 3 2
 
 clear
 ----
diff --git a/pkg/util/admission/testdata/cpu_time_token_filler b/pkg/util/admission/testdata/cpu_time_token_filler
index 682d9e64c84f..8e45876467e7 100644
--- a/pkg/util/admission/testdata/cpu_time_token_filler
+++ b/pkg/util/admission/testdata/cpu_time_token_filler
@@ -1,5 +1,7 @@
 init
 ----
+allocateTokens(1)
+resetInterval(true)
 elapsed: 0s
 
 # Let 2s pass, 200ms at a time. Expect a call to allocateTokens
@@ -28,7 +30,7 @@ elapsed: 800ms
 advance dur=200ms
 ----
 allocateTokens(1)
-resetInterval()
+resetInterval(false)
 allocateTokens(1000)
 elapsed: 1s
@@ -55,7 +57,7 @@ elapsed: 1.8s
 advance dur=200ms
 ----
 allocateTokens(1)
-resetInterval()
+resetInterval(false)
 allocateTokens(1000)
 elapsed: 2s
 
@@ -64,7 +66,7 @@ elapsed: 2s
 advance dur=1000ms
 ----
 allocateTokens(1)
-resetInterval()
+resetInterval(false)
 allocateTokens(1000)
 elapsed: 3s
@@ -87,7 +89,7 @@ elapsed: 3.8s
 advance dur=200ms
 ----
 allocateTokens(1)
-resetInterval()
+resetInterval(false)
 allocateTokens(1000)
 elapsed: 4s
diff --git a/pkg/util/admission/testdata/cpu_time_token_granter b/pkg/util/admission/testdata/cpu_time_token_granter
index 5de370579967..508b7729dacd 100644
--- a/pkg/util/admission/testdata/cpu_time_token_granter
+++ b/pkg/util/admission/testdata/cpu_time_token_granter
@@ -22,6 +22,16 @@ try-get tier=tier0 v=1
 ----
 kvtier0: tryGet(1) returned false
 
+get-tokens-used
+----
+get-tokens-used-in-interval() returned 1
+
+# Returns tokens used since the last call to get-tokens-used.
+# So should be zero.
+get-tokens-used
+----
+get-tokens-used-in-interval() returned 0
+
 # More simple tryGet calls. This time, tier1 work is admitted.
 init tier0=2 tier1=1
 ----
@@ -59,6 +69,10 @@ try-get tier=tier0 v=1
 ----
 kvtier0: tryGet(1) returned false
 
+get-tokens-used
+----
+get-tokens-used-in-interval() returned 2
+
 # returnGrant adds tokens to the buckets, when a positive count is used.
 init tier0=1 tier1=1
 ----
@@ -89,6 +103,10 @@ cpuTTG canBurst noBurst
 tier0 -1 0
 tier1 -1 0
 
+get-tokens-used
+----
+get-tokens-used-in-interval() returned 1
+
 # Simple tookWithoutPermission test.
 init tier0=3 tier1=3
 ----
@@ -113,6 +131,10 @@ try-get tier=tier1 v=1
 ----
 kvtier1: tryGet(1) returned false
 
+get-tokens-used
+----
+get-tokens-used-in-interval() returned 3
+
 # Test granting. A single tier0 request will be admitted.
 init tier0waiter=2 tier1waiter=1
 ----
@@ -127,6 +149,10 @@ return-grant tier=tier0 v=2
 kvtier0: returnGrant(2)
 kvtier0: granted in chain 0, and returning 2
 
+get-tokens-used
+----
+get-tokens-used-in-interval() returned 0
+
 # Test granting. Two tier1 requests will be admitted.
 init tier0=-1 tier0burst=-1 tier1=-1 tier1burst=-1 tier1waiter=1
 ----
@@ -150,6 +176,10 @@ cpuTTG canBurst noBurst
 tier0 0 0
 tier1 0 0
 
+get-tokens-used
+----
+get-tokens-used-in-interval() returned -1
+
 # Simple tryGet calls, with burst this time.
 init tier0burst=1
 ----
@@ -174,6 +204,10 @@ try-get tier=tier0 burst v=1
 ----
 kvtier0: tryGet(1) returned false
 
+get-tokens-used
+----
+get-tokens-used-in-interval() returned 1
+
 # Test granting, with burst. After the call to return-grant, only tier1 burst will
 # have enough tokens to grant. So grant a tier1 request.
 init tier0burst=-1 tier0=-1 tier1burst=0 tier1=-1 tier1waiter=1 tier1burstwaiter
 ----
@@ -189,6 +223,10 @@ return-grant tier=tier0 v=1
 kvtier0: returnGrant(1)
 kvtier1: granted in chain 0, and returning 1
 
+get-tokens-used
+----
+get-tokens-used-in-interval() returned 0
+
 # Test granting, with burst. After the call to return-grant, only tier1 no-burst will
 # have enough tokens to grant. So no grant, since the tier1 waiting request is a burst request.
 init tier0burst=-1 tier0=-1 tier1burst=-1 tier1=0 tier1waiter=1 tier1burstwaiter
 ----
@@ -206,6 +244,10 @@ cpuTTG canBurst noBurst
 tier0 0 0
 tier1 0 1
 
+get-tokens-used
+----
+get-tokens-used-in-interval() returned -1
+
 # Test refill, which is the interface cpuTimeTokenFiller uses to add tokens to
 # the buckets. The final allocation of tokens in the buckets is a result of
 # the delta argument to refill, the bucket capacities in certain cases capping how
@@ -225,3 +267,7 @@ refill([[5 4] [3 1]] [[4 3] [10 1]])
 cpuTTG canBurst noBurst
 tier0 3 2
 tier1 2 0
+
+get-tokens-used
+----
+get-tokens-used-in-interval() returned 1
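Note on the refill-rate arithmetic (a standalone sketch, not part of the patch): the values asserted in TestCPUTimeTokenLinearModel follow directly from the formula in updateRefillRates. The program below recomputes them under the same assumptions (10 vCPUs, a base target utilization of 0.8 stepped up by 0.05 per higher-priority bucket, and a 3.6x correction term). The package name, array sizes, and refillRates helper are illustrative stand-ins, not identifiers from the CockroachDB tree.

package main

import (
	"fmt"
	"time"
)

const (
	numTiers = 2 // stand-in for numResourceTiers
	numQuals = 2 // stand-in for numBurstQualifications
)

// refillRates mirrors the formula in updateRefillRates:
// rate = cpuCapacity * 1s * goalUtil / multiplier, with goalUtil increasing
// by 0.05 per bucket, counting up from the lowest-priority bucket.
func refillRates(cpuCapacity, baseUtil, multiplier float64) [numTiers][numQuals]int64 {
	var rates [numTiers][numQuals]int64
	var iter float64
	for tier := numTiers - 1; tier >= 0; tier-- {
		for qual := numQuals - 1; qual >= 0; qual-- {
			goalUtil := baseUtil + 0.05*iter
			iter++
			rates[tier][qual] = int64(cpuCapacity * float64(time.Second) * goalUtil / multiplier)
		}
	}
	return rates
}

func main() {
	// Matches the numbers in the test: 10 vCPUs, 80% base target utilization,
	// 3.6x correction term.
	rates := refillRates(10, 0.8, 3.6)
	for tier := range rates {
		for qual := range rates[tier] {
			// Prints 2638888888, 2500000000, 2361111111, 2222222222 -- the
			// values asserted in TestCPUTimeTokenLinearModel.
			fmt.Printf("tier%d bucket%d: %d tokens/s (%s of CPU time)\n",
				tier, qual, rates[tier][qual], time.Duration(rates[tier][qual]))
		}
	}
}

Running the sketch reproduces the four asserted rates, including the int64 truncation (9.5s/3.6 truncates to 2638888888ns rather than rounding up), which is why the test expectations are not round numbers.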