120 changes: 120 additions & 0 deletions crates/core/src/subscription/metrics.rs
@@ -0,0 +1,120 @@
use spacetimedb_physical_plan::plan::PhysicalPlan;
use spacetimedb_schema::schema::TableSchema;
use std::sync::Arc;

/// Scan strategy types for subscription queries
#[derive(Debug, Clone, Copy)]
enum ScanStrategy {
    /// Full table scan - no indexes used
    Sequential,
    /// Uses index but requires post-filtering on non-indexed columns
    IndexedWithFilter,
    /// Fully indexed - no post-filtering needed
    FullyIndexed,
    /// Mixed strategy (combination of index and table scans)
    Mixed,
    /// Unknown/other strategy
    Unknown,
}

/// Metrics data for a single subscription query execution
#[derive(Debug)]
pub struct QueryMetrics {
    pub scan_type: String,
    pub table_name: String,
    pub unindexed_columns: String,
    pub rows_scanned: u64,
    pub execution_time_micros: u64,
}

impl std::fmt::Display for ScanStrategy {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Sequential => write!(f, "sequential"),
            Self::IndexedWithFilter => write!(f, "indexed_with_filter"),
            Self::FullyIndexed => write!(f, "fully_indexed"),
            Self::Mixed => write!(f, "mixed"),
            Self::Unknown => write!(f, "unknown"),
        }
    }
}

/// Recursively extracts column names from filter expressions
fn extract_columns(
    expr: &spacetimedb_physical_plan::plan::PhysicalExpr,
    schema: Option<&Arc<TableSchema>>,
    columns: &mut Vec<String>,
) {
    use spacetimedb_physical_plan::plan::PhysicalExpr;

    match expr {
        PhysicalExpr::Field(tuple_field) => {
            let col_name = schema
                .and_then(|s| s.columns.get(tuple_field.field_pos))
                .map(|col| col.col_name.to_string())
                .unwrap_or_else(|| format!("col_{}", tuple_field.field_pos));
            columns.push(col_name);
        }
        PhysicalExpr::BinOp(_, lhs, rhs) => {
            extract_columns(lhs, schema, columns);
            extract_columns(rhs, schema, columns);
        }
        PhysicalExpr::LogOp(_, exprs) => {
            for expr in exprs {
                extract_columns(expr, schema, columns);
            }
        }
        PhysicalExpr::Value(_) => {}
    }
}

/// Analyzes subscription scan strategy and creates QueryMetrics
pub fn get_query_metrics(
    table_name: &str,
    plan: &PhysicalPlan,
    rows_scanned: u64,
    execution_time_micros: u64,
) -> QueryMetrics {
    let has_table_scan = plan.any(&|p| matches!(p, PhysicalPlan::TableScan(..)));
    let has_index_scan = plan.any(&|p| matches!(p, PhysicalPlan::IxScan(..)));
    let has_post_filter = plan.any(&|p| matches!(p, PhysicalPlan::Filter(..)));

    let strategy = if has_table_scan && has_index_scan {
        ScanStrategy::Mixed
    } else if has_table_scan {
        ScanStrategy::Sequential
    } else if has_index_scan && has_post_filter {
        ScanStrategy::IndexedWithFilter
    } else if has_index_scan {
        ScanStrategy::FullyIndexed
    } else {
        ScanStrategy::Unknown
    };

    // Extract the schema from the plan
    let mut schema: Option<Arc<TableSchema>> = None;
    plan.visit(&mut |p| match p {
        PhysicalPlan::TableScan(scan, _) => {
            schema = Some(scan.schema.clone());
        }
        PhysicalPlan::IxScan(scan, _) => {
            schema = Some(scan.schema.clone());
        }
        _ => {}
    });

    let mut columns = Vec::new();
    plan.visit(&mut |p| {
        if let PhysicalPlan::Filter(_, expr) = p {
            extract_columns(expr, schema.as_ref(), &mut columns);
        }
    });

    QueryMetrics {
        scan_type: strategy.to_string(),
        table_name: table_name.to_string(),
        unindexed_columns: columns.join(","),
        rows_scanned,
        execution_time_micros,
    }
}
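Note: the strategy selection above reduces to a small precedence table. The following is a standalone, runnable restatement of that logic (not part of the diff; names are illustrative):

/// Sketch of the scan-strategy precedence used in `get_query_metrics`:
/// any mix of table and index scans is "mixed"; a lone table scan is
/// "sequential" regardless of filters; an index scan is downgraded to
/// "indexed_with_filter" only when a residual Filter node remains.
fn classify(has_table_scan: bool, has_index_scan: bool, has_post_filter: bool) -> &'static str {
    match (has_table_scan, has_index_scan, has_post_filter) {
        (true, true, _) => "mixed",
        (true, false, _) => "sequential",
        (false, true, true) => "indexed_with_filter",
        (false, true, false) => "fully_indexed",
        (false, false, _) => "unknown",
    }
}

fn main() {
    // An index lookup with a leftover predicate on non-indexed columns:
    assert_eq!(classify(false, true, true), "indexed_with_filter");
    // A plan that only walks the table is sequential, however it filters:
    assert_eq!(classify(true, false, true), "sequential");
}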
51 changes: 35 additions & 16 deletions crates/core/src/subscription/mod.rs
@@ -1,6 +1,7 @@
use crate::subscription::websocket_building::{BuildableWebsocketFormat, RowListBuilder as _};
use crate::{error::DBError, worker_metrics::WORKER_METRICS};
use anyhow::Result;
+use metrics::QueryMetrics;
use module_subscription_manager::Plan;
use prometheus::IntCounter;
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
@@ -19,6 +20,7 @@ use std::sync::Arc;

pub mod delta;
pub mod execution_unit;
+pub mod metrics;
pub mod module_subscription_actor;
pub mod module_subscription_manager;
pub mod query;
@@ -239,7 +241,7 @@ pub fn execute_plans<Tx, F>(
    plans: &[Arc<Plan>],
    tx: &Tx,
    update_type: TableUpdateType,
-) -> Result<(DatabaseUpdate<F>, ExecutionMetrics), DBError>
+) -> Result<(DatabaseUpdate<F>, ExecutionMetrics, Vec<QueryMetrics>), DBError>
where
    Tx: Datastore + DeltaStore + Sync,
    F: BuildableWebsocketFormat,
@@ -257,21 +259,33 @@
        .map(|(sql, plan, table_id, table_name)| (sql, plan.optimize(auth), table_id, table_name))
        .map(|(sql, plan, table_id, table_name)| {
            plan.and_then(|plan| {
-                if plan.returns_view_table() {
+                let start_time = std::time::Instant::now();
+
+                let result = if plan.returns_view_table() {
                    if let Some(schema) = plan.return_table() {
-                        let plan = PipelinedProject::from(plan);
-                        let plan = ViewProject::new(plan, schema.num_cols(), schema.num_private_cols());
-                        return collect_table_update_for_view(
-                            &[plan],
-                            table_id,
-                            (&**table_name).into(),
-                            tx,
-                            update_type,
-                        );
+                        let pipelined_plan = PipelinedProject::from(plan.clone());
+                        let view_plan = ViewProject::new(pipelined_plan, schema.num_cols(), schema.num_private_cols());
+                        collect_table_update_for_view(&[view_plan], table_id, (&**table_name).into(), tx, update_type)?
+                    } else {
+                        let pipelined_plan = PipelinedProject::from(plan.clone());
+                        collect_table_update(&[pipelined_plan], table_id, (&**table_name).into(), tx, update_type)?
                    }
-                }
-                let plan = PipelinedProject::from(plan);
-                collect_table_update(&[plan], table_id, (&**table_name).into(), tx, update_type)
+                } else {
+                    let pipelined_plan = PipelinedProject::from(plan.clone());
+                    collect_table_update(&[pipelined_plan], table_id, (&**table_name).into(), tx, update_type)?
+                };
+
+                let elapsed = start_time.elapsed();
+
+                let (table_update, exec_metrics) = result;
+                let query_metrics = metrics::get_query_metrics(
+                    table_name,
+                    &plan,
+                    exec_metrics.rows_scanned as u64,
+                    elapsed.as_micros() as u64,
+                );
+
+                Ok((table_update, exec_metrics, Some(query_metrics)))
            })
            .map_err(|err| DBError::WithSql {
                sql: sql.into(),
@@ -283,10 +297,15 @@ where
            let n = table_updates_with_metrics.len();
            let mut tables = Vec::with_capacity(n);
            let mut aggregated_metrics = ExecutionMetrics::default();
-            for (update, metrics) in table_updates_with_metrics {
+            let mut query_metrics_vec = Vec::new();
+
+            for (update, metrics, query_metrics) in table_updates_with_metrics {
                tables.push(update);
                aggregated_metrics.merge(metrics);
+                if let Some(qm) = query_metrics {
+                    query_metrics_vec.push(qm);
+                }
            }
-            (DatabaseUpdate { tables }, aggregated_metrics)
+            (DatabaseUpdate { tables }, aggregated_metrics, query_metrics_vec)
        })
}
50 changes: 39 additions & 11 deletions crates/core/src/subscription/module_subscription_actor.rs
@@ -1,4 +1,5 @@
use super::execution_unit::QueryHash;
+use super::metrics::QueryMetrics;
use super::module_subscription_manager::{
    from_tx_offset, spawn_send_worker, BroadcastError, BroadcastQueue, Plan, SubscriptionGaugeStats,
    SubscriptionManager, TransactionOffset,
@@ -132,6 +133,24 @@ impl SubscriptionMetrics {
    }
}

+/// Records subscription query metrics
+fn record_query_metrics(database_identity: &Identity, query_metrics: Vec<QueryMetrics>) {
+    for qm in query_metrics {
+        WORKER_METRICS
+            .subscription_rows_examined
+            .with_label_values(database_identity, &qm.scan_type, &qm.table_name, &qm.unindexed_columns)
+            .observe(qm.rows_scanned as f64);
+        WORKER_METRICS
+            .subscription_query_execution_time_micros
+            .with_label_values(database_identity, &qm.scan_type, &qm.table_name, &qm.unindexed_columns)
+            .observe(qm.execution_time_micros as f64);
+        WORKER_METRICS
+            .subscription_queries_total
+            .with_label_values(database_identity, &qm.scan_type, &qm.table_name, &qm.unindexed_columns)
+            .inc();
+    }
+}

/// Inner result type of [`ModuleSubscriptions::commit_and_broadcast_event`].
pub type CommitAndBroadcastEventResult = Result<CommitAndBroadcastEventSuccess, WriteConflict>;

@@ -345,17 +364,22 @@ impl ModuleSubscriptions {
            auth,
        )?;

+        let database_identity = self.relational_db.database_identity();
        let tx = DeltaTx::from(tx);
-        match sender.config.protocol {
+        let (update, metrics, query_metrics) = match sender.config.protocol {
            Protocol::Binary => {
-                let (update, metrics) = execute_plans(auth, queries, &tx, update_type)?;
-                Ok((FormatSwitch::Bsatn(update), metrics))
+                let (update, metrics, query_metrics) = execute_plans(auth, queries, &tx, update_type)?;
+                (FormatSwitch::Bsatn(update), metrics, query_metrics)
            }
            Protocol::Text => {
-                let (update, metrics) = execute_plans(auth, queries, &tx, update_type)?;
-                Ok((FormatSwitch::Json(update), metrics))
+                let (update, metrics, query_metrics) = execute_plans(auth, queries, &tx, update_type)?;
+                (FormatSwitch::Json(update), metrics, query_metrics)
            }
-        }
+        };
+
+        record_query_metrics(&database_identity, query_metrics);
+
+        Ok((update, metrics))
    }

    /// Add a subscription to a single query.
@@ -896,13 +920,17 @@ impl ModuleSubscriptions {
        drop(compile_timer);

        let tx = DeltaTx::from(&*tx);
-        let (database_update, metrics) = match sender.config.protocol {
-            Protocol::Binary => execute_plans(&auth, &queries, &tx, TableUpdateType::Subscribe)
-                .map(|(table_update, metrics)| (FormatSwitch::Bsatn(table_update), metrics))?,
-            Protocol::Text => execute_plans(&auth, &queries, &tx, TableUpdateType::Subscribe)
-                .map(|(table_update, metrics)| (FormatSwitch::Json(table_update), metrics))?,
+        let (database_update, metrics, query_metrics) = match sender.config.protocol {
+            Protocol::Binary => execute_plans(&auth, &queries, &tx, TableUpdateType::Subscribe).map(
+                |(table_update, metrics, query_metrics)| (FormatSwitch::Bsatn(table_update), metrics, query_metrics),
+            )?,
+            Protocol::Text => execute_plans(&auth, &queries, &tx, TableUpdateType::Subscribe).map(
+                |(table_update, metrics, query_metrics)| (FormatSwitch::Json(table_update), metrics, query_metrics),
+            )?,
        };
+
+        record_query_metrics(&database_identity, query_metrics);

        // It acquires the subscription lock after `eval`, allowing `add_subscription` to run concurrently.
        // This also makes it possible for `broadcast_event` to get scheduled before the subsequent part here
        // but that should not pose an issue.
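To make the label shape concrete: under the scheme above, a sequential scan of a hypothetical `players` table with residual predicates on columns `x` and `y` would be recorded roughly as follows (a sketch only; it assumes `QueryMetrics` from the new metrics module is in scope, and every value is illustrative):

let qm = QueryMetrics {
    scan_type: "sequential".to_string(),  // from ScanStrategy::Sequential
    table_name: "players".to_string(),    // hypothetical table
    unindexed_columns: "x,y".to_string(), // joined by extract_columns
    rows_scanned: 25_000,
    execution_time_micros: 1_800,
};
// record_query_metrics(&database_identity, vec![qm]) would then observe
// 25000 into subscription_rows_examined, 1800 into
// subscription_query_execution_time_micros, and bump
// subscription_queries_total, all under labels (db, "sequential", "players", "x,y").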
17 changes: 17 additions & 0 deletions crates/core/src/worker_metrics/mod.rs
@@ -387,6 +387,23 @@ metrics_group!(
#[help = "The number of snapshot objects hardlinked in a single compression pass"]
#[labels(db: Identity)]
pub snapshot_compression_objects_hardlinked: IntGaugeVec,

#[name = spacetime_subscription_rows_examined]
#[help = "Distribution of rows examined per subscription query"]
#[labels(db: Identity, scan_type: str, table: str, unindexed_columns: str)]
#[buckets(100, 500, 1000, 5000, 10000, 50000, 100000, 500000, 1000000)]
pub subscription_rows_examined: HistogramVec,

#[name = spacetime_subscription_query_execution_time_micros]
#[help = "Time taken to execute and fetch records for an initial subscription query (in microseconds)"]
#[labels(db: Identity, scan_type: str, table: str, unindexed_columns: str)]
#[buckets(100, 500, 1000, 5000, 10000, 50000, 100000, 500000, 1000000, 5000000, 10000000)]
pub subscription_query_execution_time_micros: HistogramVec,

#[name = spacetime_subscription_queries_total]
#[help = "Total number of subscription queries by scan strategy"]
#[labels(db: Identity, scan_type: str, table: str, unindexed_columns: str)]
pub subscription_queries_total: IntCounterVec,
}
);
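For illustration, once scraped, the new counter would surface as a series along these lines (metric and label names taken from the definitions above; label values hypothetical):

    spacetime_subscription_queries_total{db="<identity>",scan_type="indexed_with_filter",table="players",unindexed_columns="y"} 3

High counts under scan_type="sequential" with a non-empty unindexed_columns label are a hint that a subscription would benefit from an index on those columns.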
