Skip to content

Commit ef10b85

Browse files
craig[bot]mgartner
andcommitted
Merge #156405
156405: kvcoord: pre-allocate requests and positions for truncate r=mgartner a=mgartner #### kvcoord: hard-code seed in BenchmarkTruncateLoop Hard-coding the seed in `BenchmarkTruncateLoop` ensures fair comparisons across multiple benchmark runs. #### kvcoord: pre-allocate requests and positions for truncate Release note: None #### kvcoord: eliminate bounds checks in truncate loops Release note: None Co-authored-by: Marcus Gartner <marcus@cockroachlabs.com>
2 parents fbe4b69 + 888140d commit ef10b85

File tree

2 files changed

+125
-45
lines changed

2 files changed

+125
-45
lines changed

pkg/kv/kvclient/kvcoord/batch.go

Lines changed: 124 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -476,32 +476,69 @@ func (h *BatchTruncationHelper) Truncate(
476476
// non-trivial slowdown and increase in allocations, so we choose to duplicate
477477
// the code for performance.
478478
func (h *BatchTruncationHelper) truncateAsc(rs roachpb.RSpan) ([]kvpb.RequestUnion, []int, error) {
479-
var truncReqs []kvpb.RequestUnion
480-
var positions []int
481-
for i := h.startIdx; i < len(h.positions); i++ {
482-
pos := h.positions[i]
479+
endIdx := len(h.positions)
480+
headers := h.headers[h.startIdx:endIdx]
481+
positions := h.positions[h.startIdx:endIdx]
482+
requests := h.requests[h.startIdx:endIdx]
483+
isRange := h.isRange[h.startIdx:endIdx]
484+
endIdx = len(positions)
485+
486+
fullyProcessed := 0
487+
for i := range positions {
488+
//gcassert:bce
489+
pos := positions[i]
483490
if pos < 0 {
484-
// This request has already been fully processed, so there is no
485-
// need to look at it.
491+
fullyProcessed++
486492
continue
487493
}
488-
header := h.headers[i]
489-
// rs.EndKey can't be local because it contains range split points,
490-
// which are never local.
494+
//gcassert:bce
495+
header := headers[i]
491496
ek := rs.EndKey.AsRawKey()
492497
if ek.Compare(header.Key) <= 0 {
493498
// All of the remaining requests start after this range, so we're
494499
// done.
500+
endIdx = i
495501
break
496502
}
497-
if !h.isRange[i] {
503+
}
504+
505+
headers = headers[:endIdx]
506+
positions = positions[:endIdx]
507+
requests = requests[:endIdx]
508+
isRange = isRange[:endIdx]
509+
numReqs := len(positions) - fullyProcessed
510+
if numReqs == 0 {
511+
return nil, nil, nil
512+
}
513+
truncReqs := make([]kvpb.RequestUnion, 0, numReqs)
514+
truncPositions := make([]int, 0, numReqs)
515+
516+
for i := range positions {
517+
//gcassert:bce
518+
pos := positions[i]
519+
if pos < 0 {
520+
// This request has already been fully processed, so there is no
521+
// need to look at it.
522+
continue
523+
}
524+
//gcassert:bce
525+
header := headers[i]
526+
// rs.EndKey can't be local because it contains range split points,
527+
// which are never local.
528+
ek := rs.EndKey.AsRawKey()
529+
//gcassert:bce
530+
req := requests[i]
531+
//gcassert:bce
532+
if !isRange[i] {
498533
// This is a point request, and the key is contained within this
499534
// range, so we include the request as is and mark it as "fully
500535
// processed".
501-
truncReqs = append(truncReqs, h.requests[i])
502-
positions = append(positions, pos)
503-
h.headers[i] = kvpb.RequestHeader{}
504-
h.positions[i] = -1
536+
truncReqs = append(truncReqs, req)
537+
truncPositions = append(truncPositions, pos)
538+
//gcassert:bce
539+
headers[i] = kvpb.RequestHeader{}
540+
//gcassert:bce
541+
positions[i] = -1
505542
continue
506543
}
507544
// We're dealing with a range-spanning request.
@@ -514,32 +551,35 @@ func (h *BatchTruncationHelper) truncateAsc(rs roachpb.RSpan) ([]kvpb.RequestUni
514551
)
515552
}
516553
}
517-
inner := h.requests[i].GetInner()
554+
inner := req.GetInner()
518555
if header.EndKey.Compare(ek) <= 0 {
519556
// This is the last part of this request since it is fully contained
520557
// within this range, so we mark the request as "fully processed".
521-
h.headers[i] = kvpb.RequestHeader{}
522-
h.positions[i] = -1
558+
//gcassert:bce
559+
headers[i] = kvpb.RequestHeader{}
560+
//gcassert:bce
561+
positions[i] = -1
523562
if origStartKey := inner.Header().Key; origStartKey.Equal(header.Key) {
524563
// This range-spanning request fits within a single range, so we
525564
// can just use the original request.
526-
truncReqs = append(truncReqs, h.requests[i])
527-
positions = append(positions, pos)
565+
truncReqs = append(truncReqs, req)
566+
truncPositions = append(truncPositions, pos)
528567
continue
529568
}
530569
} else {
531570
header.EndKey = ek
532571
// Adjust the start key of the header so that it contained only the
533572
// unprocessed suffix of the request.
534-
h.headers[i].Key = header.EndKey
573+
//gcassert:bce
574+
headers[i].Key = header.EndKey
535575
}
536576
shallowCopy := inner.ShallowCopy()
537577
shallowCopy.SetHeader(header)
538578
truncReqs = append(truncReqs, kvpb.RequestUnion{})
539579
truncReqs[len(truncReqs)-1].MustSetInner(shallowCopy)
540-
positions = append(positions, pos)
580+
truncPositions = append(truncPositions, pos)
541581
}
542-
return truncReqs, positions, nil
582+
return truncReqs, truncPositions, nil
543583
}
544584

545585
// truncateDesc is the optimized strategy for Truncate() with the Descending
@@ -635,32 +675,69 @@ func (h *BatchTruncationHelper) truncateAsc(rs roachpb.RSpan) ([]kvpb.RequestUni
635675
// non-trivial slowdown and increase in allocations, so we choose to duplicate
636676
// the code for performance.
637677
func (h *BatchTruncationHelper) truncateDesc(rs roachpb.RSpan) ([]kvpb.RequestUnion, []int, error) {
638-
var truncReqs []kvpb.RequestUnion
639-
var positions []int
640-
for i := h.startIdx; i < len(h.positions); i++ {
641-
pos := h.positions[i]
678+
endIdx := len(h.positions)
679+
headers := h.headers[h.startIdx:endIdx]
680+
positions := h.positions[h.startIdx:endIdx]
681+
requests := h.requests[h.startIdx:endIdx]
682+
isRange := h.isRange[h.startIdx:endIdx]
683+
endIdx = len(positions)
684+
685+
fullyProcessed := 0
686+
for i := range positions {
687+
//gcassert:bce
688+
pos := positions[i]
642689
if pos < 0 {
643-
// This request has already been fully processed, so there is no
644-
// need to look at it.
690+
fullyProcessed++
645691
continue
646692
}
647-
header := h.headers[i]
648-
// rs.Key can't be local because it contains range split points, which
649-
// are never local.
693+
//gcassert:bce
694+
header := headers[i]
650695
sk := rs.Key.AsRawKey()
651696
if sk.Compare(header.EndKey) >= 0 {
652697
// All of the remaining requests end before this range, so we're
653698
// done.
699+
endIdx = i
654700
break
655701
}
656-
if !h.isRange[i] {
702+
}
703+
704+
headers = headers[:endIdx]
705+
positions = positions[:endIdx]
706+
requests = requests[:endIdx]
707+
isRange = isRange[:endIdx]
708+
numReqs := len(positions) - fullyProcessed
709+
if numReqs == 0 {
710+
return nil, nil, nil
711+
}
712+
truncReqs := make([]kvpb.RequestUnion, 0, numReqs)
713+
truncPositions := make([]int, 0, numReqs)
714+
715+
for i := range positions {
716+
//gcassert:bce
717+
pos := positions[i]
718+
if pos < 0 {
719+
// This request has already been fully processed, so there is no
720+
// need to look at it.
721+
continue
722+
}
723+
//gcassert:bce
724+
header := headers[i]
725+
// rs.Key can't be local because it contains range split points, which
726+
// are never local.
727+
sk := rs.Key.AsRawKey()
728+
//gcassert:bce
729+
req := requests[i]
730+
//gcassert:bce
731+
if !isRange[i] {
657732
// This is a point request, and the key is contained within this
658733
// range, so we include the request as is and mark it as "fully
659734
// processed".
660-
truncReqs = append(truncReqs, h.requests[i])
661-
positions = append(positions, pos)
662-
h.headers[i] = kvpb.RequestHeader{}
663-
h.positions[i] = -1
735+
truncReqs = append(truncReqs, req)
736+
truncPositions = append(truncPositions, pos)
737+
//gcassert:bce
738+
headers[i] = kvpb.RequestHeader{}
739+
//gcassert:bce
740+
positions[i] = -1
664741
continue
665742
}
666743
// We're dealing with a range-spanning request.
@@ -673,32 +750,35 @@ func (h *BatchTruncationHelper) truncateDesc(rs roachpb.RSpan) ([]kvpb.RequestUn
673750
)
674751
}
675752
}
676-
inner := h.requests[i].GetInner()
753+
inner := req.GetInner()
677754
if header.Key.Compare(sk) >= 0 {
678755
// This is the last part of this request since it is fully contained
679756
// within this range, so we mark the request as "fully processed".
680-
h.headers[i] = kvpb.RequestHeader{}
681-
h.positions[i] = -1
757+
//gcassert:bce
758+
headers[i] = kvpb.RequestHeader{}
759+
//gcassert:bce
760+
positions[i] = -1
682761
if origEndKey := inner.Header().EndKey; len(origEndKey) == 0 || origEndKey.Equal(header.EndKey) {
683762
// This range-spanning request fits within a single range, so we
684763
// can just use the original request.
685-
truncReqs = append(truncReqs, h.requests[i])
686-
positions = append(positions, pos)
764+
truncReqs = append(truncReqs, req)
765+
truncPositions = append(truncPositions, pos)
687766
continue
688767
}
689768
} else {
690769
header.Key = sk
691770
// Adjust the end key of the header so that it contained only the
692771
// unprocessed prefix of the request.
693-
h.headers[i].EndKey = header.Key
772+
//gcassert:bce
773+
headers[i].EndKey = header.Key
694774
}
695775
shallowCopy := inner.ShallowCopy()
696776
shallowCopy.SetHeader(header)
697777
truncReqs = append(truncReqs, kvpb.RequestUnion{})
698778
truncReqs[len(truncReqs)-1].MustSetInner(shallowCopy)
699-
positions = append(positions, pos)
779+
truncPositions = append(truncPositions, pos)
700780
}
701-
return truncReqs, positions, nil
781+
return truncReqs, truncPositions, nil
702782
}
703783

704784
var emptyHeader = kvpb.RequestHeader{}

pkg/kv/kvclient/kvcoord/batch_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -642,14 +642,14 @@ func BenchmarkTruncateLoop(b *testing.B) {
642642
defer leaktest.AfterTest(b)()
643643
defer log.Scope(b).Close(b)
644644

645-
rng, _ := randutil.NewTestRand()
646645
const keyLength = 8
647646
for _, scanDir := range []ScanDirection{Ascending, Descending} {
648647
for _, mustPreserveOrder := range []bool{false, true} {
649648
for _, numRequests := range []int{128, 16384} {
650649
for _, numRanges := range []int{4, 64} {
651650
// We'll split the whole key space into numRanges ranges
652651
// using random numRanges-1 split points.
652+
rng := randutil.NewTestRandWithSeed(51)
653653
rangeSpans := makeRanges(rng, numRanges, keyLength)
654654
if scanDir == Descending {
655655
// Reverse all the range spans for the Descending scan

0 commit comments

Comments
 (0)