Skip to content

Commit e7de41d

Browse files
craig[bot]dodeca12msbutlertbg
committed
155164: storeliveness: add batching metrics to storeliveness transport r=iskettaneh,miraradeva a=dodeca12 Previously, the only metrics on storeliveness transport for batching is a simple counter that shows how many aggregate storeliveness messages have been sent. For paced storeliveness heartbeats verification, we need to compare the same metrics with the baseline to see if the changes introduced via pacing would have some negative impact on heartbeat sending - particularly with respect to batching. This commit adds the same metrics that are present in paced storeliveness heartbeats; additionally, since this commit only introduces metrics, there will not be any impact on current behaviour. #### Metrics added - `storeliveness.transport.batches.sent` - `storeliveness.transport.batches.received` <img width="2320" height="1758" alt="image" src="https://github.com/user-attachments/assets/e2f07976-9ba2-4f3f-a226-9abb8411bb0a" /> References: #148210 Release note: None 156080: kv/bulk: turn on bulkio.ingest.compute_stats_diff_in_stream_batcher.enabled r=dt a=msbutler We figured out the current cause of ComputeStatsDiff violations were due mid key sized based flushes, which are unavoidable and rare. Therefore, we feel good turning on accurate stats computing for PCR. This patch also adds logging every time we conduct a mid key flush. Fixes #152536 Release note: none 156272: roachtest: relax split test r=tbg a=tbg We can see more splits than the test expected. Closes #156261. Epic: none Co-authored-by: Swapneeth Gorantla <swapneeth.gorantla@cockroachlabs.com> Co-authored-by: Michael Butler <butler@cockroachlabs.com> Co-authored-by: Tobias Grieger <tobias.b.grieger@gmail.com>
4 parents 1e00b30 + d816af8 + 0c5af8b + 097f408 commit e7de41d

File tree

8 files changed

+52
-11
lines changed

8 files changed

+52
-11
lines changed

docs/generated/metrics/metrics.yaml

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17381,6 +17381,22 @@ layers:
1738117381
unit: COUNT
1738217382
aggregation: AVG
1738317383
derivative: NON_NEGATIVE_DERIVATIVE
17384+
- name: storeliveness.transport.batches-received
17385+
exported_name: storeliveness_transport_batches_received
17386+
description: Number of message batches received by the Store Liveness Transport
17387+
y_axis_label: Batches
17388+
type: COUNTER
17389+
unit: COUNT
17390+
aggregation: AVG
17391+
derivative: NON_NEGATIVE_DERIVATIVE
17392+
- name: storeliveness.transport.batches-sent
17393+
exported_name: storeliveness_transport_batches_sent
17394+
description: Number of message batches sent by the Store Liveness Transport
17395+
y_axis_label: Batches
17396+
type: COUNTER
17397+
unit: COUNT
17398+
aggregation: AVG
17399+
derivative: NON_NEGATIVE_DERIVATIVE
1738417400
- name: storeliveness.transport.receive-queue-bytes
1738517401
exported_name: storeliveness_transport_receive_queue_bytes
1738617402
description: Total byte size of pending incoming messages from Store Liveness Transport

pkg/cmd/roachtest/tests/split.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -362,7 +362,7 @@ func registerLoadSplits(r registry.Registry) {
362362
// The number of splits should be similar to YCSB/A.
363363
cpuThreshold: 100 * time.Millisecond, // 1/10th of a CPU per second.
364364
minimumRanges: 15,
365-
maximumRanges: 40,
365+
maximumRanges: 60, // see #156261
366366
initialRangeCount: 2,
367367
load: ycsbSplitLoad{
368368
workload: "b",

pkg/crosscluster/physical/stream_ingestion_processor.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -784,7 +784,7 @@ func (sip *streamIngestionProcessor) handleEvent(event PartitionEvent) error {
784784
}
785785

786786
if sip.logBufferEvery.ShouldLog() {
787-
log.Dev.Infof(sip.Ctx(), "current KV batch size %d (%d items)", sip.buffer.curKVBatchSize, len(sip.buffer.curKVBatch))
787+
log.VEventf(sip.Ctx(), 2, "current KV batch size %d (%d items)", sip.buffer.curKVBatchSize, len(sip.buffer.curKVBatch))
788788
}
789789

790790
if sip.buffer.shouldFlushOnSize(sip.Ctx(), sv) {

pkg/kv/bulk/BUILD.bazel

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ go_library(
3434
"//pkg/util/humanizeutil",
3535
"//pkg/util/limit",
3636
"//pkg/util/log",
37-
"//pkg/util/metamorphic",
3837
"//pkg/util/metric",
3938
"//pkg/util/mon",
4039
"//pkg/util/retry",

pkg/kv/bulk/sst_batcher.go

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ import (
3030
"github.com/cockroachdb/cockroach/pkg/util/hlc"
3131
"github.com/cockroachdb/cockroach/pkg/util/limit"
3232
"github.com/cockroachdb/cockroach/pkg/util/log"
33-
"github.com/cockroachdb/cockroach/pkg/util/metamorphic"
3433
"github.com/cockroachdb/cockroach/pkg/util/mon"
3534
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
3635
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
@@ -70,7 +69,7 @@ var (
7069
settings.ApplicationLevel,
7170
"bulkio.ingest.compute_stats_diff_in_stream_batcher.enabled",
7271
"if set, kvserver will compute an accurate stats diff for every addsstable request",
73-
metamorphic.ConstantWithTestBool("computeStatsDiffInStreamBatcher", false),
72+
true,
7473
)
7574

7675
sstBatcherElasticCPUControlEnabled = settings.RegisterBoolSetting(
@@ -583,15 +582,21 @@ func (b *SSTBatcher) flushIfNeeded(ctx context.Context, nextKey roachpb.Key) err
583582
// That said, only do this if we are only moderately over the flush target;
584583
// if we are subtantially over the limit, just flush the partial row as we
585584
// cannot buffer indefinitely.
585+
586+
prevRow, prevErr := keys.EnsureSafeSplitKey(b.batch.endKey)
587+
nextRow, nextErr := keys.EnsureSafeSplitKey(nextKey)
588+
589+
// An error decoding either key implies it is not a valid row key and thus
590+
// not the same row for our purposes; we don't care what the error is.
591+
midKey := prevErr == nil && nextErr == nil && bytes.Equal(prevRow, nextRow)
586592
if b.batch.sstWriter.DataSize < 2*flushLimit {
587-
prevRow, prevErr := keys.EnsureSafeSplitKey(b.batch.endKey)
588-
nextRow, nextErr := keys.EnsureSafeSplitKey(nextKey)
589-
if prevErr == nil && nextErr == nil && bytes.Equal(prevRow, nextRow) {
590-
// An error decoding either key implies it is not a valid row key and thus
591-
// not the same row for our purposes; we don't care what the error is.
593+
if midKey {
592594
return nil // keep going to row boundary.
593595
}
594596
}
597+
if midKey {
598+
log.Dev.Infof(ctx, "flushing sst mid key %s due to size", b.batch.endKey)
599+
}
595600

596601
if b.mustSyncBeforeFlush {
597602
err := b.syncFlush()

pkg/kv/kvserver/storeliveness/metrics.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@ type TransportMetrics struct {
2828
MessagesReceived *metric.Counter
2929
MessagesSendDropped *metric.Counter
3030
MessagesReceiveDropped *metric.Counter
31+
32+
BatchesSent *metric.Counter
33+
BatchesReceived *metric.Counter
3134
}
3235

3336
func newTransportMetrics() *TransportMetrics {
@@ -39,6 +42,8 @@ func newTransportMetrics() *TransportMetrics {
3942
MessagesReceived: metric.NewCounter(metaMessagesReceived),
4043
MessagesSendDropped: metric.NewCounter(metaMessagesSendDropped),
4144
MessagesReceiveDropped: metric.NewCounter(metaMessagesReceiveDropped),
45+
BatchesSent: metric.NewCounter(metaBatchesSent),
46+
BatchesReceived: metric.NewCounter(metaBatchesReceived),
4247
}
4348
}
4449

@@ -201,11 +206,22 @@ var (
201206
Measurement: "Bytes",
202207
Unit: metric.Unit_BYTES,
203208
}
204-
205209
metaCallbacksProcessingDuration = metric.Metadata{
206210
Name: "storeliveness.callbacks.processing_duration",
207211
Help: "Duration of support withdrawal callback processing",
208212
Measurement: "Duration",
209213
Unit: metric.Unit_NANOSECONDS,
210214
}
215+
metaBatchesSent = metric.Metadata{
216+
Name: "storeliveness.transport.batches-sent",
217+
Help: "Number of message batches sent by the Store Liveness Transport",
218+
Measurement: "Batches",
219+
Unit: metric.Unit_COUNT,
220+
}
221+
metaBatchesReceived = metric.Metadata{
222+
Name: "storeliveness.transport.batches-received",
223+
Help: "Number of message batches received by the Store Liveness Transport",
224+
Measurement: "Batches",
225+
Unit: metric.Unit_COUNT,
226+
}
211227
)

pkg/kv/kvserver/storeliveness/transport.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,7 @@ func (t *Transport) stream(stream slpb.RPCStoreLiveness_StreamStream) error {
168168
if err != nil {
169169
return err
170170
}
171+
t.metrics.BatchesReceived.Inc(1)
171172
if !batch.Now.IsEmpty() {
172173
t.clock.Update(batch.Now)
173174
}
@@ -397,6 +398,8 @@ func (t *Transport) processQueue(
397398
t.metrics.MessagesSendDropped.Inc(int64(len(batch.Messages)))
398399
return err
399400
}
401+
402+
t.metrics.BatchesSent.Inc(1)
400403
t.metrics.MessagesSent.Inc(int64(len(batch.Messages)))
401404

402405
// Reuse the Messages slice, but zero out the contents to avoid delaying

pkg/roachprod/agents/opentelemetry/cockroachdb_metrics.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2448,6 +2448,8 @@ var cockroachdbMetrics = map[string]string{
24482448
"storeliveness_support_from_stores": "storeliveness.support_from.stores",
24492449
"storeliveness_support_withdraw_failures": "storeliveness.support_withdraw.failures",
24502450
"storeliveness_support_withdraw_successes": "storeliveness.support_withdraw.successes",
2451+
"storeliveness_transport_batches_received": "storeliveness.transport.batches_received",
2452+
"storeliveness_transport_batches_sent": "storeliveness.transport.batches_sent",
24512453
"storeliveness_transport_receive_dropped": "storeliveness.transport.receive_dropped",
24522454
"storeliveness_transport_receive_queue_bytes": "storeliveness.transport.receive_queue_bytes",
24532455
"storeliveness_transport_receive_queue_size": "storeliveness.transport.receive_queue_size",

0 commit comments

Comments
 (0)