diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 144eef8a8886..31a5fd755a3d 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -310,6 +310,7 @@ ALL_TESTS = [ "//pkg/multitenant/tenantcostmodel:tenantcostmodel_test", "//pkg/obs/eventagg:eventagg_test", "//pkg/obs/logstream:logstream_test", + "//pkg/obs/resourceattr:resourceattr_test", "//pkg/raft/confchange:confchange_test", "//pkg/raft/quorum:quorum_test", "//pkg/raft/raftpb:raftpb_test", @@ -1676,6 +1677,8 @@ GO_TARGETS = [ "//pkg/obs/eventagg:eventagg_test", "//pkg/obs/logstream:logstream", "//pkg/obs/logstream:logstream_test", + "//pkg/obs/resourceattr:resourceattr", + "//pkg/obs/resourceattr:resourceattr_test", "//pkg/obs:obs", "//pkg/raft/confchange:confchange", "//pkg/raft/confchange:confchange_test", diff --git a/pkg/ccl/changefeedccl/cdcevent/BUILD.bazel b/pkg/ccl/changefeedccl/cdcevent/BUILD.bazel index 6c29b4e4b3fd..549eeca8b31f 100644 --- a/pkg/ccl/changefeedccl/cdcevent/BUILD.bazel +++ b/pkg/ccl/changefeedccl/cdcevent/BUILD.bazel @@ -16,6 +16,7 @@ go_library( "//pkg/ccl/changefeedccl/changefeedbase", "//pkg/keys", "//pkg/kv", + "//pkg/obs/resourceattr", "//pkg/roachpb", "//pkg/settings", "//pkg/settings/cluster", diff --git a/pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go b/pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go index fa77b97db2a4..3fa8c6005bfc 100644 --- a/pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go +++ b/pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go @@ -12,6 +12,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/obs/resourceattr" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -319,6 +320,7 @@ func (c *rowFetcherCache) RowFetcherForColumnFamily( Spec: &spec, TraceKV: c.rfArgs.traceKV, TraceKVEvery: &util.EveryN{N: c.rfArgs.traceKVLogFrequency}, + WorkloadID: resourceattr.WORKLOAD_ID_CDC, }, ); err != nil { return nil, nil, err diff --git a/pkg/kv/kvclient/kvstreamer/streamer.go b/pkg/kv/kvclient/kvstreamer/streamer.go index 4abd62f291f1..49a10d4a0fa1 100644 --- a/pkg/kv/kvclient/kvstreamer/streamer.go +++ b/pkg/kv/kvclient/kvstreamer/streamer.go @@ -272,6 +272,8 @@ type Streamer struct { // need the mutex protection. numRangesPerScanRequestAccountedFor int64 + workloadID uint64 + mu struct { // If the budget's mutex also needs to be locked, the budget's mutex // must be acquired first. If the results' mutex needs to be locked, @@ -396,6 +398,7 @@ func NewStreamer( lockStrength lock.Strength, lockDurability lock.Durability, reverse bool, + workloadID uint64, ) *Streamer { if txn.Type() != kv.LeafTxn { panic(errors.AssertionFailedf("RootTxn is given to the Streamer")) @@ -423,6 +426,7 @@ func NewStreamer( lockStrength: lockStrength, lockDurability: lockDurability, reverse: reverse, + workloadID: workloadID, } s.metrics.OperatorsCount.Inc(1) @@ -1406,6 +1410,7 @@ func (w *workerCoordinator) performRequestAsync( // regardless of the value of headOfLine. ba.AdmissionHeader.NoMemoryReservedAtSource = false ba.Requests = req.reqs + ba.Header.WorkloadId = w.s.workloadID if buildutil.CrdbTestBuild { if w.s.mode == InOrder { diff --git a/pkg/kv/kvpb/api.proto b/pkg/kv/kvpb/api.proto index 94518f7011db..2722aa880a9b 100644 --- a/pkg/kv/kvpb/api.proto +++ b/pkg/kv/kvpb/api.proto @@ -3022,9 +3022,16 @@ message Header { // evaluated in descending range order. bool is_reverse = 38; + // WorkloadID allows the caller to tag the request with an + // identifier that links is back to the workload entity that triggered + // the Batch. This can be a statement fingerprint ID, transaction + // fingerprint ID, job ID, etc. This ID will be added to profiler + // and trace labels. + uint64 workload_id = 39; + reserved 7, 10, 12, 14, 20, 24; - // Next ID: 39 + // Next ID: 40 } message WriteOptions { diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index 05dd33ac47e5..d682905e6b77 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -191,6 +191,7 @@ go_library( "//pkg/multitenant", "//pkg/multitenant/tenantcapabilities/tenantcapabilitiesauthorizer", "//pkg/multitenant/tenantcostmodel", + "//pkg/obs/resourceattr", "//pkg/raft", "//pkg/raft/raftlogger", "//pkg/raft/raftpb", diff --git a/pkg/kv/kvserver/mvcc_gc_queue.go b/pkg/kv/kvserver/mvcc_gc_queue.go index b66a5f1b8e74..e273a030bb51 100644 --- a/pkg/kv/kvserver/mvcc_gc_queue.go +++ b/pkg/kv/kvserver/mvcc_gc_queue.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" + "github.com/cockroachdb/cockroach/pkg/obs/resourceattr" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -664,7 +665,7 @@ func (mgcq *mvccGCQueue) process( ) (processed bool, err error) { // Record the CPU time processing the request for this replica. This is // recorded regardless of errors that are encountered. - defer repl.MeasureReqCPUNanos(ctx, grunning.Time()) + defer repl.MeasureReqCPUNanos(ctx, grunning.Time(), resourceattr.WORKLOAD_ID_MVCC_GC) // Lookup the descriptor and GC policy for the zone containing this key range. desc, conf := repl.DescAndSpanConfig() diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index 0701176bcba5..17300a3392af 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -38,6 +38,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/split" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/tenantrate" + "github.com/cockroachdb/cockroach/pkg/obs/resourceattr" "github.com/cockroachdb/cockroach/pkg/raft" "github.com/cockroachdb/cockroach/pkg/raft/raftpb" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -1083,6 +1084,8 @@ type Replica struct { // changes, leaseholder changes, and periodically at the interval of // kv.closed_timestamp.policy_refresh_interval by PolicyRefresher. cachedClosedTimestampPolicy atomic.Pointer[ctpb.RangeClosedTimestampPolicy] + + resourceAttr *resourceattr.ResourceAttr } // String returns the string representation of the replica using an @@ -2760,7 +2763,7 @@ func init() { // MeasureReqCPUNanos measures the cpu time spent on this replica processing // requests. -func (r *Replica) MeasureReqCPUNanos(ctx context.Context, start time.Duration) { +func (r *Replica) MeasureReqCPUNanos(ctx context.Context, start time.Duration, workloadID uint64) { r.measureNanosRunning(start, func(dur float64) { r.loadStats.RecordReqCPUNanos(dur) // NB: the caller also has a tenant ID, but we use the replica's here for @@ -2777,6 +2780,8 @@ func (r *Replica) MeasureReqCPUNanos(ctx context.Context, start time.Duration) { tm.ReqCPUNanos.Inc(dur) r.store.metrics.releaseTenant(ctx, tm) } + + r.resourceAttr.Record(workloadID, dur) }) } diff --git a/pkg/kv/kvserver/replica_init.go b/pkg/kv/kvserver/replica_init.go index 6d3bbe7bea3f..3ff9acfaae3c 100644 --- a/pkg/kv/kvserver/replica_init.go +++ b/pkg/kv/kvserver/replica_init.go @@ -145,6 +145,7 @@ func newUninitializedReplicaWithoutRaftGroup(store *Store, id roachpb.FullReplic TxnWaitKnobs: store.TestingKnobs().TxnWaitKnobs, }), allocatorToken: &plan.AllocatorToken{}, + resourceAttr: store.resourceAttr, } r.sideTransportClosedTimestamp.init(store.cfg.ClosedTimestampReceiver, id.RangeID) r.cachedClosedTimestampPolicy.Store(new(ctpb.RangeClosedTimestampPolicy)) diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go index 78f596f15dca..5e6e84930d78 100644 --- a/pkg/kv/kvserver/replica_send.go +++ b/pkg/kv/kvserver/replica_send.go @@ -128,7 +128,7 @@ func (r *Replica) SendWithWriteBytes( // Record the CPU time processing the request for this replica. This is // recorded regardless of errors that are encountered. startCPU := grunning.Time() - defer r.MeasureReqCPUNanos(ctx, startCPU) + defer r.MeasureReqCPUNanos(ctx, startCPU, ba.WorkloadId) if r.store.cfg.Settings.CPUProfileType() == cluster.CPUProfileWithLabels { defer pprof.SetGoroutineLabels(ctx) diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 99f607768a54..b872abf96fba 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -63,6 +63,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnrecovery" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnwait" "github.com/cockroachdb/cockroach/pkg/multitenant/tenantcapabilities/tenantcapabilitiesauthorizer" + "github.com/cockroachdb/cockroach/pkg/obs/resourceattr" "github.com/cockroachdb/cockroach/pkg/raft" "github.com/cockroachdb/cockroach/pkg/raft/raftlogger" "github.com/cockroachdb/cockroach/pkg/raft/raftpb" @@ -1161,6 +1162,8 @@ type Store struct { // diskMonitor provides metrics for the disk associated with this store. diskMonitor *disk.Monitor + + resourceAttr *resourceattr.ResourceAttr } var _ kv.Sender = &Store{} @@ -1354,6 +1357,8 @@ type StoreConfig struct { // RangeCount is populated by the node and represents the total number of // ranges this node has. RangeCount *atomic.Int64 + + ResourceAttr *resourceattr.ResourceAttr } // logRangeAndNodeEventsEnabled is used to enable or disable logging range events @@ -1504,6 +1509,7 @@ func NewStore( nodeCapacityProvider: cfg.NodeCapacityProvider, ioThresholds: &iot, rangeFeedSlowClosedTimestampNudge: singleflight.NewGroup("rangfeed-ct-nudge", "range"), + resourceAttr: cfg.ResourceAttr, } s.ioThreshold.t = &admissionpb.IOThreshold{} // Track the maxScore over the last 5 minutes, in one minute windows. diff --git a/pkg/obs/resourceattr/BUILD.bazel b/pkg/obs/resourceattr/BUILD.bazel new file mode 100644 index 000000000000..77877000e43b --- /dev/null +++ b/pkg/obs/resourceattr/BUILD.bazel @@ -0,0 +1,22 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "resourceattr", + srcs = ["resourceattr.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/obs/resourceattr", + visibility = ["//visibility:public"], + deps = [ + "//pkg/sql/appstatspb", + "//pkg/sql/sqlstats/persistedsqlstats/sqlstatsutil", + "//pkg/util/log", + "//pkg/util/stop", + "//pkg/util/syncutil", + "//pkg/util/timeutil", + ], +) + +go_test( + name = "resourceattr_test", + srcs = ["resourceattr_test.go"], + embed = [":resourceattr"], +) diff --git a/pkg/obs/resourceattr/resourceattr.go b/pkg/obs/resourceattr/resourceattr.go new file mode 100644 index 000000000000..5c7683955fe3 --- /dev/null +++ b/pkg/obs/resourceattr/resourceattr.go @@ -0,0 +1,178 @@ +// Copyright 2025 The Cockroach Authors. +// +// Use of this software is governed by the CockroachDB Software License +// included in the /LICENSE file. + +package resourceattr + +import ( + "context" + "fmt" + "sort" + "strings" + "time" + + "github.com/cockroachdb/cockroach/pkg/sql/appstatspb" + "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/persistedsqlstats/sqlstatsutil" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" +) + +const ( + WORKLOAD_ID_UNKNOWN = iota + WORKLOAD_ID_BULKIO + WORKLOAD_ID_BACKUP + WORKLOAD_ID_RESTORE + WORKLOAD_ID_IMPORT + WORKLOAD_ID_CDC + WORKLOAD_ID_JOB + WORKLOAD_ID_INTERNAL_UNKNOWN + WORKLOAD_ID_SUBQUERY + WORKLOAD_ID_BACKFILL + WORKLOAD_ID_SCHEMA_CHANGE + WORKLOAD_ID_MVCC_GC +) + +var workloadIDToName = map[uint64]string{ + WORKLOAD_ID_UNKNOWN: "UNKNOWN", + WORKLOAD_ID_BULKIO: "BULKIO", + WORKLOAD_ID_BACKUP: "BACKUP", + WORKLOAD_ID_RESTORE: "RESTORE", + WORKLOAD_ID_IMPORT: "IMPORT", + WORKLOAD_ID_CDC: "CDC", + WORKLOAD_ID_JOB: "JOB", + WORKLOAD_ID_INTERNAL_UNKNOWN: "INTERNAL_UNKNOWN", + WORKLOAD_ID_SUBQUERY: "SUBQUERY", + WORKLOAD_ID_BACKFILL: "BACKFILL", + WORKLOAD_ID_SCHEMA_CHANGE: "SCHEMA_CHANGE", + WORKLOAD_ID_MVCC_GC: "MVCC_GC", +} + +type ResourceAttr struct { + syncutil.Mutex + workloads []uint64 + cpuTimes []float64 + id int +} + +func NewResourceAttr(size int, stopper *stop.Stopper) *ResourceAttr { + r := &ResourceAttr{ + workloads: make([]uint64, size), + cpuTimes: make([]float64, size), + } + + err := stopper.RunAsyncTask(context.Background(), "resource-attr-reporter", func(ctx context.Context) { + timer := timeutil.Timer{} + defer timer.Stop() + timer.Reset(time.Second * 10) + + for { + select { + case <-stopper.ShouldQuiesce(): + return + case <-ctx.Done(): + return + case <-timer.C: + log.Dev.Warningf(ctx, "~~~~~ WORKLOAD SUMMARY:%s", r.SummaryAndClear()) + timer.Reset(time.Second * 10) + } + } + }) + if err != nil { + panic(err) + } + + return r +} + +func (ra *ResourceAttr) Record(workloadID uint64, cpuTimeNanos float64) { + ra.Lock() + defer ra.Unlock() + ra.workloads[ra.id] = workloadID + ra.cpuTimes[ra.id] = cpuTimeNanos + ra.id++ + if ra.id == len(ra.workloads) { + ra.id = 0 + } +} + +type ResourceAttrSum struct { + workloadCpu map[uint64]float64 +} + +func (ra *ResourceAttr) SummaryAndClear() *ResourceAttrSum { + sum := &ResourceAttrSum{ + workloadCpu: make(map[uint64]float64), + } + + ra.Lock() + defer ra.Unlock() + for i := range ra.workloads { + sum.workloadCpu[ra.workloads[i]] += ra.cpuTimes[i] + ra.workloads[i] = 0 + ra.cpuTimes[i] = 0 + } + ra.id = 0 + return sum +} + +type workloadEntry struct { + workloadID uint64 + name string + cpuTime float64 +} + +func (ras *ResourceAttrSum) String() string { + if len(ras.workloadCpu) == 0 { + return "(empty)" + } + + // Collect entries and find max workload name length for alignment + entries := make([]workloadEntry, 0, len(ras.workloadCpu)) + maxLen := 0 + for workloadID, cpuTime := range ras.workloadCpu { + name := workloadName(workloadID) + entries = append(entries, workloadEntry{ + workloadID: workloadID, + name: name, + cpuTime: cpuTime, + }) + if len(name) > maxLen { + maxLen = len(name) + } + } + + // Sort by CPU time descending (highest first) + sort.Slice(entries, func(i, j int) bool { + return entries[i].cpuTime > entries[j].cpuTime + }) + + var b strings.Builder + b.WriteString("\n") + for _, entry := range entries { + fmt.Fprintf(&b, " Workload %-*s CPU: %s\n", maxLen, entry.name, formatCPUTime(entry.cpuTime)) + } + return b.String() +} + +func formatCPUTime(cpuTimeNanos float64) string { + switch { + case cpuTimeNanos >= 1e9: + return fmt.Sprintf("%8.2f s", cpuTimeNanos/1e9) + case cpuTimeNanos >= 1e6: + return fmt.Sprintf("%8.2f ms", cpuTimeNanos/1e6) + case cpuTimeNanos >= 1e3: + return fmt.Sprintf("%8.2f µs", cpuTimeNanos/1e3) + default: + return fmt.Sprintf("%8.2f ns", cpuTimeNanos) + } +} + +func workloadName(workloadID uint64) string { + if name, ok := workloadIDToName[workloadID]; ok { + return name + } + return fmt.Sprintf("STATEMENT: %s", sqlstatsutil.EncodeStmtFingerprintIDToString(appstatspb.StmtFingerprintID(workloadID))) +} diff --git a/pkg/obs/resourceattr/resourceattr_test.go b/pkg/obs/resourceattr/resourceattr_test.go new file mode 100644 index 000000000000..d556525f3ec0 --- /dev/null +++ b/pkg/obs/resourceattr/resourceattr_test.go @@ -0,0 +1,28 @@ +// Copyright 2025 The Cockroach Authors. +// +// Use of this software is governed by the CockroachDB Software License +// included in the /LICENSE file. + +package resourceattr + +import ( + "testing" +) + +func BenchmarkResourceAttrRecord(b *testing.B) { + // Create ResourceAttr directly without background task for clean benchmarking + ra := &ResourceAttr{ + workloads: make([]uint64, 1000), + cpuTimes: make([]float64, 1000), + } + + workloadID := uint64(12345) + cpuTime := float64(1000000) + + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + ra.Record(workloadID, cpuTime) + } +} diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index 3d489bd7eac0..8eebf6cc3975 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -166,6 +166,7 @@ go_library( "//pkg/multitenant/tenantcapabilities/tenantcapabilitieswatcher", "//pkg/multitenant/tenantcapabilitiespb", "//pkg/multitenant/tenantcostmodel", + "//pkg/obs/resourceattr", "//pkg/raft", "//pkg/raft/raftpb", "//pkg/roachpb", diff --git a/pkg/server/node.go b/pkg/server/node.go index d252f0f661e0..345aac0ede69 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -12,6 +12,7 @@ import ( "io" "math" "net" + "runtime/trace" "sort" "strings" "sync/atomic" @@ -38,6 +39,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/multitenant" "github.com/cockroachdb/cockroach/pkg/multitenant/tenantcapabilities" "github.com/cockroachdb/cockroach/pkg/multitenant/tenantcapabilities/tenantcapabilitieswatcher" + "github.com/cockroachdb/cockroach/pkg/obs/resourceattr" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc/rpcbase" "github.com/cockroachdb/cockroach/pkg/server/license" @@ -423,6 +425,8 @@ type Node struct { // licenseEnforcer is used to enforce license policies on the cluster licenseEnforcer *license.Enforcer + + workloadObserver *resourceattr.ResourceAttr } var _ kvpb.InternalServer = &Node{} @@ -596,11 +600,13 @@ func NewNode( spanStatsCollector: spanstatscollector.New(cfg.Settings), proxySender: proxySender, licenseEnforcer: licenseEnforcer, + workloadObserver: resourceattr.NewResourceAttr(10000, stopper), } n.perConsumerCatchupLimiterMu.limiters = make(map[int64]*perConsumerLimiter) n.diskSlowCoalescerMu.lastDiskSlow = make(map[roachpb.StoreID]time.Time) n.versionUpdateMu.updateCh = make(chan struct{}) n.perReplicaServer = kvserver.MakeServer(&n.Descriptor, n.stores) + n.storeCfg.ResourceAttr = n.workloadObserver return n } @@ -1838,16 +1844,31 @@ func (n *Node) Batch(ctx context.Context, args *kvpb.BatchRequest) (*kvpb.BatchR ctx = logtags.AddTag(ctx, "tenant", tenantID) } + // TODO(davidh): SOME BS HERE TO ALSO SET THE PROFILE LABEL + wID := fmt.Sprintf("%d", args.WorkloadId) // If the node is collecting a CPU profile with labels, and the sender has set // pprof labels in the BatchRequest, then we apply them to the context that is // going to execute the BatchRequest. These labels will help correlate server // side CPU profile samples to the sender. if len(args.ProfileLabels) != 0 && n.execCfg.Settings.CPUProfileType() == cluster.CPUProfileWithLabels { var undo func() - ctx, undo = pprofutil.SetProfilerLabels(ctx, args.ProfileLabels...) + lbls := append(args.ProfileLabels, "workloadID", wID) + ctx, undo = pprofutil.SetProfilerLabels(ctx, lbls...) defer undo() } + for i, v := range args.ProfileLabels { + if v == "stmt.fingerprint.id" && wID == "0" { + log.Dev.Warningf(ctx, "~~~~~~~~ WORKLOAD ID IS MISSING FINGERPRINT: %s", args.ProfileLabels[i+1]) + break + } + } + + var region *trace.Region + if trace.IsEnabled() { + region = trace.StartRegion(ctx, fmt.Sprintf("Workload: %s", wID)) + } + // Requests from tenants don't have gateway node id set but are required for // the QPS based rebalancing to work. The GatewayNodeID is used as a proxy // for the locality of the origin of the request. The replica stats aggregate @@ -1859,6 +1880,9 @@ func (n *Node) Batch(ctx context.Context, args *kvpb.BatchRequest) (*kvpb.BatchR br, err := n.batchInternal(ctx, tenantID, args) + if region != nil { + region.End() + } // We always return errors via BatchResponse.Error so structure is // preserved; plain errors are presumed to be from the RPC // framework and not from cockroach. diff --git a/pkg/server/status.go b/pkg/server/status.go index 11b8919ed97b..adcaa9b9d791 100644 --- a/pkg/server/status.go +++ b/pkg/server/status.go @@ -379,6 +379,7 @@ func (b *baseStatusServer) ListLocalDistSQLFlows( NodeID: nodeIDOrZero, Timestamp: f.Timestamp, Stmt: f.StatementSQL, + }}, }) } diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index fbce34068b7b..a8b0b6a64900 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -350,6 +350,7 @@ go_library( "//pkg/multitenant/multitenantcpu", "//pkg/multitenant/tenantcapabilities", "//pkg/multitenant/tenantcapabilitiespb", + "//pkg/obs/resourceattr", "//pkg/repstream", "//pkg/repstream/streampb", "//pkg/roachpb", diff --git a/pkg/sql/backfill.go b/pkg/sql/backfill.go index 9049e83f1706..e47f1ea13867 100644 --- a/pkg/sql/backfill.go +++ b/pkg/sql/backfill.go @@ -2445,7 +2445,11 @@ func (sc *SchemaChanger) truncateAndBackfillColumns( // It operates entirely on the current goroutine and is thus able to // reuse the planner's kv.Txn safely. func runSchemaChangesInTxn( - ctx context.Context, planner *planner, tableDesc *tabledesc.Mutable, traceKV bool, + ctx context.Context, + planner *planner, + tableDesc *tabledesc.Mutable, + traceKV bool, + workloadID uint64, ) error { if tableDesc.Dropped() { return nil @@ -2501,7 +2505,7 @@ func runSchemaChangesInTxn( if !doneColumnBackfill && catalog.ColumnNeedsBackfill(col) { if err := columnBackfillInTxn( ctx, planner.Txn(), planner.ExecCfg(), planner.EvalContext(), planner.SemaCtx(), - immutDesc, traceKV, + immutDesc, traceKV, workloadID, ); err != nil { return err } @@ -2524,7 +2528,7 @@ func runSchemaChangesInTxn( if !doneColumnBackfill && catalog.ColumnNeedsBackfill(col) { if err := columnBackfillInTxn( ctx, planner.Txn(), planner.ExecCfg(), planner.EvalContext(), planner.SemaCtx(), - immutDesc, traceKV, + immutDesc, traceKV, workloadID, ); err != nil { return err } @@ -2532,7 +2536,7 @@ func runSchemaChangesInTxn( } } else if idx := m.AsIndex(); idx != nil { if err := indexTruncateInTxn( - ctx, planner.InternalSQLTxn(), planner.ExecCfg(), planner.EvalContext(), immutDesc, idx, traceKV, + ctx, planner.InternalSQLTxn(), planner.ExecCfg(), planner.EvalContext(), immutDesc, idx, traceKV, workloadID, ); err != nil { return err } @@ -2889,6 +2893,7 @@ func columnBackfillInTxn( semaCtx *tree.SemaContext, tableDesc catalog.TableDescriptor, traceKV bool, + workloadID uint64, ) error { // A column backfill in the ADD state is a noop. if tableDesc.Adding() { @@ -2915,7 +2920,7 @@ func columnBackfillInTxn( updateChunkSizeThresholdBytes := rowinfra.BytesLimit(columnBackfillUpdateChunkSizeThresholdBytes.Get(&evalCtx.Settings.SV)) const alsoCommit = false sp.Key, err = backfiller.RunColumnBackfillChunk( - ctx, txn, tableDesc, sp, scanBatchSize, updateChunkSizeThresholdBytes, alsoCommit, traceKV, + ctx, txn, tableDesc, sp, scanBatchSize, updateChunkSizeThresholdBytes, alsoCommit, traceKV, workloadID, ) if err != nil { return err @@ -2980,6 +2985,7 @@ func indexTruncateInTxn( tableDesc catalog.TableDescriptor, idx catalog.Index, traceKV bool, + workloadID uint64, ) error { var sp roachpb.Span for done := false; !done; done = sp.Key == nil { @@ -2988,7 +2994,7 @@ func indexTruncateInTxn( evalCtx.SessionData(), &execCfg.Settings.SV, execCfg.GetRowMetrics(evalCtx.SessionData().Internal), ) td := tableDeleter{rd: rd} - if err := td.init(ctx, txn.KV(), evalCtx); err != nil { + if err := td.init(ctx, txn.KV(), evalCtx, workloadID); err != nil { return err } var err error diff --git a/pkg/sql/backfill/BUILD.bazel b/pkg/sql/backfill/BUILD.bazel index 0258f20d3077..481e0dca2a61 100644 --- a/pkg/sql/backfill/BUILD.bazel +++ b/pkg/sql/backfill/BUILD.bazel @@ -15,6 +15,7 @@ go_library( "//pkg/keys", "//pkg/kv", "//pkg/kv/kvpb", + "//pkg/obs/resourceattr", "//pkg/roachpb", "//pkg/settings", "//pkg/sql/catalog", diff --git a/pkg/sql/backfill/backfill.go b/pkg/sql/backfill/backfill.go index e55ac0891019..ab5cec2778ce 100644 --- a/pkg/sql/backfill/backfill.go +++ b/pkg/sql/backfill/backfill.go @@ -14,6 +14,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/obs/resourceattr" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/sql/catalog" @@ -178,6 +179,7 @@ func (cb *ColumnBackfiller) init( Spec: &spec, TraceKV: traceKV, ForceProductionKVBatchSize: cb.evalCtx.TestingKnobs.ForceProductionValues, + WorkloadID: resourceattr.WORKLOAD_ID_BACKFILL, }, ) } @@ -300,6 +302,7 @@ func (cb *ColumnBackfiller) RunColumnBackfillChunk( updateChunkSizeThresholdBytes rowinfra.BytesLimit, alsoCommit bool, traceKV bool, + workloadID uint64, ) (roachpb.Key, error) { // TODO(dan): Tighten up the bound on the requestedCols parameter to // makeRowUpdater. @@ -331,7 +334,7 @@ func (cb *ColumnBackfiller) RunColumnBackfillChunk( } // Update the fetcher to use the new txn. - if err := cb.fetcher.SetTxn(txn); err != nil { + if err := cb.fetcher.SetTxn(txn, workloadID); err != nil { log.Dev.Errorf(ctx, "scan error during SetTxn: %s", err) return roachpb.Key{}, err } @@ -1418,6 +1421,7 @@ func (ib *IndexBackfiller) BuildIndexEntriesChunk( Spec: &spec, TraceKV: traceKV, ForceProductionKVBatchSize: ib.evalCtx.TestingKnobs.ForceProductionValues, + WorkloadID: resourceattr.WORKLOAD_ID_BACKFILL, }, ); err != nil { return nil, nil, memUsedPerChunk, err diff --git a/pkg/sql/colfetcher/colbatch_direct_scan.go b/pkg/sql/colfetcher/colbatch_direct_scan.go index f4a54dc2759e..e0fb8f1cd1e9 100644 --- a/pkg/sql/colfetcher/colbatch_direct_scan.go +++ b/pkg/sql/colfetcher/colbatch_direct_scan.go @@ -219,6 +219,7 @@ func NewColBatchDirectScan( kvFetcherMemAcc, flowCtx.EvalCtx.TestingKnobs.ForceProductionValues, spec.FetchSpec.External, + flowCtx.WorkloadID, ) var hasDatumVec bool for _, t := range tableArgs.typs { diff --git a/pkg/sql/colfetcher/colbatch_scan.go b/pkg/sql/colfetcher/colbatch_scan.go index 0825fa8ef7dd..9f21690855cb 100644 --- a/pkg/sql/colfetcher/colbatch_scan.go +++ b/pkg/sql/colfetcher/colbatch_scan.go @@ -341,6 +341,7 @@ func NewColBatchScan( kvFetcherMemAcc, flowCtx.EvalCtx.TestingKnobs.ForceProductionValues, spec.FetchSpec.External, + flowCtx.WorkloadID, ) fetcher := cFetcherPool.Get().(*cFetcher) shouldCollectStats := execstats.ShouldCollectStats(ctx, flowCtx.CollectStats) diff --git a/pkg/sql/colfetcher/index_join.go b/pkg/sql/colfetcher/index_join.go index 2e7979c3bb50..83a768f7313c 100644 --- a/pkg/sql/colfetcher/index_join.go +++ b/pkg/sql/colfetcher/index_join.go @@ -618,6 +618,7 @@ func NewColIndexJoin( kvFetcherMemAcc, spec.FetchSpec.External, tableArgs.RequiresRawMVCCValues(), + flowCtx.WorkloadID, ) } else { kvFetcher = row.NewKVFetcher( @@ -633,6 +634,7 @@ func NewColIndexJoin( kvFetcherMemAcc, flowCtx.EvalCtx.TestingKnobs.ForceProductionValues, spec.FetchSpec.External, + flowCtx.WorkloadID, ) } diff --git a/pkg/sql/colflow/vectorized_flow_test.go b/pkg/sql/colflow/vectorized_flow_test.go index 5f1528c2180b..1f25aef23070 100644 --- a/pkg/sql/colflow/vectorized_flow_test.go +++ b/pkg/sql/colflow/vectorized_flow_test.go @@ -243,6 +243,7 @@ func TestDrainOnlyInputDAG(t *testing.T) { nil, /* localVectorSources */ nil, /* onFlowCleanupEnd */ "", /* statementSQL */ + 0, /* workloadID */ ) vfc := newVectorizedFlowCreator( flowBase, componentCreator, false, /* recordingStats */ diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index e18c0a4247cd..5939ab292f23 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -570,6 +570,11 @@ func (ex *connExecutor) execStmtInOpenState( ctx = ih.Setup(ctx, ex, p, &stmt, os.ImplicitTxn.Get(), ex.state.mu.priority, ex.state.mu.autoRetryCounter) + // TODO(davidh): need to figure out different fingerprinting because + // implicitTxn doesn't make sense to have here, and not sure about + // "database", why not "app" too? + stmt.WorkloadID = uint64(appstatspb.ConstructStatementFingerprintID(stmt.StmtNoConstants, os.ImplicitTxn.Get(), p.SessionData().Database)) + // Note that here we always unconditionally defer a function that takes care // of finishing the instrumentation helper. This is needed since in order to // support plan-gist-matching of the statement diagnostics we might not know @@ -4397,11 +4402,15 @@ func (ex *connExecutor) execWithProfiling( } else { stmtNoConstants = tree.FormatStatementHideConstants(ast) } + // Calculate the statement fingerprint ID + stmtFingerprintId := appstatspb.ConstructStatementFingerprintID( + stmtNoConstants, ex.implicitTxn(), ex.sessionData().Database) labels := pprof.Labels( "appname", ex.sessionData().ApplicationName, "addr", remoteAddr, "stmt.tag", ast.StatementTag(), "stmt.no.constants", stmtNoConstants, + "stmt.fingerprint.id", fmt.Sprintf("%d", stmtFingerprintId), ) pprof.Do(ctx, labels, func(ctx context.Context) { err = op(ctx) diff --git a/pkg/sql/create_table.go b/pkg/sql/create_table.go index caecf6d69873..fc970bfac49e 100644 --- a/pkg/sql/create_table.go +++ b/pkg/sql/create_table.go @@ -573,7 +573,7 @@ func (n *createTableNode) startExec(params runParams) error { *ti = tableInserter{} tableInserterPool.Put(ti) }() - if err := ti.init(params.ctx, params.p.txn, params.p.EvalContext()); err != nil { + if err := ti.init(params.ctx, params.p.txn, params.p.EvalContext(), params.p.stmt.WorkloadID); err != nil { return err } diff --git a/pkg/sql/delete.go b/pkg/sql/delete.go index 7820c414b0b7..0ceedd37b7d2 100644 --- a/pkg/sql/delete.go +++ b/pkg/sql/delete.go @@ -92,7 +92,7 @@ func (d *deleteNode) startExec(params runParams) error { d.run.init(params, d.columns) - if err := d.run.td.init(params.ctx, params.p.txn, params.EvalContext()); err != nil { + if err := d.run.td.init(params.ctx, params.p.txn, params.EvalContext(), params.p.stmt.WorkloadID); err != nil { return err } diff --git a/pkg/sql/delete_swap.go b/pkg/sql/delete_swap.go index c07228e3470a..8f97d1498ab8 100644 --- a/pkg/sql/delete_swap.go +++ b/pkg/sql/delete_swap.go @@ -44,7 +44,7 @@ func (d *deleteSwapNode) startExec(params runParams) error { d.run.init(params, d.columns) - if err := d.run.td.init(params.ctx, params.p.txn, params.EvalContext()); err != nil { + if err := d.run.td.init(params.ctx, params.p.txn, params.EvalContext(), params.p.stmt.WorkloadID); err != nil { return err } diff --git a/pkg/sql/distsql/server.go b/pkg/sql/distsql/server.go index 5246eaae0ede..8a10bc7ab515 100644 --- a/pkg/sql/distsql/server.go +++ b/pkg/sql/distsql/server.go @@ -385,7 +385,7 @@ func (ds *ServerImpl) setupFlow( // Create the FlowCtx for the flow. flowCtx := ds.newFlowContext( ctx, req.Flow.FlowID, evalCtx, monitor, diskMonitor, makeLeaf, req.TraceKV, - req.CollectStats, localState, req.Flow.Gateway == ds.NodeID.SQLInstanceID(), + req.CollectStats, localState, req.Flow.Gateway == ds.NodeID.SQLInstanceID(), req.WorkloadID, ) // req always contains the desired vectorize mode, regardless of whether we @@ -417,6 +417,9 @@ func (ds *ServerImpl) setupFlow( if req.StatementSQL != "" { bld.Add("distsql.stmt", req.StatementSQL) } + if req.WorkloadID != 0 { + bld.Add("workload.id", req.WorkloadID) + } bld.Add("distsql.gateway", req.Flow.Gateway) if req.EvalContext.SessionData.ApplicationName != "" { bld.Add("distsql.appname", req.EvalContext.SessionData.ApplicationName) @@ -488,6 +491,7 @@ func (ds *ServerImpl) newFlowContext( collectStats bool, localState LocalState, isGatewayNode bool, + workloadID uint64, ) execinfra.FlowCtx { // TODO(radu): we should sanity check some of these fields. flowCtx := execinfra.FlowCtx{ @@ -504,6 +508,7 @@ func (ds *ServerImpl) newFlowContext( Local: localState.IsLocal, Gateway: isGatewayNode, DiskMonitor: diskMonitor, + WorkloadID: workloadID, } if localState.IsLocal && localState.Collection != nil { diff --git a/pkg/sql/distsql_running.go b/pkg/sql/distsql_running.go index 93e4f4d8ec2b..f5a81b796d30 100644 --- a/pkg/sql/distsql_running.go +++ b/pkg/sql/distsql_running.go @@ -23,6 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/obs/resourceattr" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" "github.com/cockroachdb/cockroach/pkg/rpc/rpcbase" @@ -415,6 +416,7 @@ func (dsp *DistSQLPlanner) setupFlows( recv *DistSQLReceiver, localState distsql.LocalState, statementSQL string, + workloadID uint64, ) (context.Context, flowinfra.Flow, error) { thisNodeID := dsp.gatewaySQLInstanceID thisNodeSpec, ok := flows[thisNodeID] @@ -471,6 +473,7 @@ func (dsp *DistSQLPlanner) setupFlows( TraceKV: recv.tracing.KVTracingEnabled(), CollectStats: planCtx.collectExecStats, StatementSQL: statementSQL, + WorkloadID: workloadID, } if localState.IsLocal { // VectorizeMode is the only field that the setup code expects to be set @@ -708,6 +711,46 @@ func (dsp *DistSQLPlanner) setupFlows( const clientRejectedMsg string = "client rejected when attempting to run DistSQL plan" const executingParallelAndSerialChecks = "executing %d checks concurrently and %d checks serially" +// TODO(davidh): unclear if this works or is useful. Probably needs rethinking. +func determineWorkloadID(ctx context.Context, planCtx *PlanningCtx, txn *kv.Txn) uint64 { + // First check if this is a user query + if planCtx.planner == nil { + // Not a user query - determine what kind of internal operation + if txn == nil { + return resourceattr.WORKLOAD_ID_BULKIO + } + + // Check for job tag + if jobTag, ok := logtags.FromContext(ctx).GetTag("job"); ok { + jobType := jobTag.ValueStr() + // Parse jobType to get specific workload + if strings.Contains(jobType, "BACKUP") { + return resourceattr.WORKLOAD_ID_BACKUP + } else if strings.Contains(jobType, "RESTORE") { + return resourceattr.WORKLOAD_ID_RESTORE + } else if strings.Contains(jobType, "IMPORT") { + return resourceattr.WORKLOAD_ID_IMPORT + } else if strings.Contains(jobType, "CHANGEFEED") { + return resourceattr.WORKLOAD_ID_CDC + } + return resourceattr.WORKLOAD_ID_JOB + } + + return resourceattr.WORKLOAD_ID_INTERNAL_UNKNOWN + } + + // It's a user query - could further classify + if planCtx.subOrPostQuery { + return resourceattr.WORKLOAD_ID_SUBQUERY + } + + if planCtx.planner != nil { + return planCtx.planner.stmt.WorkloadID + } + + return resourceattr.WORKLOAD_ID_UNKNOWN +} + // Run executes a physical plan. The plan should have been finalized using // FinalizePlan. // @@ -981,6 +1024,7 @@ func (dsp *DistSQLPlanner) Run( if planCtx.planner != nil { statementSQL = planCtx.planner.stmt.StmtNoConstants } + workloadID := determineWorkloadID(ctx, planCtx, txn) var flow flowinfra.Flow var err error @@ -988,7 +1032,7 @@ func (dsp *DistSQLPlanner) Run( flow = i.resumableFlow.flow } else { ctx, flow, err = dsp.setupFlows( - ctx, evalCtx, planCtx, leafInputState, flows, recv, localState, statementSQL, + ctx, evalCtx, planCtx, leafInputState, flows, recv, localState, statementSQL, workloadID, ) if i != nil { // TODO(yuzefovich): add a check that this flow runs in a single goroutine. diff --git a/pkg/sql/execinfra/flow_context.go b/pkg/sql/execinfra/flow_context.go index 0fb01b73fd5c..8c2c6b7e2521 100644 --- a/pkg/sql/execinfra/flow_context.go +++ b/pkg/sql/execinfra/flow_context.go @@ -97,6 +97,8 @@ type FlowCtx struct { // running EXPLAIN ANALYZE. Currently, it is only used by remote flows. // The gateway flow is handled by the connExecutor. TenantCPUMonitor multitenantcpu.CPUUsageHelper + + WorkloadID uint64 } // NewEvalCtx returns a modifiable copy of the FlowCtx's eval.Context. diff --git a/pkg/sql/execinfrapb/api.proto b/pkg/sql/execinfrapb/api.proto index d39269cf565a..eb8fd97c2f5b 100644 --- a/pkg/sql/execinfrapb/api.proto +++ b/pkg/sql/execinfrapb/api.proto @@ -65,6 +65,13 @@ message SetupFlowRequest { // is populated on a best effort basis. optional string statement_sql = 10 [(gogoproto.nullable) = false, (gogoproto.customname) = "StatementSQL"]; + + // WorkloadID is the identifier for this workload that we can use + // for profiling and tracing. + optional uint64 workload_id = 14 [(gogoproto.nullable) = false, + (gogoproto.customname) = "WorkloadID"]; + + // Next ID: 15. } // FlowSpec describes a "flow" which is a subgraph of a distributed SQL diff --git a/pkg/sql/execinfrapb/data.go b/pkg/sql/execinfrapb/data.go index f73f6138da10..b0be18654ce9 100644 --- a/pkg/sql/execinfrapb/data.go +++ b/pkg/sql/execinfrapb/data.go @@ -323,4 +323,6 @@ type DistSQLRemoteFlowInfo struct { Timestamp time.Time // StatementSQL is the SQL statement for which this flow is executing. StatementSQL string + // WorkloadID is the workload for which this flow is executing. + WorkloadID uint64 } diff --git a/pkg/sql/insert.go b/pkg/sql/insert.go index 16075f56029c..cb0b3d394af0 100644 --- a/pkg/sql/insert.go +++ b/pkg/sql/insert.go @@ -276,7 +276,7 @@ func (n *insertNode) startExec(params runParams) error { n.run.init(params, n.columns) - if err := n.run.ti.init(params.ctx, params.p.txn, params.EvalContext()); err != nil { + if err := n.run.ti.init(params.ctx, params.p.txn, params.EvalContext(), params.p.stmt.WorkloadID); err != nil { return err } diff --git a/pkg/sql/insert_fast_path.go b/pkg/sql/insert_fast_path.go index b062d713baf2..2e8fd85c5ef0 100644 --- a/pkg/sql/insert_fast_path.go +++ b/pkg/sql/insert_fast_path.go @@ -438,7 +438,7 @@ func (n *insertFastPathNode) startExec(params runParams) error { n.run.uniqSpanInfo = make([]insertFastPathFKUniqSpanInfo, 0, maxSpans) } - if err := n.run.ti.init(params.ctx, params.p.txn, params.EvalContext()); err != nil { + if err := n.run.ti.init(params.ctx, params.p.txn, params.EvalContext(), params.p.stmt.WorkloadID); err != nil { return err } diff --git a/pkg/sql/row/fetcher.go b/pkg/sql/row/fetcher.go index 2b1a878de492..11e31fef16cf 100644 --- a/pkg/sql/row/fetcher.go +++ b/pkg/sql/row/fetcher.go @@ -347,6 +347,7 @@ type FetcherInitArgs struct { // row is being processed. In practice, this means that span IDs must be // passed in when SpansCanOverlap is true. SpansCanOverlap bool + WorkloadID uint64 } // Init sets up a Fetcher for a given table and index. @@ -510,6 +511,7 @@ func (rf *Fetcher) Init(ctx context.Context, args FetcherInitArgs) error { forceProductionKVBatchSize: args.ForceProductionKVBatchSize, kvPairsRead: &kvPairsRead, batchRequestsIssued: &batchRequestsIssued, + workloadID: args.WorkloadID, } if args.Txn != nil { fetcherArgs.sendFn = makeSendFunc(args.Txn, args.Spec.External, &batchRequestsIssued) @@ -536,15 +538,15 @@ func (rf *Fetcher) Init(ctx context.Context, args FetcherInitArgs) error { // // Note that this resets the number of batch requests issued by the Fetcher. // Consider using GetBatchRequestsIssued if that information is needed. -func (rf *Fetcher) SetTxn(txn *kv.Txn) error { +func (rf *Fetcher) SetTxn(txn *kv.Txn, workloadID uint64) error { var batchRequestsIssued int64 sendFn := makeSendFunc(txn, rf.args.Spec.External, &batchRequestsIssued) - return rf.setTxnAndSendFn(txn, sendFn) + return rf.setTxnAndSendFn(txn, sendFn, workloadID) } // setTxnAndSendFn peeks inside of the KVFetcher to update the underlying // txnKVFetcher with the new txn and sendFn. -func (rf *Fetcher) setTxnAndSendFn(txn *kv.Txn, sendFn sendFunc) error { +func (rf *Fetcher) setTxnAndSendFn(txn *kv.Txn, sendFn sendFunc, workloadID uint64) error { // Disable buffered writes if any system columns are needed that require // MVCC decoding. if rf.mvccDecodeStrategy == storage.MVCCDecodingRequired { @@ -568,7 +570,7 @@ func (rf *Fetcher) setTxnAndSendFn(txn *kv.Txn, sendFn sendFunc) error { "unexpectedly the KVBatchFetcher is %T and not *txnKVFetcher", rf.kvFetcher.KVBatchFetcher, ) } - f.setTxnAndSendFn(txn, sendFn) + f.setTxnAndSendFn(txn, sendFn, workloadID) return nil } @@ -660,6 +662,7 @@ func (rf *Fetcher) StartInconsistentScan( batchBytesLimit rowinfra.BytesLimit, rowLimitHint rowinfra.RowLimit, qualityOfService sessiondatapb.QoSLevel, + workloadID uint64, ) error { if rf.args.StreamingKVFetcher != nil { return errors.AssertionFailedf("StartInconsistentScan is called instead of StartScan") @@ -742,7 +745,7 @@ func (rf *Fetcher) StartInconsistentScan( // TODO(radu): we should commit the last txn. Right now the commit is a no-op // on read transactions, but perhaps one day it will release some resources. - if err := rf.setTxnAndSendFn(txn, sendFn); err != nil { + if err := rf.setTxnAndSendFn(txn, sendFn, workloadID); err != nil { return err } diff --git a/pkg/sql/row/kv_batch_fetcher.go b/pkg/sql/row/kv_batch_fetcher.go index 600614568696..c441f2e95c7e 100644 --- a/pkg/sql/row/kv_batch_fetcher.go +++ b/pkg/sql/row/kv_batch_fetcher.go @@ -215,6 +215,10 @@ type txnKVFetcher struct { requestAdmissionHeader kvpb.AdmissionHeader responseAdmissionQ *admission.WorkQueue admissionPacer *admission.Pacer + + // workloadID attaches to batch requests and is used to identify the + // target of the work for profiling and tracing. + workloadID uint64 } var _ KVBatchFetcher = &txnKVFetcher{} @@ -351,6 +355,7 @@ type newTxnKVFetcherArgs struct { kvPairsRead *int64 batchRequestsIssued *int64 rawMVCCValues bool + workloadID uint64 admission struct { // groups AC-related fields requestHeader kvpb.AdmissionHeader @@ -381,6 +386,7 @@ func newTxnKVFetcherInternal(args newTxnKVFetcherArgs) *txnKVFetcher { forceProductionKVBatchSize: args.forceProductionKVBatchSize, requestAdmissionHeader: args.admission.requestHeader, responseAdmissionQ: args.admission.responseQ, + workloadID: args.workloadID, } f.maybeInitAdmissionPacer( @@ -394,10 +400,11 @@ func newTxnKVFetcherInternal(args newTxnKVFetcherArgs) *txnKVFetcher { // setTxnAndSendFn updates the txnKVFetcher with the new txn and sendFn. txn and // sendFn are assumed to be non-nil. -func (f *txnKVFetcher) setTxnAndSendFn(txn *kv.Txn, sendFn sendFunc) { +func (f *txnKVFetcher) setTxnAndSendFn(txn *kv.Txn, sendFn sendFunc, workloadID uint64) { f.sendFn = sendFn f.requestAdmissionHeader = txn.AdmissionHeader() f.responseAdmissionQ = txn.DB().SQLKVResponseAdmissionQ + f.workloadID = workloadID f.admissionPacer.Close() f.maybeInitAdmissionPacer(txn.AdmissionHeader(), txn.DB().AdmissionPacerFactory, txn.DB().SettingsValues()) @@ -603,6 +610,7 @@ func (f *txnKVFetcher) fetch(ctx context.Context) error { ba.Header.TargetBytes = int64(f.batchBytesLimit) ba.Header.MaxSpanRequestKeys = int64(f.getBatchKeyLimit()) ba.Header.IsReverse = f.reverse + ba.Header.WorkloadId = f.workloadID if buildutil.CrdbTestBuild { if f.scanFormat == kvpb.COL_BATCH_RESPONSE && f.indexFetchSpec == nil { return errors.AssertionFailedf("IndexFetchSpec not provided with COL_BATCH_RESPONSE scan format") diff --git a/pkg/sql/row/kv_fetcher.go b/pkg/sql/row/kv_fetcher.go index d652d35625a0..4359bf628996 100644 --- a/pkg/sql/row/kv_fetcher.go +++ b/pkg/sql/row/kv_fetcher.go @@ -61,6 +61,7 @@ func newTxnKVFetcher( acc *mon.BoundAccount, forceProductionKVBatchSize bool, ext *fetchpb.IndexFetchSpec_ExternalRowData, + workloadID uint64, ) *txnKVFetcher { alloc := new(struct { batchRequestsIssued int64 @@ -112,6 +113,7 @@ func newTxnKVFetcher( forceProductionKVBatchSize: forceProductionKVBatchSize, kvPairsRead: &alloc.kvPairsRead, batchRequestsIssued: &alloc.batchRequestsIssued, + workloadID: workloadID, } fetcherArgs.admission.requestHeader = txn.AdmissionHeader() fetcherArgs.admission.responseQ = txn.DB().SQLKVResponseAdmissionQ @@ -143,10 +145,11 @@ func NewDirectKVBatchFetcher( acc *mon.BoundAccount, forceProductionKVBatchSize bool, ext *fetchpb.IndexFetchSpec_ExternalRowData, + workloadID uint64, ) KVBatchFetcher { f := newTxnKVFetcher( txn, bsHeader, reverse, rawMVCCValues, lockStrength, lockWaitPolicy, lockDurability, - lockTimeout, deadlockTimeout, acc, forceProductionKVBatchSize, ext, + lockTimeout, deadlockTimeout, acc, forceProductionKVBatchSize, ext, workloadID, ) f.scanFormat = kvpb.COL_BATCH_RESPONSE f.indexFetchSpec = spec @@ -172,10 +175,11 @@ func NewKVFetcher( acc *mon.BoundAccount, forceProductionKVBatchSize bool, ext *fetchpb.IndexFetchSpec_ExternalRowData, + workloadID uint64, ) *KVFetcher { return newKVFetcher(newTxnKVFetcher( txn, bsHeader, reverse, rawMVCCValues, lockStrength, lockWaitPolicy, lockDurability, - lockTimeout, deadlockTimeout, acc, forceProductionKVBatchSize, ext, + lockTimeout, deadlockTimeout, acc, forceProductionKVBatchSize, ext, workloadID, )) } @@ -203,6 +207,7 @@ func NewStreamingKVFetcher( kvFetcherMemAcc *mon.BoundAccount, ext *fetchpb.IndexFetchSpec_ExternalRowData, rawMVCCValues bool, + workloadID uint64, ) *KVFetcher { var kvPairsRead int64 var batchRequestsIssued int64 @@ -222,6 +227,7 @@ func NewStreamingKVFetcher( GetKeyLockingStrength(lockStrength), GetKeyLockingDurability(lockDurability), reverse, + workloadID, ) mode := kvstreamer.OutOfOrder if maintainOrdering { diff --git a/pkg/sql/rowexec/columnbackfiller.go b/pkg/sql/rowexec/columnbackfiller.go index 7ab6f3ecbe31..bdee43f954f1 100644 --- a/pkg/sql/rowexec/columnbackfiller.go +++ b/pkg/sql/rowexec/columnbackfiller.go @@ -321,6 +321,7 @@ func (cb *columnBackfiller) runChunk( updateChunkSizeThresholdBytes, true, /*alsoCommit*/ cb.flowCtx.TraceKV, + cb.flowCtx.WorkloadID, ) return err }, isql.WithPriority(admissionpb.BulkNormalPri)) diff --git a/pkg/sql/rowexec/inverted_joiner.go b/pkg/sql/rowexec/inverted_joiner.go index 3b31472b2df5..06c2d42b841b 100644 --- a/pkg/sql/rowexec/inverted_joiner.go +++ b/pkg/sql/rowexec/inverted_joiner.go @@ -303,6 +303,7 @@ func newInvertedJoiner( Spec: &spec.FetchSpec, TraceKV: flowCtx.TraceKV, ForceProductionKVBatchSize: flowCtx.EvalCtx.TestingKnobs.ForceProductionValues, + WorkloadID: flowCtx.WorkloadID, }, ); err != nil { return nil, err diff --git a/pkg/sql/rowexec/joinreader.go b/pkg/sql/rowexec/joinreader.go index b815872243f8..c203d9f2acfa 100644 --- a/pkg/sql/rowexec/joinreader.go +++ b/pkg/sql/rowexec/joinreader.go @@ -595,6 +595,7 @@ func newJoinReader( &jr.streamerInfo.txnKVStreamerMemAcc, spec.FetchSpec.External, row.FetchSpecRequiresRawMVCCValues(spec.FetchSpec), + flowCtx.WorkloadID, ) } else { // When not using the Streamer API, we want to limit the batch size hint @@ -624,6 +625,7 @@ func newJoinReader( TraceKV: flowCtx.TraceKV, ForceProductionKVBatchSize: flowCtx.EvalCtx.TestingKnobs.ForceProductionValues, SpansCanOverlap: jr.spansCanOverlap, + WorkloadID: flowCtx.WorkloadID, }, ); err != nil { return nil, err diff --git a/pkg/sql/rowexec/rowfetcher.go b/pkg/sql/rowexec/rowfetcher.go index c295223ef0ff..0b46e330a9b4 100644 --- a/pkg/sql/rowexec/rowfetcher.go +++ b/pkg/sql/rowexec/rowfetcher.go @@ -34,6 +34,7 @@ type rowFetcher interface { batchBytesLimit rowinfra.BytesLimit, rowLimitHint rowinfra.RowLimit, qualityOfService sessiondatapb.QoSLevel, + workloadID uint64, ) error NextRow(ctx context.Context) (_ rowenc.EncDatumRow, spanID int, _ error) diff --git a/pkg/sql/rowexec/stats.go b/pkg/sql/rowexec/stats.go index 81fdee2ef9af..549a2cd01ae1 100644 --- a/pkg/sql/rowexec/stats.go +++ b/pkg/sql/rowexec/stats.go @@ -115,11 +115,12 @@ func (c *rowFetcherStatCollector) StartInconsistentScan( batchBytesLimit rowinfra.BytesLimit, limitHint rowinfra.RowLimit, qualityOfService sessiondatapb.QoSLevel, + workloadID uint64, ) error { start := timeutil.Now() c.cpuStopWatch.Start() err := c.Fetcher.StartInconsistentScan( - ctx, db, initialTimestamp, maxTimestampAge, spans, batchBytesLimit, limitHint, qualityOfService, + ctx, db, initialTimestamp, maxTimestampAge, spans, batchBytesLimit, limitHint, qualityOfService, workloadID, ) c.startScanStallTime += timeutil.Since(start) c.cpuStopWatch.Stop() diff --git a/pkg/sql/rowexec/tablereader.go b/pkg/sql/rowexec/tablereader.go index aca3d576b971..e3e6158ebeb8 100644 --- a/pkg/sql/rowexec/tablereader.go +++ b/pkg/sql/rowexec/tablereader.go @@ -151,6 +151,7 @@ func newTableReader( Spec: &spec.FetchSpec, TraceKV: flowCtx.TraceKV, ForceProductionKVBatchSize: flowCtx.EvalCtx.TestingKnobs.ForceProductionValues, + WorkloadID: flowCtx.WorkloadID, }, ); err != nil { return nil, err @@ -220,7 +221,7 @@ func (tr *tableReader) startScan(ctx context.Context) error { initialTS := tr.FlowCtx.Txn.ReadTimestamp() err = tr.fetcher.StartInconsistentScan( ctx, tr.FlowCtx.Cfg.DB.KV(), initialTS, tr.maxTimestampAge, tr.Spans, - bytesLimit, tr.limitHint, tr.FlowCtx.EvalCtx.QualityOfService(), + bytesLimit, tr.limitHint, tr.FlowCtx.EvalCtx.QualityOfService(), tr.FlowCtx.WorkloadID, ) } tr.scanStarted = true diff --git a/pkg/sql/rowexec/zigzagjoiner.go b/pkg/sql/rowexec/zigzagjoiner.go index 923711b9e7c1..a754ac755db0 100644 --- a/pkg/sql/rowexec/zigzagjoiner.go +++ b/pkg/sql/rowexec/zigzagjoiner.go @@ -476,6 +476,7 @@ func (z *zigzagJoiner) setupInfo( Spec: &spec.FetchSpec, TraceKV: flowCtx.TraceKV, ForceProductionKVBatchSize: flowCtx.EvalCtx.TestingKnobs.ForceProductionValues, + WorkloadID: flowCtx.WorkloadID, }, ); err != nil { return err diff --git a/pkg/sql/statement.go b/pkg/sql/statement.go index 2d5705b9be8f..93d97be24fd8 100644 --- a/pkg/sql/statement.go +++ b/pkg/sql/statement.go @@ -37,6 +37,8 @@ type Statement struct { Prepared *prep.Statement QueryTags []sqlcommenter.QueryTag + + WorkloadID uint64 } func makeStatement( diff --git a/pkg/sql/table.go b/pkg/sql/table.go index 2606ef2ab01f..b4c2581b88be 100644 --- a/pkg/sql/table.go +++ b/pkg/sql/table.go @@ -12,6 +12,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/obs/resourceattr" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server/telemetry" "github.com/cockroachdb/cockroach/pkg/sql/catalog" @@ -356,7 +357,7 @@ func (p *planner) writeTableDescToBatch( if tableDesc.IsNew() { if err := runSchemaChangesInTxn( - ctx, p, tableDesc, p.ExtendedEvalContext().Tracing.KVTracingEnabled(), + ctx, p, tableDesc, p.ExtendedEvalContext().Tracing.KVTracingEnabled(), resourceattr.WORKLOAD_ID_SCHEMA_CHANGE, ); err != nil { return err } diff --git a/pkg/sql/tablewriter.go b/pkg/sql/tablewriter.go index 9d3b16484329..1d2e35834cf1 100644 --- a/pkg/sql/tablewriter.go +++ b/pkg/sql/tablewriter.go @@ -89,6 +89,7 @@ type tableWriterBase struct { // originally written with before being replicated via Logical Data // Replication. originTimestamp hlc.Timestamp + workloadID uint64 } var maxBatchBytes = settings.RegisterByteSizeSetting( @@ -100,7 +101,7 @@ var maxBatchBytes = settings.RegisterByteSizeSetting( // init initializes the tableWriterBase with a Txn. func (tb *tableWriterBase) init( - txn *kv.Txn, tableDesc catalog.TableDescriptor, evalCtx *eval.Context, + txn *kv.Txn, tableDesc catalog.TableDescriptor, evalCtx *eval.Context, workloadID uint64, ) error { if txn.Type() != kv.RootTxn { return errors.AssertionFailedf("unexpectedly non-root txn is used by the table writer") @@ -111,6 +112,7 @@ func (tb *tableWriterBase) init( tb.deadlockTimeout = 0 tb.originID = 0 tb.originTimestamp = hlc.Timestamp{} + tb.workloadID = workloadID if evalCtx != nil { tb.lockTimeout = evalCtx.SessionData().LockTimeout tb.deadlockTimeout = evalCtx.SessionData().DeadlockTimeout @@ -219,6 +221,7 @@ func (tb *tableWriterBase) initNewBatch() { tb.putter.Batch = tb.b tb.b.Header.LockTimeout = tb.lockTimeout tb.b.Header.DeadlockTimeout = tb.deadlockTimeout + tb.b.Header.WorkloadId = tb.workloadID if tb.originID != 0 { tb.b.Header.WriteOptions = &kvpb.WriteOptions{ OriginID: tb.originID, diff --git a/pkg/sql/tablewriter_delete.go b/pkg/sql/tablewriter_delete.go index c936b4caad68..7894ff2aa73d 100644 --- a/pkg/sql/tablewriter_delete.go +++ b/pkg/sql/tablewriter_delete.go @@ -26,8 +26,10 @@ type tableDeleter struct { } // init initializes the tableDeleter with a Txn. -func (td *tableDeleter) init(_ context.Context, txn *kv.Txn, evalCtx *eval.Context) error { - return td.tableWriterBase.init(txn, td.tableDesc(), evalCtx) +func (td *tableDeleter) init( + _ context.Context, txn *kv.Txn, evalCtx *eval.Context, workloadID uint64, +) error { + return td.tableWriterBase.init(txn, td.tableDesc(), evalCtx, workloadID) } // row performs a delete. diff --git a/pkg/sql/tablewriter_insert.go b/pkg/sql/tablewriter_insert.go index f219117f8b7b..79f92f7b4e82 100644 --- a/pkg/sql/tablewriter_insert.go +++ b/pkg/sql/tablewriter_insert.go @@ -22,8 +22,10 @@ type tableInserter struct { } // init initializes the tableInserter with a Txn. -func (ti *tableInserter) init(_ context.Context, txn *kv.Txn, evalCtx *eval.Context) error { - return ti.tableWriterBase.init(txn, ti.tableDesc(), evalCtx) +func (ti *tableInserter) init( + _ context.Context, txn *kv.Txn, evalCtx *eval.Context, workloadID uint64, +) error { + return ti.tableWriterBase.init(txn, ti.tableDesc(), evalCtx, workloadID) } // row performs an insert. diff --git a/pkg/sql/tablewriter_update.go b/pkg/sql/tablewriter_update.go index b11e677c6ac4..67e412b0d9c2 100644 --- a/pkg/sql/tablewriter_update.go +++ b/pkg/sql/tablewriter_update.go @@ -22,8 +22,10 @@ type tableUpdater struct { } // init initializes the tableUpdater with a Txn. -func (tu *tableUpdater) init(_ context.Context, txn *kv.Txn, evalCtx *eval.Context) error { - return tu.tableWriterBase.init(txn, tu.tableDesc(), evalCtx) +func (tu *tableUpdater) init( + _ context.Context, txn *kv.Txn, evalCtx *eval.Context, workloadID uint64, +) error { + return tu.tableWriterBase.init(txn, tu.tableDesc(), evalCtx, workloadID) } // rowForUpdate performs an update. diff --git a/pkg/sql/tablewriter_upsert.go b/pkg/sql/tablewriter_upsert.go index defbe8d78d04..1db4eea7d12f 100644 --- a/pkg/sql/tablewriter_upsert.go +++ b/pkg/sql/tablewriter_upsert.go @@ -87,8 +87,10 @@ type tableUpserter struct { } // init initializes the tableUpserter with a Txn. -func (tu *tableUpserter) init(ctx context.Context, txn *kv.Txn, evalCtx *eval.Context) error { - if err := tu.tableWriterBase.init(txn, tu.ri.Helper.TableDesc, evalCtx); err != nil { +func (tu *tableUpserter) init( + ctx context.Context, txn *kv.Txn, evalCtx *eval.Context, workloadID uint64, +) error { + if err := tu.tableWriterBase.init(txn, tu.ri.Helper.TableDesc, evalCtx, workloadID); err != nil { return err } diff --git a/pkg/sql/update.go b/pkg/sql/update.go index 4e854a555af6..23322dc89844 100644 --- a/pkg/sql/update.go +++ b/pkg/sql/update.go @@ -100,7 +100,7 @@ func (u *updateNode) startExec(params runParams) error { u.run.init(params, u.columns) - if err := u.run.tu.init(params.ctx, params.p.txn, params.EvalContext()); err != nil { + if err := u.run.tu.init(params.ctx, params.p.txn, params.EvalContext(), params.p.stmt.WorkloadID); err != nil { return err } diff --git a/pkg/sql/update_swap.go b/pkg/sql/update_swap.go index 40c2404bb2e4..f3e0984cf18c 100644 --- a/pkg/sql/update_swap.go +++ b/pkg/sql/update_swap.go @@ -44,7 +44,7 @@ func (u *updateSwapNode) startExec(params runParams) error { u.run.init(params, u.columns) - if err := u.run.tu.init(params.ctx, params.p.txn, params.EvalContext()); err != nil { + if err := u.run.tu.init(params.ctx, params.p.txn, params.EvalContext(), params.p.stmt.WorkloadID); err != nil { return err } diff --git a/pkg/sql/upsert.go b/pkg/sql/upsert.go index 61a985f9f775..ba7bf7a4d992 100644 --- a/pkg/sql/upsert.go +++ b/pkg/sql/upsert.go @@ -53,7 +53,7 @@ func (r *upsertRun) init(params runParams) error { if ots := params.extendedEvalCtx.SessionData().OriginTimestampForLogicalDataReplication; ots.IsSet() { r.originTimestampCPutHelper.OriginTimestamp = ots } - return r.tw.init(params.ctx, params.p.txn, params.EvalContext()) + return r.tw.init(params.ctx, params.p.txn, params.EvalContext(), params.p.stmt.WorkloadID) } func (n *upsertNode) startExec(params runParams) error {