Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ ALL_TESTS = [
"//pkg/multitenant/tenantcostmodel:tenantcostmodel_test",
"//pkg/obs/eventagg:eventagg_test",
"//pkg/obs/logstream:logstream_test",
"//pkg/obs/resourceattr:resourceattr_test",
"//pkg/raft/confchange:confchange_test",
"//pkg/raft/quorum:quorum_test",
"//pkg/raft/raftpb:raftpb_test",
Expand Down Expand Up @@ -1676,6 +1677,8 @@ GO_TARGETS = [
"//pkg/obs/eventagg:eventagg_test",
"//pkg/obs/logstream:logstream",
"//pkg/obs/logstream:logstream_test",
"//pkg/obs/resourceattr:resourceattr",
"//pkg/obs/resourceattr:resourceattr_test",
"//pkg/obs:obs",
"//pkg/raft/confchange:confchange",
"//pkg/raft/confchange:confchange_test",
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/cdcevent/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ go_library(
"//pkg/ccl/changefeedccl/changefeedbase",
"//pkg/keys",
"//pkg/kv",
"//pkg/obs/resourceattr",
"//pkg/roachpb",
"//pkg/settings",
"//pkg/settings/cluster",
Expand Down
2 changes: 2 additions & 0 deletions pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/obs/resourceattr"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
Expand Down Expand Up @@ -319,6 +320,7 @@ func (c *rowFetcherCache) RowFetcherForColumnFamily(
Spec: &spec,
TraceKV: c.rfArgs.traceKV,
TraceKVEvery: &util.EveryN{N: c.rfArgs.traceKVLogFrequency},
WorkloadID: resourceattr.WORKLOAD_ID_CDC,
},
); err != nil {
return nil, nil, err
Expand Down
5 changes: 5 additions & 0 deletions pkg/kv/kvclient/kvstreamer/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,8 @@ type Streamer struct {
// need the mutex protection.
numRangesPerScanRequestAccountedFor int64

workloadID uint64

mu struct {
// If the budget's mutex also needs to be locked, the budget's mutex
// must be acquired first. If the results' mutex needs to be locked,
Expand Down Expand Up @@ -396,6 +398,7 @@ func NewStreamer(
lockStrength lock.Strength,
lockDurability lock.Durability,
reverse bool,
workloadID uint64,
) *Streamer {
if txn.Type() != kv.LeafTxn {
panic(errors.AssertionFailedf("RootTxn is given to the Streamer"))
Expand Down Expand Up @@ -423,6 +426,7 @@ func NewStreamer(
lockStrength: lockStrength,
lockDurability: lockDurability,
reverse: reverse,
workloadID: workloadID,
}
s.metrics.OperatorsCount.Inc(1)

Expand Down Expand Up @@ -1406,6 +1410,7 @@ func (w *workerCoordinator) performRequestAsync(
// regardless of the value of headOfLine.
ba.AdmissionHeader.NoMemoryReservedAtSource = false
ba.Requests = req.reqs
ba.Header.WorkloadId = w.s.workloadID

if buildutil.CrdbTestBuild {
if w.s.mode == InOrder {
Expand Down
9 changes: 8 additions & 1 deletion pkg/kv/kvpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -3022,9 +3022,16 @@ message Header {
// evaluated in descending range order.
bool is_reverse = 38;

// WorkloadID allows the caller to tag the request with an
// identifier that links is back to the workload entity that triggered
// the Batch. This can be a statement fingerprint ID, transaction
// fingerprint ID, job ID, etc. This ID will be added to profiler
// and trace labels.
uint64 workload_id = 39;

reserved 7, 10, 12, 14, 20, 24;

// Next ID: 39
// Next ID: 40
}

message WriteOptions {
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ go_library(
"//pkg/multitenant",
"//pkg/multitenant/tenantcapabilities/tenantcapabilitiesauthorizer",
"//pkg/multitenant/tenantcostmodel",
"//pkg/obs/resourceattr",
"//pkg/raft",
"//pkg/raft/raftlogger",
"//pkg/raft/raftpb",
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/mvcc_gc_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
"github.com/cockroachdb/cockroach/pkg/obs/resourceattr"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
Expand Down Expand Up @@ -664,7 +665,7 @@ func (mgcq *mvccGCQueue) process(
) (processed bool, err error) {
// Record the CPU time processing the request for this replica. This is
// recorded regardless of errors that are encountered.
defer repl.MeasureReqCPUNanos(ctx, grunning.Time())
defer repl.MeasureReqCPUNanos(ctx, grunning.Time(), resourceattr.WORKLOAD_ID_MVCC_GC)

// Lookup the descriptor and GC policy for the zone containing this key range.
desc, conf := repl.DescAndSpanConfig()
Expand Down
7 changes: 6 additions & 1 deletion pkg/kv/kvserver/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/split"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/tenantrate"
"github.com/cockroachdb/cockroach/pkg/obs/resourceattr"
"github.com/cockroachdb/cockroach/pkg/raft"
"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -1083,6 +1084,8 @@ type Replica struct {
// changes, leaseholder changes, and periodically at the interval of
// kv.closed_timestamp.policy_refresh_interval by PolicyRefresher.
cachedClosedTimestampPolicy atomic.Pointer[ctpb.RangeClosedTimestampPolicy]

resourceAttr *resourceattr.ResourceAttr
}

// String returns the string representation of the replica using an
Expand Down Expand Up @@ -2760,7 +2763,7 @@ func init() {

// MeasureReqCPUNanos measures the cpu time spent on this replica processing
// requests.
func (r *Replica) MeasureReqCPUNanos(ctx context.Context, start time.Duration) {
func (r *Replica) MeasureReqCPUNanos(ctx context.Context, start time.Duration, workloadID uint64) {
r.measureNanosRunning(start, func(dur float64) {
r.loadStats.RecordReqCPUNanos(dur)
// NB: the caller also has a tenant ID, but we use the replica's here for
Expand All @@ -2777,6 +2780,8 @@ func (r *Replica) MeasureReqCPUNanos(ctx context.Context, start time.Duration) {
tm.ReqCPUNanos.Inc(dur)
r.store.metrics.releaseTenant(ctx, tm)
}

r.resourceAttr.Record(workloadID, dur)
})
}

Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/replica_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ func newUninitializedReplicaWithoutRaftGroup(store *Store, id roachpb.FullReplic
TxnWaitKnobs: store.TestingKnobs().TxnWaitKnobs,
}),
allocatorToken: &plan.AllocatorToken{},
resourceAttr: store.resourceAttr,
}
r.sideTransportClosedTimestamp.init(store.cfg.ClosedTimestampReceiver, id.RangeID)
r.cachedClosedTimestampPolicy.Store(new(ctpb.RangeClosedTimestampPolicy))
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_send.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (r *Replica) SendWithWriteBytes(
// Record the CPU time processing the request for this replica. This is
// recorded regardless of errors that are encountered.
startCPU := grunning.Time()
defer r.MeasureReqCPUNanos(ctx, startCPU)
defer r.MeasureReqCPUNanos(ctx, startCPU, ba.WorkloadId)

if r.store.cfg.Settings.CPUProfileType() == cluster.CPUProfileWithLabels {
defer pprof.SetGoroutineLabels(ctx)
Expand Down
6 changes: 6 additions & 0 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnrecovery"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnwait"
"github.com/cockroachdb/cockroach/pkg/multitenant/tenantcapabilities/tenantcapabilitiesauthorizer"
"github.com/cockroachdb/cockroach/pkg/obs/resourceattr"
"github.com/cockroachdb/cockroach/pkg/raft"
"github.com/cockroachdb/cockroach/pkg/raft/raftlogger"
"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
Expand Down Expand Up @@ -1161,6 +1162,8 @@ type Store struct {

// diskMonitor provides metrics for the disk associated with this store.
diskMonitor *disk.Monitor

resourceAttr *resourceattr.ResourceAttr
}

var _ kv.Sender = &Store{}
Expand Down Expand Up @@ -1354,6 +1357,8 @@ type StoreConfig struct {
// RangeCount is populated by the node and represents the total number of
// ranges this node has.
RangeCount *atomic.Int64

ResourceAttr *resourceattr.ResourceAttr
}

// logRangeAndNodeEventsEnabled is used to enable or disable logging range events
Expand Down Expand Up @@ -1504,6 +1509,7 @@ func NewStore(
nodeCapacityProvider: cfg.NodeCapacityProvider,
ioThresholds: &iot,
rangeFeedSlowClosedTimestampNudge: singleflight.NewGroup("rangfeed-ct-nudge", "range"),
resourceAttr: cfg.ResourceAttr,
}
s.ioThreshold.t = &admissionpb.IOThreshold{}
// Track the maxScore over the last 5 minutes, in one minute windows.
Expand Down
22 changes: 22 additions & 0 deletions pkg/obs/resourceattr/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "resourceattr",
srcs = ["resourceattr.go"],
importpath = "github.com/cockroachdb/cockroach/pkg/obs/resourceattr",
visibility = ["//visibility:public"],
deps = [
"//pkg/sql/appstatspb",
"//pkg/sql/sqlstats/persistedsqlstats/sqlstatsutil",
"//pkg/util/log",
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
],
)

go_test(
name = "resourceattr_test",
srcs = ["resourceattr_test.go"],
embed = [":resourceattr"],
)
Loading