From 54b77db39d58f695e6ab3f7bd80e440227bb132e Mon Sep 17 00:00:00 2001 From: Shubham Mishra Date: Thu, 13 Nov 2025 01:53:01 +0530 Subject: [PATCH 1/7] views cleanup --- crates/core/src/db/relational_db.rs | 25 ++++++ crates/core/src/host/host_controller.rs | 14 ++-- .../locking_tx_datastore/committed_state.rs | 4 +- .../src/locking_tx_datastore/mut_tx.rs | 84 +++++++++++++++++-- 4 files changed, 113 insertions(+), 14 deletions(-) diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index a85edb3e6b9..ce4135572af 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -1059,6 +1059,31 @@ impl RelationalDB { } } +/// Duration after which expired unused views are cleaned up. +/// Value is chosen arbitrarily; can be tuned later if needed. +const VIEWS_EXPIRATION: std::time::Duration = std::time::Duration::from_secs(100); + +/// Spawn a background task that periodically cleans up expired views. +pub fn spawn_view_cleanup_loop(db: Arc) -> tokio::task::AbortHandle { + tokio::spawn(async move { + let db = &db; + loop { + if let Err(e) = db.with_auto_commit(Workload::Internal, |tx| { + tx.clear_expired_views(VIEWS_EXPIRATION).map_err(DBError::from) + }) { + log::error!( + "[{}] DATABASE: failed to clear expired views: {}", + db.database_identity(), + e + ); + } + + tokio::time::sleep(VIEWS_EXPIRATION).await; + } + }) + .abort_handle() +} + impl RelationalDB { pub fn create_table(&self, tx: &mut MutTx, schema: TableSchema) -> Result { Ok(self.inner.create_table_mut_tx(tx, schema)?) diff --git a/crates/core/src/host/host_controller.rs b/crates/core/src/host/host_controller.rs index f9343734b4f..084cdebc3c8 100644 --- a/crates/core/src/host/host_controller.rs +++ b/crates/core/src/host/host_controller.rs @@ -5,7 +5,7 @@ use super::{Scheduler, UpdateDatabaseResult}; use crate::client::{ClientActorId, ClientName}; use crate::database_logger::DatabaseLogger; use crate::db::persistence::PersistenceProvider; -use crate::db::relational_db::{self, DiskSizeFn, RelationalDB, Txdata}; +use crate::db::relational_db::{self, spawn_view_cleanup_loop, DiskSizeFn, RelationalDB, Txdata}; use crate::db::{self, spawn_tx_metrics_recorder}; use crate::energy::{EnergyMonitor, EnergyQuanta, NullEnergyMonitor}; use crate::host::module_host::ModuleRuntime as _; @@ -740,6 +740,9 @@ struct Host { /// Handle to the task responsible for recording metrics for each transaction. /// The task is aborted when [`Host`] is dropped. tx_metrics_recorder_task: AbortHandle, + /// Handle to the task responsible for cleaning up old views. + /// The task is aborted when [`Host`] is dropped. + view_cleanup_task: AbortHandle, } impl Host { @@ -864,19 +867,17 @@ impl Host { scheduler_starter.start(&module_host)?; let disk_metrics_recorder_task = tokio::spawn(metric_reporter(replica_ctx.clone())).abort_handle(); + let view_cleanup_task = spawn_view_cleanup_loop(replica_ctx.relational_db.clone()); let module = watch::Sender::new(module_host); - //TODO(shub): Below code interfere with `exit_module` code, - // I suspect channel internally holds a reference to the module, - // even after we drop the sender. - // - // replica_ctx.subscriptions.init(module.subscribe()); + Ok(Host { module, replica_ctx, scheduler, disk_metrics_recorder_task, tx_metrics_recorder_task, + view_cleanup_task, }) } @@ -1053,6 +1054,7 @@ impl Drop for Host { fn drop(&mut self) { self.disk_metrics_recorder_task.abort(); self.tx_metrics_recorder_task.abort(); + self.view_cleanup_task.abort(); } } diff --git a/crates/datastore/src/locking_tx_datastore/committed_state.rs b/crates/datastore/src/locking_tx_datastore/committed_state.rs index 22e4d4d00ad..9980fe79bc5 100644 --- a/crates/datastore/src/locking_tx_datastore/committed_state.rs +++ b/crates/datastore/src/locking_tx_datastore/committed_state.rs @@ -634,8 +634,8 @@ impl CommittedState { tx_data.has_rows_or_connect_disconnect(ctx.reducer_context()) } - pub(super) fn drop_view_from_read_sets(&mut self, view_id: ViewId) { - self.read_sets.remove_view(view_id) + pub(super) fn drop_view_from_read_sets(&mut self, view_id: ViewId, sender: Option) { + self.read_sets.remove_view(view_id, sender) } pub(super) fn merge(&mut self, tx_state: TxState, read_sets: ViewReadSets, ctx: &ExecutionContext) -> TxData { diff --git a/crates/datastore/src/locking_tx_datastore/mut_tx.rs b/crates/datastore/src/locking_tx_datastore/mut_tx.rs index e3c496d67b6..157e71389c0 100644 --- a/crates/datastore/src/locking_tx_datastore/mut_tx.rs +++ b/crates/datastore/src/locking_tx_datastore/mut_tx.rs @@ -107,9 +107,9 @@ impl ViewReadSets { } /// Removes keys for `view_id` from the read set - pub fn remove_view(&mut self, view_id: ViewId) { + pub fn remove_view(&mut self, view_id: ViewId, sender: Option) { self.tables.retain(|_, readset| { - readset.remove_view(view_id); + readset.remove_view(view_id, sender); !readset.is_empty() }); } @@ -144,9 +144,14 @@ impl TableReadSet { self.table_scans.is_empty() } - /// Removes keys for `view_id` from the read set - fn remove_view(&mut self, view_id: ViewId) { - self.table_scans.retain(|info| info.view_id != view_id); + /// Removes keys for `view_id` from the read set, optionally filtering by `sender` + fn remove_view(&mut self, view_id: ViewId, sender: Option) { + if let Some(identity) = sender { + self.table_scans + .retain(|call| !(call.view_id == view_id && call.sender.as_ref() == Some(&identity))); + } else { + self.table_scans.retain(|call| call.view_id != view_id); + } } /// Merge or union two read sets for this table @@ -221,7 +226,14 @@ impl MutTxId { /// Removes keys for `view_id` from the committed read set. /// Used for dropping views in an auto-migration. pub fn drop_view_from_committed_read_set(&mut self, view_id: ViewId) { - self.committed_state_write_lock.drop_view_from_read_sets(view_id) + self.committed_state_write_lock.drop_view_from_read_sets(view_id, None) + } + + /// Removes a specific view call from the committed read set. + /// Used when dropping views due to + pub fn drop_view_with_sender_from_committed_read_set(&mut self, view_id: ViewId, sender: Identity) { + self.committed_state_write_lock + .drop_view_from_read_sets(view_id, Some(sender)) } } @@ -1959,6 +1971,66 @@ impl MutTxId { Ok(()) } + /// Clean up Views that have no subscribers and haven't been called within the expiration duration. + /// + /// It looks for rows in `st_view_sub` where: + /// - `has_subscribers` is `false`, + /// - `last_called` timestamp is older than the expiration threshold. + /// + /// for each such row, it clears the backing table, readset entry and deletes the subscription row. + pub fn clear_expired_views(&mut self, expiration_duration: Duration) -> Result<()> { + let now = Timestamp::now(); + let expiration_threshold = now - expiration_duration; + + // Collect rows that meet expiration criteria + let expired_sub_rows: Vec<(StViewSubRow, RowPointer)> = self + .iter_by_col_eq( + ST_VIEW_ID, + StViewSubFields::HasSubscribers, + &AlgebraicValue::from(false), + )? + .map(|row_ref| { + StViewSubRow::try_from(row_ref) + .map(|row| (row, row_ref.pointer())) + .expect("Failed to deserialize st_view_sub row") + }) + .filter(|(row, _)| { + !row.has_subscribers && row.num_subscribers == 0 && row.last_called.0 < expiration_threshold + }) + .collect(); + + // For each expired view subscription, clear the backing table and delete the subscription + for (row, ptr) in expired_sub_rows { + let view_id = row.view_id; + let sender: Identity = row.identity.into(); + // Get the view's backing table_id from st_view + let StViewRow { + table_id, is_anonymous, .. + } = self.lookup_st_view(view_id)?; + let table_id = table_id.unwrap(); + + if is_anonymous { + self.clear_table(table_id)?; + self.drop_view_from_committed_read_set(view_id); + } else { + let rows_to_delete = self + .iter_by_col_eq(table_id, StViewSubFields::Identity, &sender.into())? + .map(|res| res.pointer()) + .collect::>(); + + for row_ptr in rows_to_delete { + self.delete(table_id, row_ptr)?; + } + + self.drop_view_with_sender_from_committed_read_set(view_id, sender); + } + + // Finally, delete the st_view_sub row + self.delete(ST_VIEW_SUB_ID, ptr)?; + } + Ok(()) + } + /// Decrement `num_subscribers` in `st_view_sub` to effectively unsubscribe a caller from a view. pub fn unsubscribe_view(&mut self, view_id: ViewId, arg_id: ArgId, sender: Identity) -> Result<()> { use StViewSubFields::*; From 93959d4f98c8be77501b48df4812ea94d6812388 Mon Sep 17 00:00:00 2001 From: Shubham Mishra Date: Thu, 13 Nov 2025 02:00:31 +0530 Subject: [PATCH 2/7] unwrap -> expect Signed-off-by: Shubham Mishra --- crates/datastore/src/locking_tx_datastore/mut_tx.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/datastore/src/locking_tx_datastore/mut_tx.rs b/crates/datastore/src/locking_tx_datastore/mut_tx.rs index 157e71389c0..9de6b8651be 100644 --- a/crates/datastore/src/locking_tx_datastore/mut_tx.rs +++ b/crates/datastore/src/locking_tx_datastore/mut_tx.rs @@ -2007,7 +2007,7 @@ impl MutTxId { let StViewRow { table_id, is_anonymous, .. } = self.lookup_st_view(view_id)?; - let table_id = table_id.unwrap(); + let table_id = table_id.expect("views have backing table"); if is_anonymous { self.clear_table(table_id)?; From 6615aac3f5a1a72acccae140e2727987f19fc65f Mon Sep 17 00:00:00 2001 From: Shubham Mishra Date: Thu, 13 Nov 2025 02:02:03 +0530 Subject: [PATCH 3/7] doc Signed-off-by: Shubham Mishra --- crates/datastore/src/locking_tx_datastore/mut_tx.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/crates/datastore/src/locking_tx_datastore/mut_tx.rs b/crates/datastore/src/locking_tx_datastore/mut_tx.rs index 9de6b8651be..d6e5f3df242 100644 --- a/crates/datastore/src/locking_tx_datastore/mut_tx.rs +++ b/crates/datastore/src/locking_tx_datastore/mut_tx.rs @@ -230,7 +230,6 @@ impl MutTxId { } /// Removes a specific view call from the committed read set. - /// Used when dropping views due to pub fn drop_view_with_sender_from_committed_read_set(&mut self, view_id: ViewId, sender: Identity) { self.committed_state_write_lock .drop_view_from_read_sets(view_id, Some(sender)) @@ -1985,7 +1984,7 @@ impl MutTxId { // Collect rows that meet expiration criteria let expired_sub_rows: Vec<(StViewSubRow, RowPointer)> = self .iter_by_col_eq( - ST_VIEW_ID, + ST_VIEW_SUB_ID, StViewSubFields::HasSubscribers, &AlgebraicValue::from(false), )? From 2765046955b0f8377d87629f3914510a0808656d Mon Sep 17 00:00:00 2001 From: Shubham Mishra Date: Thu, 13 Nov 2025 21:43:46 +0530 Subject: [PATCH 4/7] test --- crates/core/src/db/relational_db.rs | 111 ++++++++++++++++++ .../src/locking_tx_datastore/mut_tx.rs | 6 +- 2 files changed, 114 insertions(+), 3 deletions(-) diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index ce4135572af..5a801a398bf 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -2368,6 +2368,7 @@ mod tests { use std::fs::OpenOptions; use std::path::PathBuf; use std::rc::Rc; + use std::time::Instant; use super::tests_utils::begin_mut_tx; use super::*; @@ -2387,6 +2388,7 @@ mod tests { ST_SEQUENCE_ID, ST_TABLE_ID, }; use spacetimedb_fs_utils::compression::CompressType; + use spacetimedb_lib::bsatn::to_vec; use spacetimedb_lib::db::raw_def::v9::{btree, RawTableDefBuilder}; use spacetimedb_lib::error::ResultTest; use spacetimedb_lib::Identity; @@ -2420,6 +2422,27 @@ mod tests { TableSchema::from_module_def(&def, table, (), TableId::SENTINEL) } + fn view_module_def() -> ModuleDef { + let mut builder = RawModuleDefV9Builder::new(); + + let return_type_ref = builder.add_algebraic_type( + [], + "my_view_return_type", + AlgebraicType::product([("b", AlgebraicType::U8)]), + true, + ); + builder.add_view( + "my_view", + 0, + true, + false, + ProductType::unit(), + AlgebraicType::array(AlgebraicType::Ref(return_type_ref)), + ); + let raw = builder.finish(); + raw.try_into().expect("table validation failed") + } + fn table_auto_inc() -> TableSchema { table( "MyTable", @@ -2504,6 +2527,94 @@ mod tests { Ok(()) } + #[test] + fn test_views() -> ResultTest<()> { + let stdb = TestDB::durable()?; + let module_def = view_module_def(); + let view_def = module_def.view("my_view").unwrap(); + + let row_type = view_def.product_type_ref; + let typespace = module_def.typespace(); + + let to_bstan = |pv: &ProductValue| { + Bytes::from(to_vec(&AlgebraicValue::Array([pv.clone()].into())).expect("bstan serialization failed")) + }; + let row_pv = |v: u8| ProductValue::from_iter(vec![AlgebraicValue::U8(v)]); + + let project_views = |stdb: &TestDB, table_id: TableId, sender: Identity| { + let tx = begin_tx(stdb); + stdb.iter_by_col_eq(&tx, table_id, 0, &sender.into()) + .unwrap() + .map(|row| ProductValue { + elements: row.to_product_value().elements.iter().skip(1).cloned().collect(), + }) + .collect::>() + }; + + // Create the view + let mut tx = begin_mut_tx(&stdb); + let (view_id, table_id) = stdb.create_view(&mut tx, &module_def, view_def)?; + stdb.commit_tx(tx)?; + + let apply_view_update = |sender: Identity, val: u8| -> ResultTest<()> { + let mut tx = begin_mut_tx(&stdb); + tx.subscribe_view(view_id, ArgId::SENTINEL, sender)?; + stdb.materialize_view(&mut tx, table_id, sender, row_type, to_bstan(&row_pv(val)), typespace)?; + stdb.commit_tx(tx)?; + Ok(()) + }; + + // Sender 1 + let sender1 = Identity::ONE; + apply_view_update(sender1, 42)?; + let mut tx = begin_mut_tx(&stdb); + tx.unsubscribe_view(view_id, ArgId::SENTINEL, sender1)?; + stdb.commit_tx(tx)?; + + assert_eq!( + project_views(&stdb, table_id, sender1)[0], + row_pv(42), + "Materialized view row does not match inserted row" + ); + + // Sender 2 + let sender2 = Identity::ZERO; + let before_sender2 = Instant::now(); + apply_view_update(sender2, 84)?; + + assert_eq!( + project_views(&stdb, table_id, sender2)[0], + row_pv(84), + "Materialized view row does not match inserted row" + ); + + // Clear expired views + let mut tx = begin_mut_tx(&stdb); + tx.clear_expired_views(Instant::now().saturating_duration_since(before_sender2))?; + stdb.commit_tx(tx)?; + + assert!( + project_views(&stdb, table_id, sender1).is_empty(), + "Sender 1 rows should be cleared" + ); + assert_eq!( + project_views(&stdb, table_id, sender2)[0], + row_pv(84), + "Sender 2 rows should not be cleared" + ); + + // Validate st_view_subs state + let tx = begin_mut_tx(&stdb); + let st_view_row = tx.lookup_st_view_subs(view_id)?; + assert_eq!(st_view_row.len(), 1, "Sender 1 should be removed from st_view_subs"); + assert_eq!( + st_view_row[0].identity.0, sender2, + "Sender 1 should be removed from st_view_subs" + ); + + Ok(()) + } + #[test] fn test_table_name() -> ResultTest<()> { let stdb = TestDB::durable()?; diff --git a/crates/datastore/src/locking_tx_datastore/mut_tx.rs b/crates/datastore/src/locking_tx_datastore/mut_tx.rs index d6e5f3df242..49a7465a2e9 100644 --- a/crates/datastore/src/locking_tx_datastore/mut_tx.rs +++ b/crates/datastore/src/locking_tx_datastore/mut_tx.rs @@ -1999,7 +1999,7 @@ impl MutTxId { .collect(); // For each expired view subscription, clear the backing table and delete the subscription - for (row, ptr) in expired_sub_rows { + for (row, sub_row_ptr) in expired_sub_rows { let view_id = row.view_id; let sender: Identity = row.identity.into(); // Get the view's backing table_id from st_view @@ -2013,7 +2013,7 @@ impl MutTxId { self.drop_view_from_committed_read_set(view_id); } else { let rows_to_delete = self - .iter_by_col_eq(table_id, StViewSubFields::Identity, &sender.into())? + .iter_by_col_eq(table_id, 0, &sender.into())? .map(|res| res.pointer()) .collect::>(); @@ -2025,7 +2025,7 @@ impl MutTxId { } // Finally, delete the st_view_sub row - self.delete(ST_VIEW_SUB_ID, ptr)?; + self.delete(ST_VIEW_SUB_ID, sub_row_ptr)?; } Ok(()) } From d5b60eee2a7e5d9d125b89951244c2cce3ac806a Mon Sep 17 00:00:00 2001 From: Shubham Mishra Date: Thu, 20 Nov 2025 13:10:17 +0530 Subject: [PATCH 5/7] max_duration in views --- crates/core/src/db/relational_db.rs | 42 +++++++++--- .../src/locking_tx_datastore/mut_tx.rs | 66 +++++++++++++------ 2 files changed, 77 insertions(+), 31 deletions(-) diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index 5a801a398bf..79d6ed8d299 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -1061,21 +1061,40 @@ impl RelationalDB { /// Duration after which expired unused views are cleaned up. /// Value is chosen arbitrarily; can be tuned later if needed. -const VIEWS_EXPIRATION: std::time::Duration = std::time::Duration::from_secs(100); +const VIEWS_EXPIRATION: std::time::Duration = std::time::Duration::from_secs(10 * 60); -/// Spawn a background task that periodically cleans up expired views. +/// Duration to budget for each view cleanup job, so that it doesn't hold database lock for too +/// long. +//TODO: Make this value configurable +const VIEW_CLEANUP_BUDGET: std::time::Duration = std::time::Duration::from_millis(10); + +/// Spawn a background task that periodically cleans up expired views pub fn spawn_view_cleanup_loop(db: Arc) -> tokio::task::AbortHandle { tokio::spawn(async move { let db = &db; loop { - if let Err(e) = db.with_auto_commit(Workload::Internal, |tx| { - tx.clear_expired_views(VIEWS_EXPIRATION).map_err(DBError::from) + match db.with_auto_commit(Workload::Internal, |tx| { + tx.clear_expired_views(VIEWS_EXPIRATION, VIEW_CLEANUP_BUDGET) + .map_err(DBError::from) }) { - log::error!( - "[{}] DATABASE: failed to clear expired views: {}", - db.database_identity(), - e - ); + Ok((cleared, total_expired)) => { + if cleared != total_expired { + //TODO: Report it as metric + log::info!( + "[{}] DATABASE: cleared {} expired views ({} remaining)", + db.database_identity(), + cleared, + total_expired - cleared + ); + } + } + Err(e) => { + log::error!( + "[{}] DATABASE: failed to clear expired views: {}", + db.database_identity(), + e + ); + } } tokio::time::sleep(VIEWS_EXPIRATION).await; @@ -2590,7 +2609,10 @@ mod tests { // Clear expired views let mut tx = begin_mut_tx(&stdb); - tx.clear_expired_views(Instant::now().saturating_duration_since(before_sender2))?; + tx.clear_expired_views( + Instant::now().saturating_duration_since(before_sender2), + VIEW_CLEANUP_BUDGET, + )?; stdb.commit_tx(tx)?; assert!( diff --git a/crates/datastore/src/locking_tx_datastore/mut_tx.rs b/crates/datastore/src/locking_tx_datastore/mut_tx.rs index 49a7465a2e9..e30934e87b8 100644 --- a/crates/datastore/src/locking_tx_datastore/mut_tx.rs +++ b/crates/datastore/src/locking_tx_datastore/mut_tx.rs @@ -1970,39 +1970,61 @@ impl MutTxId { Ok(()) } - /// Clean up Views that have no subscribers and haven't been called within the expiration duration. + /// Clean up views that have no subscribers and haven’t been called recently. /// - /// It looks for rows in `st_view_sub` where: - /// - `has_subscribers` is `false`, - /// - `last_called` timestamp is older than the expiration threshold. + /// This function will scan for subscription entries in `st_view_sub` where: + /// - `has_subscribers == false`, `num_subscribers == 0`. + /// - `last_called` is older than `expiration_duration`. /// - /// for each such row, it clears the backing table, readset entry and deletes the subscription row. - pub fn clear_expired_views(&mut self, expiration_duration: Duration) -> Result<()> { + /// For each such expired view: + /// 1. It clears the backing table, + /// 2. Removes the view from the committed read set, and + /// 3. Deletes the subscription row. + /// + /// The cleanup is bounded by a total `max_duration`. The function stops when either: + /// - all expired views have been processed, or + /// - the `max_duration` budget is reached. + /// + /// Returns a tuple `(cleaned, total_expired)`: + /// - `cleaned`: Number of views actually cleaned (deleted) in this run. + /// - `total_expired`: Total number of expired views found (even if not all were cleaned due to time budget). + pub fn clear_expired_views( + &mut self, + expiration_duration: Duration, + max_duration: Duration, + ) -> Result<(usize, usize)> { + let start = std::time::Instant::now(); let now = Timestamp::now(); let expiration_threshold = now - expiration_duration; + let mut cleaned_count = 0; - // Collect rows that meet expiration criteria - let expired_sub_rows: Vec<(StViewSubRow, RowPointer)> = self + // Collect all expired views from st_view_sub + let expired_items: Vec<(ViewId, Identity, RowPointer)> = self .iter_by_col_eq( ST_VIEW_SUB_ID, StViewSubFields::HasSubscribers, &AlgebraicValue::from(false), )? - .map(|row_ref| { - StViewSubRow::try_from(row_ref) - .map(|row| (row, row_ref.pointer())) - .expect("Failed to deserialize st_view_sub row") - }) - .filter(|(row, _)| { - !row.has_subscribers && row.num_subscribers == 0 && row.last_called.0 < expiration_threshold + .filter_map(|row_ref| { + let row = StViewSubRow::try_from(row_ref).expect("Failed to deserialize st_view_sub row"); + + if !row.has_subscribers && row.num_subscribers == 0 && row.last_called.0 < expiration_threshold { + Some((row.view_id, row.identity.into(), row_ref.pointer())) + } else { + None + } }) .collect(); + let total_expired = expired_items.len(); + // For each expired view subscription, clear the backing table and delete the subscription - for (row, sub_row_ptr) in expired_sub_rows { - let view_id = row.view_id; - let sender: Identity = row.identity.into(); - // Get the view's backing table_id from st_view + for (view_id, sender, sub_row_ptr) in expired_items { + // Check if we've exceeded our time budget + if start.elapsed() >= max_duration { + break; + } + let StViewRow { table_id, is_anonymous, .. } = self.lookup_st_view(view_id)?; @@ -2024,10 +2046,12 @@ impl MutTxId { self.drop_view_with_sender_from_committed_read_set(view_id, sender); } - // Finally, delete the st_view_sub row + // Finally, delete the subscription row self.delete(ST_VIEW_SUB_ID, sub_row_ptr)?; + cleaned_count += 1; } - Ok(()) + + Ok((cleaned_count, total_expired)) } /// Decrement `num_subscribers` in `st_view_sub` to effectively unsubscribe a caller from a view. From 815cd620e7e527ac35b313f5dc0e23e47071105a Mon Sep 17 00:00:00 2001 From: Shubham Mishra Date: Thu, 20 Nov 2025 16:40:31 +0530 Subject: [PATCH 6/7] rebased --- crates/core/src/db/relational_db.rs | 98 +++++++++++++---------------- 1 file changed, 44 insertions(+), 54 deletions(-) diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index 216f2dc901f..806d39e6cda 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -2432,7 +2432,6 @@ mod tests { ST_SEQUENCE_ID, ST_TABLE_ID, }; use spacetimedb_fs_utils::compression::CompressType; - use spacetimedb_lib::bsatn::to_vec; use spacetimedb_lib::db::raw_def::v9::{btree, RawTableDefBuilder}; use spacetimedb_lib::error::ResultTest; use spacetimedb_lib::Identity; @@ -2657,65 +2656,59 @@ mod tests { #[test] fn test_views() -> ResultTest<()> { let stdb = TestDB::durable()?; - let module_def = view_module_def(); - let view_def = module_def.view("my_view").unwrap(); + let (view_id, table_id, module_def, view_def) = setup_view(&stdb)?; let row_type = view_def.product_type_ref; let typespace = module_def.typespace(); - let to_bstan = |pv: &ProductValue| { - Bytes::from(to_vec(&AlgebraicValue::Array([pv.clone()].into())).expect("bstan serialization failed")) - }; - let row_pv = |v: u8| ProductValue::from_iter(vec![AlgebraicValue::U8(v)]); - - let project_views = |stdb: &TestDB, table_id: TableId, sender: Identity| { - let tx = begin_tx(stdb); - stdb.iter_by_col_eq(&tx, table_id, 0, &sender.into()) - .unwrap() - .map(|row| ProductValue { - elements: row.to_product_value().elements.iter().skip(1).cloned().collect(), - }) - .collect::>() - }; - - // Create the view - let mut tx = begin_mut_tx(&stdb); - let (view_id, table_id) = stdb.create_view(&mut tx, &module_def, view_def)?; - stdb.commit_tx(tx)?; - - let apply_view_update = |sender: Identity, val: u8| -> ResultTest<()> { - let mut tx = begin_mut_tx(&stdb); - tx.subscribe_view(view_id, ArgId::SENTINEL, sender)?; - stdb.materialize_view(&mut tx, table_id, sender, row_type, to_bstan(&row_pv(val)), typespace)?; - stdb.commit_tx(tx)?; - Ok(()) - }; - - // Sender 1 let sender1 = Identity::ONE; - apply_view_update(sender1, 42)?; - let mut tx = begin_mut_tx(&stdb); - tx.unsubscribe_view(view_id, ArgId::SENTINEL, sender1)?; - stdb.commit_tx(tx)?; + + // Sender 1 insert + insert_view_row(&stdb, view_id, table_id, typespace, row_type, sender1, 42)?; assert_eq!( project_views(&stdb, table_id, sender1)[0], - row_pv(42), - "Materialized view row does not match inserted row" + product![42u8], + "View row not inserted correctly" ); - // Sender 2 + // Sender 2 insert let sender2 = Identity::ZERO; let before_sender2 = Instant::now(); - apply_view_update(sender2, 84)?; + insert_view_row(&stdb, view_id, table_id, typespace, row_type, sender2, 84)?; assert_eq!( project_views(&stdb, table_id, sender2)[0], - row_pv(84), - "Materialized view row does not match inserted row" + product![84u8], + "Sender 2 view row not inserted correctly" ); - // Clear expired views + // Restart database (view rows should NOT persist) + let stdb = stdb.reopen()?; + + assert!( + project_views(&stdb, table_id, sender1).is_empty(), + "Sender 1 rows should NOT persist after reopen" + ); + assert!( + project_views(&stdb, table_id, sender2).is_empty(), + "Sender 2 rows should NOT persist after reopen" + ); + + let tx = begin_mut_tx(&stdb); + let st = tx.lookup_st_view_subs(view_id)?; + assert!(st.is_empty(), "st_view_subs should also be cleared after restart"); + stdb.commit_tx(tx)?; + + // Reinsert after restart + insert_view_row(&stdb, view_id, table_id, typespace, row_type, sender2, 91)?; + assert_eq!( + project_views(&stdb, table_id, sender2)[0], + product![91u8], + "Post-restart inserted rows must appear" + ); + + // Clean expired rows let mut tx = begin_mut_tx(&stdb); tx.clear_expired_views( Instant::now().saturating_duration_since(before_sender2), @@ -2723,28 +2716,25 @@ mod tests { )?; stdb.commit_tx(tx)?; + // Only sender2 exists after reinsertion assert!( project_views(&stdb, table_id, sender1).is_empty(), - "Sender 1 rows should be cleared" + "Sender 1 should remain empty" ); assert_eq!( project_views(&stdb, table_id, sender2)[0], - row_pv(84), - "Sender 2 rows should not be cleared" + product![91u8], + "Sender 2 row should remain" ); - // Validate st_view_subs state + // And st_view_subs must reflect only sender2 let tx = begin_mut_tx(&stdb); - let st_view_row = tx.lookup_st_view_subs(view_id)?; - assert_eq!(st_view_row.len(), 1, "Sender 1 should be removed from st_view_subs"); - assert_eq!( - st_view_row[0].identity.0, sender2, - "Sender 1 should be removed from st_view_subs" - ); + let st_after = tx.lookup_st_view_subs(view_id)?; + assert_eq!(st_after.len(), 1); + assert_eq!(st_after[0].identity.0, sender2); Ok(()) } - #[test] fn test_table_name() -> ResultTest<()> { let stdb = TestDB::durable()?; From 507548559c333374db42949977d5ae38824f8cdd Mon Sep 17 00:00:00 2001 From: Shubham Mishra Date: Thu, 20 Nov 2025 16:46:21 +0530 Subject: [PATCH 7/7] rebased --- crates/core/src/db/relational_db.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index 806d39e6cda..cf2ab35c3a6 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -1077,8 +1077,8 @@ impl RelationalDB { /// Value is chosen arbitrarily; can be tuned later if needed. const VIEWS_EXPIRATION: std::time::Duration = std::time::Duration::from_secs(10 * 60); -/// Duration to budget for each view cleanup job, so that it doesn't hold database lock for too -/// long. +/// Duration to budget for each view cleanup job, +/// so that it doesn't hold transaction lock for to long. //TODO: Make this value configurable const VIEW_CLEANUP_BUDGET: std::time::Duration = std::time::Duration::from_millis(10);