Skip to content

Commit 9cf4564

Browse files
committed
Add metrics for subscription queries
We have some metrics measuring various lower level things like index scans, but at the moment we have no easy way to figure out which columns might need an index. This commit introduces three new metrics that provide that information by labeling count, latency, and number of rows canned along with the scan type (index scan, table scan, mixed scan) and info about unindexed columns.
1 parent 6bd5572 commit 9cf4564

File tree

4 files changed

+211
-27
lines changed

4 files changed

+211
-27
lines changed
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
use spacetimedb_physical_plan::plan::PhysicalPlan;
2+
use spacetimedb_schema::schema::TableSchema;
3+
use std::sync::Arc;
4+
5+
/// Scan strategy types for subscription queries
6+
#[derive(Debug, Clone, Copy)]
7+
enum ScanStrategy {
8+
/// Full table scan - no indexes used
9+
Sequential,
10+
/// Uses index but requires post-filtering on non-indexed columns
11+
IndexedWithFilter,
12+
/// Fully indexed - no post-filtering needed
13+
FullyIndexed,
14+
/// Mixed strategy (combination of index and table scans)
15+
Mixed,
16+
/// Unknown/other strategy
17+
Unknown,
18+
}
19+
20+
/// Metrics data for a single subscription query execution
21+
#[derive(Debug)]
22+
pub struct QueryMetrics {
23+
pub scan_type: String,
24+
pub table_name: String,
25+
pub unindexed_columns: String,
26+
pub rows_scanned: u64,
27+
pub execution_time_micros: u64,
28+
}
29+
30+
impl std::fmt::Display for ScanStrategy {
31+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
32+
match self {
33+
Self::Sequential => write!(f, "sequential"),
34+
Self::IndexedWithFilter => write!(f, "indexed_with_filter"),
35+
Self::FullyIndexed => write!(f, "fully_indexed"),
36+
Self::Mixed => write!(f, "mixed"),
37+
Self::Unknown => write!(f, "unknown"),
38+
}
39+
}
40+
}
41+
42+
/// Recursively extracts column names from filter expressions
43+
fn extract_columns(
44+
expr: &spacetimedb_physical_plan::plan::PhysicalExpr,
45+
schema: Option<&Arc<TableSchema>>,
46+
columns: &mut Vec<String>,
47+
) {
48+
use spacetimedb_physical_plan::plan::PhysicalExpr;
49+
50+
match expr {
51+
PhysicalExpr::Field(tuple_field) => {
52+
let col_name = schema
53+
.and_then(|s| s.columns.get(tuple_field.field_pos))
54+
.map(|col| col.col_name.to_string())
55+
.unwrap_or_else(|| format!("col_{}", tuple_field.field_pos));
56+
columns.push(col_name);
57+
}
58+
PhysicalExpr::BinOp(_, lhs, rhs) => {
59+
extract_columns(lhs, schema, columns);
60+
extract_columns(rhs, schema, columns);
61+
}
62+
PhysicalExpr::LogOp(_, exprs) => {
63+
for expr in exprs {
64+
extract_columns(expr, schema, columns);
65+
}
66+
}
67+
PhysicalExpr::Value(_) => {}
68+
}
69+
}
70+
71+
/// Analyzes subscription scan strategy and creates QueryMetrics
72+
pub fn get_query_metrics(
73+
table_name: &str,
74+
plan: &PhysicalPlan,
75+
rows_scanned: u64,
76+
execution_time_micros: u64,
77+
) -> QueryMetrics {
78+
let has_table_scan = plan.any(&|p| matches!(p, PhysicalPlan::TableScan(..)));
79+
let has_index_scan = plan.any(&|p| matches!(p, PhysicalPlan::IxScan(..)));
80+
let has_post_filter = plan.any(&|p| matches!(p, PhysicalPlan::Filter(..)));
81+
82+
let strategy = if has_table_scan && has_index_scan {
83+
ScanStrategy::Mixed
84+
} else if has_table_scan {
85+
ScanStrategy::Sequential
86+
} else if has_index_scan && has_post_filter {
87+
ScanStrategy::IndexedWithFilter
88+
} else if has_index_scan {
89+
ScanStrategy::FullyIndexed
90+
} else {
91+
ScanStrategy::Unknown
92+
};
93+
94+
// Extract the schema from the plan
95+
let mut schema: Option<Arc<TableSchema>> = None;
96+
plan.visit(&mut |p| match p {
97+
PhysicalPlan::TableScan(scan, _) => {
98+
schema = Some(scan.schema.clone());
99+
}
100+
PhysicalPlan::IxScan(scan, _) => {
101+
schema = Some(scan.schema.clone());
102+
}
103+
_ => {}
104+
});
105+
106+
let mut columns = Vec::new();
107+
plan.visit(&mut |p| {
108+
if let PhysicalPlan::Filter(_, expr) = p {
109+
extract_columns(expr, schema.as_ref(), &mut columns);
110+
}
111+
});
112+
113+
QueryMetrics {
114+
scan_type: strategy.to_string(),
115+
table_name: table_name.to_string(),
116+
unindexed_columns: columns.join(","),
117+
rows_scanned,
118+
execution_time_micros,
119+
}
120+
}

crates/core/src/subscription/mod.rs

Lines changed: 35 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use crate::subscription::websocket_building::{BuildableWebsocketFormat, RowListBuilder as _};
22
use crate::{error::DBError, worker_metrics::WORKER_METRICS};
33
use anyhow::Result;
4+
use metrics::QueryMetrics;
45
use module_subscription_manager::Plan;
56
use prometheus::IntCounter;
67
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
@@ -19,6 +20,7 @@ use std::sync::Arc;
1920

2021
pub mod delta;
2122
pub mod execution_unit;
23+
pub mod metrics;
2224
pub mod module_subscription_actor;
2325
pub mod module_subscription_manager;
2426
pub mod query;
@@ -239,7 +241,7 @@ pub fn execute_plans<Tx, F>(
239241
plans: &[Arc<Plan>],
240242
tx: &Tx,
241243
update_type: TableUpdateType,
242-
) -> Result<(DatabaseUpdate<F>, ExecutionMetrics), DBError>
244+
) -> Result<(DatabaseUpdate<F>, ExecutionMetrics, Vec<QueryMetrics>), DBError>
243245
where
244246
Tx: Datastore + DeltaStore + Sync,
245247
F: BuildableWebsocketFormat,
@@ -257,21 +259,33 @@ where
257259
.map(|(sql, plan, table_id, table_name)| (sql, plan.optimize(auth), table_id, table_name))
258260
.map(|(sql, plan, table_id, table_name)| {
259261
plan.and_then(|plan| {
260-
if plan.returns_view_table() {
262+
let start_time = std::time::Instant::now();
263+
264+
let result = if plan.returns_view_table() {
261265
if let Some(schema) = plan.return_table() {
262-
let plan = PipelinedProject::from(plan);
263-
let plan = ViewProject::new(plan, schema.num_cols(), schema.num_private_cols());
264-
return collect_table_update_for_view(
265-
&[plan],
266-
table_id,
267-
(&**table_name).into(),
268-
tx,
269-
update_type,
270-
);
266+
let pipelined_plan = PipelinedProject::from(plan.clone());
267+
let view_plan = ViewProject::new(pipelined_plan, schema.num_cols(), schema.num_private_cols());
268+
collect_table_update_for_view(&[view_plan], table_id, (&**table_name).into(), tx, update_type)?
269+
} else {
270+
let pipelined_plan = PipelinedProject::from(plan.clone());
271+
collect_table_update(&[pipelined_plan], table_id, (&**table_name).into(), tx, update_type)?
271272
}
272-
}
273-
let plan = PipelinedProject::from(plan);
274-
collect_table_update(&[plan], table_id, (&**table_name).into(), tx, update_type)
273+
} else {
274+
let pipelined_plan = PipelinedProject::from(plan.clone());
275+
collect_table_update(&[pipelined_plan], table_id, (&**table_name).into(), tx, update_type)?
276+
};
277+
278+
let elapsed = start_time.elapsed();
279+
280+
let (ref _table_update, ref metrics) = result;
281+
let query_metrics = metrics::get_query_metrics(
282+
table_name,
283+
&plan,
284+
metrics.rows_scanned as u64,
285+
elapsed.as_micros() as u64,
286+
);
287+
288+
Ok((result.0, result.1, Some(query_metrics)))
275289
})
276290
.map_err(|err| DBError::WithSql {
277291
sql: sql.into(),
@@ -283,10 +297,15 @@ where
283297
let n = table_updates_with_metrics.len();
284298
let mut tables = Vec::with_capacity(n);
285299
let mut aggregated_metrics = ExecutionMetrics::default();
286-
for (update, metrics) in table_updates_with_metrics {
300+
let mut query_metrics_vec = Vec::new();
301+
302+
for (update, metrics, query_metrics) in table_updates_with_metrics {
287303
tables.push(update);
288304
aggregated_metrics.merge(metrics);
305+
if let Some(qm) = query_metrics {
306+
query_metrics_vec.push(qm);
307+
}
289308
}
290-
(DatabaseUpdate { tables }, aggregated_metrics)
309+
(DatabaseUpdate { tables }, aggregated_metrics, query_metrics_vec)
291310
})
292311
}

crates/core/src/subscription/module_subscription_actor.rs

Lines changed: 39 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use super::execution_unit::QueryHash;
2+
use super::metrics::QueryMetrics;
23
use super::module_subscription_manager::{
34
from_tx_offset, spawn_send_worker, BroadcastError, BroadcastQueue, Plan, SubscriptionGaugeStats,
45
SubscriptionManager, TransactionOffset,
@@ -132,6 +133,24 @@ impl SubscriptionMetrics {
132133
}
133134
}
134135

136+
/// Records subscription query metrics
137+
fn record_query_metrics(database_identity: &Identity, query_metrics: Vec<QueryMetrics>) {
138+
for qm in query_metrics {
139+
WORKER_METRICS
140+
.subscription_rows_examined
141+
.with_label_values(database_identity, &qm.scan_type, &qm.table_name, &qm.unindexed_columns)
142+
.observe(qm.rows_scanned as f64);
143+
WORKER_METRICS
144+
.subscription_query_execution_time_micros
145+
.with_label_values(database_identity, &qm.scan_type, &qm.table_name, &qm.unindexed_columns)
146+
.observe(qm.execution_time_micros as f64);
147+
WORKER_METRICS
148+
.subscription_queries_total
149+
.with_label_values(database_identity, &qm.scan_type, &qm.table_name, &qm.unindexed_columns)
150+
.inc();
151+
}
152+
}
153+
135154
/// Inner result type of [`ModuleSubscriptions::commit_and_broadcast_event`].
136155
pub type CommitAndBroadcastEventResult = Result<CommitAndBroadcastEventSuccess, WriteConflict>;
137156

@@ -324,17 +343,22 @@ impl ModuleSubscriptions {
324343
auth,
325344
)?;
326345

346+
let database_identity = self.relational_db.database_identity();
327347
let tx = DeltaTx::from(tx);
328-
match sender.config.protocol {
348+
let (update, metrics, query_metrics) = match sender.config.protocol {
329349
Protocol::Binary => {
330-
let (update, metrics) = execute_plans(auth, queries, &tx, update_type)?;
331-
Ok((FormatSwitch::Bsatn(update), metrics))
350+
let (update, metrics, query_metrics) = execute_plans(auth, queries, &tx, update_type)?;
351+
(FormatSwitch::Bsatn(update), metrics, query_metrics)
332352
}
333353
Protocol::Text => {
334-
let (update, metrics) = execute_plans(auth, queries, &tx, update_type)?;
335-
Ok((FormatSwitch::Json(update), metrics))
354+
let (update, metrics, query_metrics) = execute_plans(auth, queries, &tx, update_type)?;
355+
(FormatSwitch::Json(update), metrics, query_metrics)
336356
}
337-
}
357+
};
358+
359+
record_query_metrics(&database_identity, query_metrics);
360+
361+
Ok((update, metrics))
338362
}
339363

340364
/// Add a subscription to a single query.
@@ -880,13 +904,17 @@ impl ModuleSubscriptions {
880904
drop(compile_timer);
881905

882906
let tx = DeltaTx::from(&*tx);
883-
let (database_update, metrics) = match sender.config.protocol {
884-
Protocol::Binary => execute_plans(&auth, &queries, &tx, TableUpdateType::Subscribe)
885-
.map(|(table_update, metrics)| (FormatSwitch::Bsatn(table_update), metrics))?,
886-
Protocol::Text => execute_plans(&auth, &queries, &tx, TableUpdateType::Subscribe)
887-
.map(|(table_update, metrics)| (FormatSwitch::Json(table_update), metrics))?,
907+
let (database_update, metrics, query_metrics) = match sender.config.protocol {
908+
Protocol::Binary => execute_plans(&auth, &queries, &tx, TableUpdateType::Subscribe).map(
909+
|(table_update, metrics, query_metrics)| (FormatSwitch::Bsatn(table_update), metrics, query_metrics),
910+
)?,
911+
Protocol::Text => execute_plans(&auth, &queries, &tx, TableUpdateType::Subscribe).map(
912+
|(table_update, metrics, query_metrics)| (FormatSwitch::Json(table_update), metrics, query_metrics),
913+
)?,
888914
};
889915

916+
record_query_metrics(&database_identity, query_metrics);
917+
890918
// It acquires the subscription lock after `eval`, allowing `add_subscription` to run concurrently.
891919
// This also makes it possible for `broadcast_event` to get scheduled before the subsequent part here
892920
// but that should not pose an issue.

crates/core/src/worker_metrics/mod.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -387,6 +387,23 @@ metrics_group!(
387387
#[help = "The number of snapshot objects hardlinked in a single compression pass"]
388388
#[labels(db: Identity)]
389389
pub snapshot_compression_objects_hardlinked: IntGaugeVec,
390+
391+
#[name = spacetime_subscription_rows_examined]
392+
#[help = "Distribution of rows examined per subscription query"]
393+
#[labels(db: Identity, scan_type: str, table: str, unindexed_columns: str)]
394+
#[buckets(100, 500, 1000, 5000, 10000, 50000, 100000, 500000, 1000000)]
395+
pub subscription_rows_examined: HistogramVec,
396+
397+
#[name = spacetime_subscription_query_execution_time_micros]
398+
#[help = "Time taken to execute subscription queries (in microseconds)"]
399+
#[labels(db: Identity, scan_type: str, table: str, unindexed_columns: str)]
400+
#[buckets(100, 500, 1000, 5000, 10000, 50000, 100000, 500000, 1000000, 5000000, 10000000)]
401+
pub subscription_query_execution_time_micros: HistogramVec,
402+
403+
#[name = spacetime_subscription_queries_total]
404+
#[help = "Total number of subscription queries by scan strategy"]
405+
#[labels(db: Identity, scan_type: str, table: str, unindexed_columns: str)]
406+
pub subscription_queries_total: IntCounterVec,
390407
}
391408
);
392409

0 commit comments

Comments
 (0)