From 9cf4564da7789cc3077e7ded036ce82be7ad2dec Mon Sep 17 00:00:00 2001 From: Piotr Sarnacki Date: Fri, 14 Nov 2025 13:48:25 +0100 Subject: [PATCH 1/2] Add metrics for subscription queries We have some metrics measuring various lower level things like index scans, but at the moment we have no easy way to figure out which columns might need an index. This commit introduces three new metrics that provide that information by labeling count, latency, and number of rows canned along with the scan type (index scan, table scan, mixed scan) and info about unindexed columns. --- crates/core/src/subscription/metrics.rs | 120 ++++++++++++++++++ crates/core/src/subscription/mod.rs | 51 +++++--- .../subscription/module_subscription_actor.rs | 50 ++++++-- crates/core/src/worker_metrics/mod.rs | 17 +++ 4 files changed, 211 insertions(+), 27 deletions(-) create mode 100644 crates/core/src/subscription/metrics.rs diff --git a/crates/core/src/subscription/metrics.rs b/crates/core/src/subscription/metrics.rs new file mode 100644 index 00000000000..9abe3e12430 --- /dev/null +++ b/crates/core/src/subscription/metrics.rs @@ -0,0 +1,120 @@ +use spacetimedb_physical_plan::plan::PhysicalPlan; +use spacetimedb_schema::schema::TableSchema; +use std::sync::Arc; + +/// Scan strategy types for subscription queries +#[derive(Debug, Clone, Copy)] +enum ScanStrategy { + /// Full table scan - no indexes used + Sequential, + /// Uses index but requires post-filtering on non-indexed columns + IndexedWithFilter, + /// Fully indexed - no post-filtering needed + FullyIndexed, + /// Mixed strategy (combination of index and table scans) + Mixed, + /// Unknown/other strategy + Unknown, +} + +/// Metrics data for a single subscription query execution +#[derive(Debug)] +pub struct QueryMetrics { + pub scan_type: String, + pub table_name: String, + pub unindexed_columns: String, + pub rows_scanned: u64, + pub execution_time_micros: u64, +} + +impl std::fmt::Display for ScanStrategy { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Sequential => write!(f, "sequential"), + Self::IndexedWithFilter => write!(f, "indexed_with_filter"), + Self::FullyIndexed => write!(f, "fully_indexed"), + Self::Mixed => write!(f, "mixed"), + Self::Unknown => write!(f, "unknown"), + } + } +} + +/// Recursively extracts column names from filter expressions +fn extract_columns( + expr: &spacetimedb_physical_plan::plan::PhysicalExpr, + schema: Option<&Arc>, + columns: &mut Vec, +) { + use spacetimedb_physical_plan::plan::PhysicalExpr; + + match expr { + PhysicalExpr::Field(tuple_field) => { + let col_name = schema + .and_then(|s| s.columns.get(tuple_field.field_pos)) + .map(|col| col.col_name.to_string()) + .unwrap_or_else(|| format!("col_{}", tuple_field.field_pos)); + columns.push(col_name); + } + PhysicalExpr::BinOp(_, lhs, rhs) => { + extract_columns(lhs, schema, columns); + extract_columns(rhs, schema, columns); + } + PhysicalExpr::LogOp(_, exprs) => { + for expr in exprs { + extract_columns(expr, schema, columns); + } + } + PhysicalExpr::Value(_) => {} + } +} + +/// Analyzes subscription scan strategy and creates QueryMetrics +pub fn get_query_metrics( + table_name: &str, + plan: &PhysicalPlan, + rows_scanned: u64, + execution_time_micros: u64, +) -> QueryMetrics { + let has_table_scan = plan.any(&|p| matches!(p, PhysicalPlan::TableScan(..))); + let has_index_scan = plan.any(&|p| matches!(p, PhysicalPlan::IxScan(..))); + let has_post_filter = plan.any(&|p| matches!(p, PhysicalPlan::Filter(..))); + + let strategy = if has_table_scan && has_index_scan { + ScanStrategy::Mixed + } else if has_table_scan { + ScanStrategy::Sequential + } else if has_index_scan && has_post_filter { + ScanStrategy::IndexedWithFilter + } else if has_index_scan { + ScanStrategy::FullyIndexed + } else { + ScanStrategy::Unknown + }; + + // Extract the schema from the plan + let mut schema: Option> = None; + plan.visit(&mut |p| match p { + PhysicalPlan::TableScan(scan, _) => { + schema = Some(scan.schema.clone()); + } + PhysicalPlan::IxScan(scan, _) => { + schema = Some(scan.schema.clone()); + } + _ => {} + }); + + let mut columns = Vec::new(); + plan.visit(&mut |p| { + if let PhysicalPlan::Filter(_, expr) = p { + extract_columns(expr, schema.as_ref(), &mut columns); + } + }); + + QueryMetrics { + scan_type: strategy.to_string(), + table_name: table_name.to_string(), + unindexed_columns: columns.join(","), + rows_scanned, + execution_time_micros, + } +} diff --git a/crates/core/src/subscription/mod.rs b/crates/core/src/subscription/mod.rs index 5066c1d17b4..f893de157cc 100644 --- a/crates/core/src/subscription/mod.rs +++ b/crates/core/src/subscription/mod.rs @@ -1,6 +1,7 @@ use crate::subscription::websocket_building::{BuildableWebsocketFormat, RowListBuilder as _}; use crate::{error::DBError, worker_metrics::WORKER_METRICS}; use anyhow::Result; +use metrics::QueryMetrics; use module_subscription_manager::Plan; use prometheus::IntCounter; use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; @@ -19,6 +20,7 @@ use std::sync::Arc; pub mod delta; pub mod execution_unit; +pub mod metrics; pub mod module_subscription_actor; pub mod module_subscription_manager; pub mod query; @@ -239,7 +241,7 @@ pub fn execute_plans( plans: &[Arc], tx: &Tx, update_type: TableUpdateType, -) -> Result<(DatabaseUpdate, ExecutionMetrics), DBError> +) -> Result<(DatabaseUpdate, ExecutionMetrics, Vec), DBError> where Tx: Datastore + DeltaStore + Sync, F: BuildableWebsocketFormat, @@ -257,21 +259,33 @@ where .map(|(sql, plan, table_id, table_name)| (sql, plan.optimize(auth), table_id, table_name)) .map(|(sql, plan, table_id, table_name)| { plan.and_then(|plan| { - if plan.returns_view_table() { + let start_time = std::time::Instant::now(); + + let result = if plan.returns_view_table() { if let Some(schema) = plan.return_table() { - let plan = PipelinedProject::from(plan); - let plan = ViewProject::new(plan, schema.num_cols(), schema.num_private_cols()); - return collect_table_update_for_view( - &[plan], - table_id, - (&**table_name).into(), - tx, - update_type, - ); + let pipelined_plan = PipelinedProject::from(plan.clone()); + let view_plan = ViewProject::new(pipelined_plan, schema.num_cols(), schema.num_private_cols()); + collect_table_update_for_view(&[view_plan], table_id, (&**table_name).into(), tx, update_type)? + } else { + let pipelined_plan = PipelinedProject::from(plan.clone()); + collect_table_update(&[pipelined_plan], table_id, (&**table_name).into(), tx, update_type)? } - } - let plan = PipelinedProject::from(plan); - collect_table_update(&[plan], table_id, (&**table_name).into(), tx, update_type) + } else { + let pipelined_plan = PipelinedProject::from(plan.clone()); + collect_table_update(&[pipelined_plan], table_id, (&**table_name).into(), tx, update_type)? + }; + + let elapsed = start_time.elapsed(); + + let (ref _table_update, ref metrics) = result; + let query_metrics = metrics::get_query_metrics( + table_name, + &plan, + metrics.rows_scanned as u64, + elapsed.as_micros() as u64, + ); + + Ok((result.0, result.1, Some(query_metrics))) }) .map_err(|err| DBError::WithSql { sql: sql.into(), @@ -283,10 +297,15 @@ where let n = table_updates_with_metrics.len(); let mut tables = Vec::with_capacity(n); let mut aggregated_metrics = ExecutionMetrics::default(); - for (update, metrics) in table_updates_with_metrics { + let mut query_metrics_vec = Vec::new(); + + for (update, metrics, query_metrics) in table_updates_with_metrics { tables.push(update); aggregated_metrics.merge(metrics); + if let Some(qm) = query_metrics { + query_metrics_vec.push(qm); + } } - (DatabaseUpdate { tables }, aggregated_metrics) + (DatabaseUpdate { tables }, aggregated_metrics, query_metrics_vec) }) } diff --git a/crates/core/src/subscription/module_subscription_actor.rs b/crates/core/src/subscription/module_subscription_actor.rs index f244c8aeedb..212d1bea58d 100644 --- a/crates/core/src/subscription/module_subscription_actor.rs +++ b/crates/core/src/subscription/module_subscription_actor.rs @@ -1,4 +1,5 @@ use super::execution_unit::QueryHash; +use super::metrics::QueryMetrics; use super::module_subscription_manager::{ from_tx_offset, spawn_send_worker, BroadcastError, BroadcastQueue, Plan, SubscriptionGaugeStats, SubscriptionManager, TransactionOffset, @@ -132,6 +133,24 @@ impl SubscriptionMetrics { } } +/// Records subscription query metrics +fn record_query_metrics(database_identity: &Identity, query_metrics: Vec) { + for qm in query_metrics { + WORKER_METRICS + .subscription_rows_examined + .with_label_values(database_identity, &qm.scan_type, &qm.table_name, &qm.unindexed_columns) + .observe(qm.rows_scanned as f64); + WORKER_METRICS + .subscription_query_execution_time_micros + .with_label_values(database_identity, &qm.scan_type, &qm.table_name, &qm.unindexed_columns) + .observe(qm.execution_time_micros as f64); + WORKER_METRICS + .subscription_queries_total + .with_label_values(database_identity, &qm.scan_type, &qm.table_name, &qm.unindexed_columns) + .inc(); + } +} + /// Inner result type of [`ModuleSubscriptions::commit_and_broadcast_event`]. pub type CommitAndBroadcastEventResult = Result; @@ -324,17 +343,22 @@ impl ModuleSubscriptions { auth, )?; + let database_identity = self.relational_db.database_identity(); let tx = DeltaTx::from(tx); - match sender.config.protocol { + let (update, metrics, query_metrics) = match sender.config.protocol { Protocol::Binary => { - let (update, metrics) = execute_plans(auth, queries, &tx, update_type)?; - Ok((FormatSwitch::Bsatn(update), metrics)) + let (update, metrics, query_metrics) = execute_plans(auth, queries, &tx, update_type)?; + (FormatSwitch::Bsatn(update), metrics, query_metrics) } Protocol::Text => { - let (update, metrics) = execute_plans(auth, queries, &tx, update_type)?; - Ok((FormatSwitch::Json(update), metrics)) + let (update, metrics, query_metrics) = execute_plans(auth, queries, &tx, update_type)?; + (FormatSwitch::Json(update), metrics, query_metrics) } - } + }; + + record_query_metrics(&database_identity, query_metrics); + + Ok((update, metrics)) } /// Add a subscription to a single query. @@ -880,13 +904,17 @@ impl ModuleSubscriptions { drop(compile_timer); let tx = DeltaTx::from(&*tx); - let (database_update, metrics) = match sender.config.protocol { - Protocol::Binary => execute_plans(&auth, &queries, &tx, TableUpdateType::Subscribe) - .map(|(table_update, metrics)| (FormatSwitch::Bsatn(table_update), metrics))?, - Protocol::Text => execute_plans(&auth, &queries, &tx, TableUpdateType::Subscribe) - .map(|(table_update, metrics)| (FormatSwitch::Json(table_update), metrics))?, + let (database_update, metrics, query_metrics) = match sender.config.protocol { + Protocol::Binary => execute_plans(&auth, &queries, &tx, TableUpdateType::Subscribe).map( + |(table_update, metrics, query_metrics)| (FormatSwitch::Bsatn(table_update), metrics, query_metrics), + )?, + Protocol::Text => execute_plans(&auth, &queries, &tx, TableUpdateType::Subscribe).map( + |(table_update, metrics, query_metrics)| (FormatSwitch::Json(table_update), metrics, query_metrics), + )?, }; + record_query_metrics(&database_identity, query_metrics); + // It acquires the subscription lock after `eval`, allowing `add_subscription` to run concurrently. // This also makes it possible for `broadcast_event` to get scheduled before the subsequent part here // but that should not pose an issue. diff --git a/crates/core/src/worker_metrics/mod.rs b/crates/core/src/worker_metrics/mod.rs index cf1ec661a23..17aaaa113ad 100644 --- a/crates/core/src/worker_metrics/mod.rs +++ b/crates/core/src/worker_metrics/mod.rs @@ -387,6 +387,23 @@ metrics_group!( #[help = "The number of snapshot objects hardlinked in a single compression pass"] #[labels(db: Identity)] pub snapshot_compression_objects_hardlinked: IntGaugeVec, + + #[name = spacetime_subscription_rows_examined] + #[help = "Distribution of rows examined per subscription query"] + #[labels(db: Identity, scan_type: str, table: str, unindexed_columns: str)] + #[buckets(100, 500, 1000, 5000, 10000, 50000, 100000, 500000, 1000000)] + pub subscription_rows_examined: HistogramVec, + + #[name = spacetime_subscription_query_execution_time_micros] + #[help = "Time taken to execute subscription queries (in microseconds)"] + #[labels(db: Identity, scan_type: str, table: str, unindexed_columns: str)] + #[buckets(100, 500, 1000, 5000, 10000, 50000, 100000, 500000, 1000000, 5000000, 10000000)] + pub subscription_query_execution_time_micros: HistogramVec, + + #[name = spacetime_subscription_queries_total] + #[help = "Total number of subscription queries by scan strategy"] + #[labels(db: Identity, scan_type: str, table: str, unindexed_columns: str)] + pub subscription_queries_total: IntCounterVec, } ); From f18cb14538e7e265826d226e867cd537024a26fa Mon Sep 17 00:00:00 2001 From: Piotr Sarnacki Date: Mon, 24 Nov 2025 13:14:27 +0100 Subject: [PATCH 2/2] Clarify text for subscription execution time metric --- crates/core/src/worker_metrics/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/core/src/worker_metrics/mod.rs b/crates/core/src/worker_metrics/mod.rs index 17aaaa113ad..1a47cf698b2 100644 --- a/crates/core/src/worker_metrics/mod.rs +++ b/crates/core/src/worker_metrics/mod.rs @@ -395,7 +395,7 @@ metrics_group!( pub subscription_rows_examined: HistogramVec, #[name = spacetime_subscription_query_execution_time_micros] - #[help = "Time taken to execute subscription queries (in microseconds)"] + #[help = "Time taken to execute and fetch records for an initial subscription query (in microseconds)"] #[labels(db: Identity, scan_type: str, table: str, unindexed_columns: str)] #[buckets(100, 500, 1000, 5000, 10000, 50000, 100000, 500000, 1000000, 5000000, 10000000)] pub subscription_query_execution_time_micros: HistogramVec,