diff --git a/crates/core/src/subscription/metrics.rs b/crates/core/src/subscription/metrics.rs new file mode 100644 index 00000000000..9abe3e12430 --- /dev/null +++ b/crates/core/src/subscription/metrics.rs @@ -0,0 +1,120 @@ +use spacetimedb_physical_plan::plan::PhysicalPlan; +use spacetimedb_schema::schema::TableSchema; +use std::sync::Arc; + +/// Scan strategy types for subscription queries +#[derive(Debug, Clone, Copy)] +enum ScanStrategy { + /// Full table scan - no indexes used + Sequential, + /// Uses index but requires post-filtering on non-indexed columns + IndexedWithFilter, + /// Fully indexed - no post-filtering needed + FullyIndexed, + /// Mixed strategy (combination of index and table scans) + Mixed, + /// Unknown/other strategy + Unknown, +} + +/// Metrics data for a single subscription query execution +#[derive(Debug)] +pub struct QueryMetrics { + pub scan_type: String, + pub table_name: String, + pub unindexed_columns: String, + pub rows_scanned: u64, + pub execution_time_micros: u64, +} + +impl std::fmt::Display for ScanStrategy { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Sequential => write!(f, "sequential"), + Self::IndexedWithFilter => write!(f, "indexed_with_filter"), + Self::FullyIndexed => write!(f, "fully_indexed"), + Self::Mixed => write!(f, "mixed"), + Self::Unknown => write!(f, "unknown"), + } + } +} + +/// Recursively extracts column names from filter expressions +fn extract_columns( + expr: &spacetimedb_physical_plan::plan::PhysicalExpr, + schema: Option<&Arc>, + columns: &mut Vec, +) { + use spacetimedb_physical_plan::plan::PhysicalExpr; + + match expr { + PhysicalExpr::Field(tuple_field) => { + let col_name = schema + .and_then(|s| s.columns.get(tuple_field.field_pos)) + .map(|col| col.col_name.to_string()) + .unwrap_or_else(|| format!("col_{}", tuple_field.field_pos)); + columns.push(col_name); + } + PhysicalExpr::BinOp(_, lhs, rhs) => { + extract_columns(lhs, schema, columns); + extract_columns(rhs, schema, columns); + } + PhysicalExpr::LogOp(_, exprs) => { + for expr in exprs { + extract_columns(expr, schema, columns); + } + } + PhysicalExpr::Value(_) => {} + } +} + +/// Analyzes subscription scan strategy and creates QueryMetrics +pub fn get_query_metrics( + table_name: &str, + plan: &PhysicalPlan, + rows_scanned: u64, + execution_time_micros: u64, +) -> QueryMetrics { + let has_table_scan = plan.any(&|p| matches!(p, PhysicalPlan::TableScan(..))); + let has_index_scan = plan.any(&|p| matches!(p, PhysicalPlan::IxScan(..))); + let has_post_filter = plan.any(&|p| matches!(p, PhysicalPlan::Filter(..))); + + let strategy = if has_table_scan && has_index_scan { + ScanStrategy::Mixed + } else if has_table_scan { + ScanStrategy::Sequential + } else if has_index_scan && has_post_filter { + ScanStrategy::IndexedWithFilter + } else if has_index_scan { + ScanStrategy::FullyIndexed + } else { + ScanStrategy::Unknown + }; + + // Extract the schema from the plan + let mut schema: Option> = None; + plan.visit(&mut |p| match p { + PhysicalPlan::TableScan(scan, _) => { + schema = Some(scan.schema.clone()); + } + PhysicalPlan::IxScan(scan, _) => { + schema = Some(scan.schema.clone()); + } + _ => {} + }); + + let mut columns = Vec::new(); + plan.visit(&mut |p| { + if let PhysicalPlan::Filter(_, expr) = p { + extract_columns(expr, schema.as_ref(), &mut columns); + } + }); + + QueryMetrics { + scan_type: strategy.to_string(), + table_name: table_name.to_string(), + unindexed_columns: columns.join(","), + rows_scanned, + execution_time_micros, + } +} diff --git a/crates/core/src/subscription/mod.rs b/crates/core/src/subscription/mod.rs index 5066c1d17b4..f893de157cc 100644 --- a/crates/core/src/subscription/mod.rs +++ b/crates/core/src/subscription/mod.rs @@ -1,6 +1,7 @@ use crate::subscription::websocket_building::{BuildableWebsocketFormat, RowListBuilder as _}; use crate::{error::DBError, worker_metrics::WORKER_METRICS}; use anyhow::Result; +use metrics::QueryMetrics; use module_subscription_manager::Plan; use prometheus::IntCounter; use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; @@ -19,6 +20,7 @@ use std::sync::Arc; pub mod delta; pub mod execution_unit; +pub mod metrics; pub mod module_subscription_actor; pub mod module_subscription_manager; pub mod query; @@ -239,7 +241,7 @@ pub fn execute_plans( plans: &[Arc], tx: &Tx, update_type: TableUpdateType, -) -> Result<(DatabaseUpdate, ExecutionMetrics), DBError> +) -> Result<(DatabaseUpdate, ExecutionMetrics, Vec), DBError> where Tx: Datastore + DeltaStore + Sync, F: BuildableWebsocketFormat, @@ -257,21 +259,33 @@ where .map(|(sql, plan, table_id, table_name)| (sql, plan.optimize(auth), table_id, table_name)) .map(|(sql, plan, table_id, table_name)| { plan.and_then(|plan| { - if plan.returns_view_table() { + let start_time = std::time::Instant::now(); + + let result = if plan.returns_view_table() { if let Some(schema) = plan.return_table() { - let plan = PipelinedProject::from(plan); - let plan = ViewProject::new(plan, schema.num_cols(), schema.num_private_cols()); - return collect_table_update_for_view( - &[plan], - table_id, - (&**table_name).into(), - tx, - update_type, - ); + let pipelined_plan = PipelinedProject::from(plan.clone()); + let view_plan = ViewProject::new(pipelined_plan, schema.num_cols(), schema.num_private_cols()); + collect_table_update_for_view(&[view_plan], table_id, (&**table_name).into(), tx, update_type)? + } else { + let pipelined_plan = PipelinedProject::from(plan.clone()); + collect_table_update(&[pipelined_plan], table_id, (&**table_name).into(), tx, update_type)? } - } - let plan = PipelinedProject::from(plan); - collect_table_update(&[plan], table_id, (&**table_name).into(), tx, update_type) + } else { + let pipelined_plan = PipelinedProject::from(plan.clone()); + collect_table_update(&[pipelined_plan], table_id, (&**table_name).into(), tx, update_type)? + }; + + let elapsed = start_time.elapsed(); + + let (ref _table_update, ref metrics) = result; + let query_metrics = metrics::get_query_metrics( + table_name, + &plan, + metrics.rows_scanned as u64, + elapsed.as_micros() as u64, + ); + + Ok((result.0, result.1, Some(query_metrics))) }) .map_err(|err| DBError::WithSql { sql: sql.into(), @@ -283,10 +297,15 @@ where let n = table_updates_with_metrics.len(); let mut tables = Vec::with_capacity(n); let mut aggregated_metrics = ExecutionMetrics::default(); - for (update, metrics) in table_updates_with_metrics { + let mut query_metrics_vec = Vec::new(); + + for (update, metrics, query_metrics) in table_updates_with_metrics { tables.push(update); aggregated_metrics.merge(metrics); + if let Some(qm) = query_metrics { + query_metrics_vec.push(qm); + } } - (DatabaseUpdate { tables }, aggregated_metrics) + (DatabaseUpdate { tables }, aggregated_metrics, query_metrics_vec) }) } diff --git a/crates/core/src/subscription/module_subscription_actor.rs b/crates/core/src/subscription/module_subscription_actor.rs index 4e84f41b52b..2e442fa30aa 100644 --- a/crates/core/src/subscription/module_subscription_actor.rs +++ b/crates/core/src/subscription/module_subscription_actor.rs @@ -1,4 +1,5 @@ use super::execution_unit::QueryHash; +use super::metrics::QueryMetrics; use super::module_subscription_manager::{ from_tx_offset, spawn_send_worker, BroadcastError, BroadcastQueue, Plan, SubscriptionGaugeStats, SubscriptionManager, TransactionOffset, @@ -132,6 +133,24 @@ impl SubscriptionMetrics { } } +/// Records subscription query metrics +fn record_query_metrics(database_identity: &Identity, query_metrics: Vec) { + for qm in query_metrics { + WORKER_METRICS + .subscription_rows_examined + .with_label_values(database_identity, &qm.scan_type, &qm.table_name, &qm.unindexed_columns) + .observe(qm.rows_scanned as f64); + WORKER_METRICS + .subscription_query_execution_time_micros + .with_label_values(database_identity, &qm.scan_type, &qm.table_name, &qm.unindexed_columns) + .observe(qm.execution_time_micros as f64); + WORKER_METRICS + .subscription_queries_total + .with_label_values(database_identity, &qm.scan_type, &qm.table_name, &qm.unindexed_columns) + .inc(); + } +} + /// Inner result type of [`ModuleSubscriptions::commit_and_broadcast_event`]. pub type CommitAndBroadcastEventResult = Result; @@ -345,17 +364,22 @@ impl ModuleSubscriptions { auth, )?; + let database_identity = self.relational_db.database_identity(); let tx = DeltaTx::from(tx); - match sender.config.protocol { + let (update, metrics, query_metrics) = match sender.config.protocol { Protocol::Binary => { - let (update, metrics) = execute_plans(auth, queries, &tx, update_type)?; - Ok((FormatSwitch::Bsatn(update), metrics)) + let (update, metrics, query_metrics) = execute_plans(auth, queries, &tx, update_type)?; + (FormatSwitch::Bsatn(update), metrics, query_metrics) } Protocol::Text => { - let (update, metrics) = execute_plans(auth, queries, &tx, update_type)?; - Ok((FormatSwitch::Json(update), metrics)) + let (update, metrics, query_metrics) = execute_plans(auth, queries, &tx, update_type)?; + (FormatSwitch::Json(update), metrics, query_metrics) } - } + }; + + record_query_metrics(&database_identity, query_metrics); + + Ok((update, metrics)) } /// Add a subscription to a single query. @@ -896,13 +920,17 @@ impl ModuleSubscriptions { drop(compile_timer); let tx = DeltaTx::from(&*tx); - let (database_update, metrics) = match sender.config.protocol { - Protocol::Binary => execute_plans(&auth, &queries, &tx, TableUpdateType::Subscribe) - .map(|(table_update, metrics)| (FormatSwitch::Bsatn(table_update), metrics))?, - Protocol::Text => execute_plans(&auth, &queries, &tx, TableUpdateType::Subscribe) - .map(|(table_update, metrics)| (FormatSwitch::Json(table_update), metrics))?, + let (database_update, metrics, query_metrics) = match sender.config.protocol { + Protocol::Binary => execute_plans(&auth, &queries, &tx, TableUpdateType::Subscribe).map( + |(table_update, metrics, query_metrics)| (FormatSwitch::Bsatn(table_update), metrics, query_metrics), + )?, + Protocol::Text => execute_plans(&auth, &queries, &tx, TableUpdateType::Subscribe).map( + |(table_update, metrics, query_metrics)| (FormatSwitch::Json(table_update), metrics, query_metrics), + )?, }; + record_query_metrics(&database_identity, query_metrics); + // It acquires the subscription lock after `eval`, allowing `add_subscription` to run concurrently. // This also makes it possible for `broadcast_event` to get scheduled before the subsequent part here // but that should not pose an issue. diff --git a/crates/core/src/worker_metrics/mod.rs b/crates/core/src/worker_metrics/mod.rs index 0f20a12d632..2b461a235aa 100644 --- a/crates/core/src/worker_metrics/mod.rs +++ b/crates/core/src/worker_metrics/mod.rs @@ -387,6 +387,23 @@ metrics_group!( #[help = "The number of snapshot objects hardlinked in a single compression pass"] #[labels(db: Identity)] pub snapshot_compression_objects_hardlinked: IntGaugeVec, + + #[name = spacetime_subscription_rows_examined] + #[help = "Distribution of rows examined per subscription query"] + #[labels(db: Identity, scan_type: str, table: str, unindexed_columns: str)] + #[buckets(100, 500, 1000, 5000, 10000, 50000, 100000, 500000, 1000000)] + pub subscription_rows_examined: HistogramVec, + + #[name = spacetime_subscription_query_execution_time_micros] + #[help = "Time taken to execute and fetch records for an initial subscription query (in microseconds)"] + #[labels(db: Identity, scan_type: str, table: str, unindexed_columns: str)] + #[buckets(100, 500, 1000, 5000, 10000, 50000, 100000, 500000, 1000000, 5000000, 10000000)] + pub subscription_query_execution_time_micros: HistogramVec, + + #[name = spacetime_subscription_queries_total] + #[help = "Total number of subscription queries by scan strategy"] + #[labels(db: Identity, scan_type: str, table: str, unindexed_columns: str)] + pub subscription_queries_total: IntCounterVec, } );