diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs
index 3a5399e5a7e..cf2ab35c3a6 100644
--- a/crates/core/src/db/relational_db.rs
+++ b/crates/core/src/db/relational_db.rs
@@ -1073,6 +1073,50 @@ impl RelationalDB {
     }
 }
+/// Duration after which expired unused views are cleaned up.
+/// Value is chosen arbitrarily; can be tuned later if needed.
+const VIEWS_EXPIRATION: std::time::Duration = std::time::Duration::from_secs(10 * 60);
+
+/// Duration to budget for each view cleanup job,
+/// so that it doesn't hold the transaction lock for too long.
+//TODO: Make this value configurable
+const VIEW_CLEANUP_BUDGET: std::time::Duration = std::time::Duration::from_millis(10);
+
+/// Spawn a background task that periodically cleans up expired views.
+pub fn spawn_view_cleanup_loop(db: Arc<RelationalDB>) -> tokio::task::AbortHandle {
+    tokio::spawn(async move {
+        let db = &db;
+        loop {
+            match db.with_auto_commit(Workload::Internal, |tx| {
+                tx.clear_expired_views(VIEWS_EXPIRATION, VIEW_CLEANUP_BUDGET)
+                    .map_err(DBError::from)
+            }) {
+                Ok((cleared, total_expired)) => {
+                    if cleared != total_expired {
+                        //TODO: Report it as a metric
+                        log::info!(
+                            "[{}] DATABASE: cleared {} expired views ({} remaining)",
+                            db.database_identity(),
+                            cleared,
+                            total_expired - cleared
+                        );
+                    }
+                }
+                Err(e) => {
+                    log::error!(
+                        "[{}] DATABASE: failed to clear expired views: {}",
+                        db.database_identity(),
+                        e
+                    );
+                }
+            }
+
+            tokio::time::sleep(VIEWS_EXPIRATION).await;
+        }
+    })
+    .abort_handle()
+}
+
 impl RelationalDB {
     pub fn create_table(&self, tx: &mut MutTx, schema: TableSchema) -> Result<TableId, DBError> {
         Ok(self.inner.create_table_mut_tx(tx, schema)?)
     }
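The two constants above separate the cleanup period (`VIEWS_EXPIRATION`, also reused as the sleep interval) from the per-pass time budget (`VIEW_CLEANUP_BUDGET`), so a single pass stops early instead of holding the transaction lock. A minimal sketch of that budgeted-loop shape using only std and tokio; the constant values, `clean_one`, and the surrounding `main` are illustrative stand-ins, not part of this change:

```rust
use std::time::{Duration, Instant};

// Stand-ins for VIEWS_EXPIRATION / VIEW_CLEANUP_BUDGET.
const PERIOD: Duration = Duration::from_secs(10 * 60);
const BUDGET: Duration = Duration::from_millis(10);

/// Stand-in for cleaning up one expired view.
fn clean_one(_item: u32) {}

#[tokio::main]
async fn main() {
    let handle = tokio::spawn(async {
        loop {
            let start = Instant::now();
            // Do as much work as fits in the budget; whatever is left over
            // simply waits for the next tick instead of blocking other callers.
            for item in 0.. {
                if start.elapsed() >= BUDGET {
                    break;
                }
                clean_one(item);
            }
            tokio::time::sleep(PERIOD).await;
        }
    })
    .abort_handle();

    // Whoever owns the AbortHandle stops the loop on shutdown
    // (see the Drop impl for Host later in this diff).
    handle.abort();
}
```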
@@ -2368,6 +2412,7 @@ mod tests {
     use std::fs::OpenOptions;
     use std::path::PathBuf;
     use std::rc::Rc;
+    use std::time::Instant;
 
     use super::tests_utils::begin_mut_tx;
     use super::*;
@@ -2608,6 +2653,88 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn test_views() -> ResultTest<()> {
+        let stdb = TestDB::durable()?;
+
+        let (view_id, table_id, module_def, view_def) = setup_view(&stdb)?;
+        let row_type = view_def.product_type_ref;
+        let typespace = module_def.typespace();
+
+        let sender1 = Identity::ONE;
+
+        // Sender 1 insert
+        insert_view_row(&stdb, view_id, table_id, typespace, row_type, sender1, 42)?;
+
+        assert_eq!(
+            project_views(&stdb, table_id, sender1)[0],
+            product![42u8],
+            "View row not inserted correctly"
+        );
+
+        // Sender 2 insert
+        let sender2 = Identity::ZERO;
+        let before_sender2 = Instant::now();
+        insert_view_row(&stdb, view_id, table_id, typespace, row_type, sender2, 84)?;
+
+        assert_eq!(
+            project_views(&stdb, table_id, sender2)[0],
+            product![84u8],
+            "Sender 2 view row not inserted correctly"
+        );
+
+        // Restart database (view rows should NOT persist)
+        let stdb = stdb.reopen()?;
+
+        assert!(
+            project_views(&stdb, table_id, sender1).is_empty(),
+            "Sender 1 rows should NOT persist after reopen"
+        );
+        assert!(
+            project_views(&stdb, table_id, sender2).is_empty(),
+            "Sender 2 rows should NOT persist after reopen"
+        );
+
+        let tx = begin_mut_tx(&stdb);
+        let st = tx.lookup_st_view_subs(view_id)?;
+        assert!(st.is_empty(), "st_view_subs should also be cleared after restart");
+        stdb.commit_tx(tx)?;
+
+        // Reinsert after restart
+        insert_view_row(&stdb, view_id, table_id, typespace, row_type, sender2, 91)?;
+        assert_eq!(
+            project_views(&stdb, table_id, sender2)[0],
+            product![91u8],
+            "Post-restart inserted rows must appear"
+        );
+
+        // Clean expired rows
+        let mut tx = begin_mut_tx(&stdb);
+        tx.clear_expired_views(
+            Instant::now().saturating_duration_since(before_sender2),
+            VIEW_CLEANUP_BUDGET,
+        )?;
+        stdb.commit_tx(tx)?;
+
+        // Only sender2 exists after reinsertion
+        assert!(
+            project_views(&stdb, table_id, sender1).is_empty(),
+            "Sender 1 should remain empty"
+        );
+        assert_eq!(
+            project_views(&stdb, table_id, sender2)[0],
+            product![91u8],
+            "Sender 2 row should remain"
+        );
+
+        // And st_view_subs must reflect only sender2
+        let tx = begin_mut_tx(&stdb);
+        let st_after = tx.lookup_st_view_subs(view_id)?;
+        assert_eq!(st_after.len(), 1);
+        assert_eq!(st_after[0].identity.0, sender2);
+
+        Ok(())
+    }
     #[test]
     fn test_table_name() -> ResultTest<()> {
         let stdb = TestDB::durable()?;
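The test drives expiry by passing `Instant::now().saturating_duration_since(before_sender2)` as the expiration window, so anything last touched before `before_sender2` counts as expired while the row reinserted afterwards survives. The comparison itself (done with `Timestamp`s inside the datastore) boils down to the following; this sketch uses only std types, and the `is_expired` helper is illustrative, not an API from the change:

```rust
use std::thread::sleep;
use std::time::{Duration, Instant};

/// A subscription is expired if its last call is older than `now - expiration`.
fn is_expired(last_called: Instant, now: Instant, expiration: Duration) -> bool {
    last_called < now - expiration
}

fn main() {
    let sender1_last_called = Instant::now();
    sleep(Duration::from_millis(20));
    let before_sender2 = Instant::now(); // marker taken just before sender 2's activity
    sleep(Duration::from_millis(20));
    let now = Instant::now();

    // Same choice as the test: expire everything last called before `before_sender2`.
    let expiration = now.saturating_duration_since(before_sender2);

    assert!(is_expired(sender1_last_called, now, expiration)); // stale entry: cleaned
    assert!(!is_expired(now, now, expiration)); // fresh entry: kept
}
```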
@@ -870,19 +873,17 @@ impl Host {
         scheduler_starter.start(&module_host)?;
 
         let disk_metrics_recorder_task = tokio::spawn(metric_reporter(replica_ctx.clone())).abort_handle();
+        let view_cleanup_task = spawn_view_cleanup_loop(replica_ctx.relational_db.clone());
 
         let module = watch::Sender::new(module_host);
-        //TODO(shub): Below code interfere with `exit_module` code,
-        // I suspect channel internally holds a reference to the module,
-        // even after we drop the sender.
-        //
-        // replica_ctx.subscriptions.init(module.subscribe());
+
         Ok(Host {
             module,
             replica_ctx,
             scheduler,
             disk_metrics_recorder_task,
             tx_metrics_recorder_task,
+            view_cleanup_task,
         })
     }
@@ -1059,6 +1060,7 @@ impl Drop for Host {
     fn drop(&mut self) {
         self.disk_metrics_recorder_task.abort();
         self.tx_metrics_recorder_task.abort();
+        self.view_cleanup_task.abort();
     }
 }
diff --git a/crates/datastore/src/locking_tx_datastore/committed_state.rs b/crates/datastore/src/locking_tx_datastore/committed_state.rs
index 5d42000b0c3..daeac6d4be1 100644
--- a/crates/datastore/src/locking_tx_datastore/committed_state.rs
+++ b/crates/datastore/src/locking_tx_datastore/committed_state.rs
@@ -669,8 +669,8 @@ impl CommittedState {
         tx_data.has_rows_or_connect_disconnect(ctx.reducer_context())
     }
 
-    pub(super) fn drop_view_from_read_sets(&mut self, view_id: ViewId) {
-        self.read_sets.remove_view(view_id)
+    pub(super) fn drop_view_from_read_sets(&mut self, view_id: ViewId, sender: Option<Identity>) {
+        self.read_sets.remove_view(view_id, sender)
     }
 
     pub(super) fn merge(&mut self, tx_state: TxState, read_sets: ViewReadSets, ctx: &ExecutionContext) -> TxData {
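The committed-state change above threads an optional sender through to the read-set removal in `mut_tx.rs` below: `Some(sender)` drops only that caller's entries for the view, `None` drops every entry for the view (the auto-migration path). A standalone sketch of that filtering rule; plain integers stand in for `ViewId`/`Identity`, and `ScanEntry`/`remove_view` here are illustrative, not the datastore API:

```rust
#[derive(Debug, PartialEq, Clone)]
struct ScanEntry {
    view_id: u32,
    sender: Option<u64>,
}

fn remove_view(entries: &mut Vec<ScanEntry>, view_id: u32, sender: Option<u64>) {
    match sender {
        // Only this caller's entries for the view are dropped.
        Some(identity) => entries.retain(|e| !(e.view_id == view_id && e.sender == Some(identity))),
        // Every entry for the view is dropped, regardless of sender.
        None => entries.retain(|e| e.view_id != view_id),
    }
}

fn main() {
    let mut entries = vec![
        ScanEntry { view_id: 1, sender: Some(10) },
        ScanEntry { view_id: 1, sender: Some(20) },
        ScanEntry { view_id: 2, sender: Some(10) },
    ];

    // Drop only view 1's entry for sender 10; view 1 / sender 20 survives.
    remove_view(&mut entries, 1, Some(10));
    assert_eq!(entries.len(), 2);

    // Drop every remaining entry for view 1.
    remove_view(&mut entries, 1, None);
    assert_eq!(entries, vec![ScanEntry { view_id: 2, sender: Some(10) }]);
}
```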
diff --git a/crates/datastore/src/locking_tx_datastore/mut_tx.rs b/crates/datastore/src/locking_tx_datastore/mut_tx.rs
index d21d453b7af..8ef3f605628 100644
--- a/crates/datastore/src/locking_tx_datastore/mut_tx.rs
+++ b/crates/datastore/src/locking_tx_datastore/mut_tx.rs
@@ -107,9 +107,9 @@ impl ViewReadSets {
     }
 
     /// Removes keys for `view_id` from the read set
-    pub fn remove_view(&mut self, view_id: ViewId) {
+    pub fn remove_view(&mut self, view_id: ViewId, sender: Option<Identity>) {
         self.tables.retain(|_, readset| {
-            readset.remove_view(view_id);
+            readset.remove_view(view_id, sender);
             !readset.is_empty()
         });
     }
@@ -144,9 +144,14 @@ impl TableReadSet {
         self.table_scans.is_empty()
     }
 
-    /// Removes keys for `view_id` from the read set
-    fn remove_view(&mut self, view_id: ViewId) {
-        self.table_scans.retain(|info| info.view_id != view_id);
+    /// Removes keys for `view_id` from the read set, optionally filtering by `sender`
+    fn remove_view(&mut self, view_id: ViewId, sender: Option<Identity>) {
+        if let Some(identity) = sender {
+            self.table_scans
+                .retain(|call| !(call.view_id == view_id && call.sender.as_ref() == Some(&identity)));
+        } else {
+            self.table_scans.retain(|call| call.view_id != view_id);
+        }
     }
 
     /// Merge or union two read sets for this table
@@ -221,7 +226,13 @@ impl MutTxId {
     /// Removes keys for `view_id` from the committed read set.
     /// Used for dropping views in an auto-migration.
     pub fn drop_view_from_committed_read_set(&mut self, view_id: ViewId) {
-        self.committed_state_write_lock.drop_view_from_read_sets(view_id)
+        self.committed_state_write_lock.drop_view_from_read_sets(view_id, None)
+    }
+
+    /// Removes a specific view call from the committed read set.
+    pub fn drop_view_with_sender_from_committed_read_set(&mut self, view_id: ViewId, sender: Identity) {
+        self.committed_state_write_lock
+            .drop_view_from_read_sets(view_id, Some(sender))
     }
 }
@@ -1961,6 +1972,90 @@ impl MutTxId {
         Ok(())
     }
 
+    /// Clean up views that have no subscribers and haven’t been called recently.
+    ///
+    /// This function scans for subscription entries in `st_view_sub` where:
+    /// - `has_subscribers == false` and `num_subscribers == 0`, and
+    /// - `last_called` is older than `expiration_duration`.
+    ///
+    /// For each such expired view it:
+    /// 1. Clears the backing table,
+    /// 2. Removes the view from the committed read set, and
+    /// 3. Deletes the subscription row.
+    ///
+    /// The cleanup is bounded by a total `max_duration`. The function stops when either:
+    /// - all expired views have been processed, or
+    /// - the `max_duration` budget is reached.
+    ///
+    /// Returns a tuple `(cleaned, total_expired)`:
+    /// - `cleaned`: number of views actually cleaned (deleted) in this run.
+    /// - `total_expired`: total number of expired views found (even if not all were cleaned due to the time budget).
+    pub fn clear_expired_views(
+        &mut self,
+        expiration_duration: Duration,
+        max_duration: Duration,
+    ) -> Result<(usize, usize)> {
+        let start = std::time::Instant::now();
+        let now = Timestamp::now();
+        let expiration_threshold = now - expiration_duration;
+        let mut cleaned_count = 0;
+
+        // Collect all expired views from st_view_sub
+        let expired_items: Vec<(ViewId, Identity, RowPointer)> = self
+            .iter_by_col_eq(
+                ST_VIEW_SUB_ID,
+                StViewSubFields::HasSubscribers,
+                &AlgebraicValue::from(false),
+            )?
+            .filter_map(|row_ref| {
+                let row = StViewSubRow::try_from(row_ref).expect("Failed to deserialize st_view_sub row");
+
+                if !row.has_subscribers && row.num_subscribers == 0 && row.last_called.0 < expiration_threshold {
+                    Some((row.view_id, row.identity.into(), row_ref.pointer()))
+                } else {
+                    None
+                }
+            })
+            .collect();
+
+        let total_expired = expired_items.len();
+
+        // For each expired view subscription, clear the backing table and delete the subscription
+        for (view_id, sender, sub_row_ptr) in expired_items {
+            // Check if we've exceeded our time budget
+            if start.elapsed() >= max_duration {
+                break;
+            }
+
+            let StViewRow {
+                table_id, is_anonymous, ..
+            } = self.lookup_st_view(view_id)?;
+            let table_id = table_id.expect("views have backing table");
+
+            if is_anonymous {
+                self.clear_table(table_id)?;
+                self.drop_view_from_committed_read_set(view_id);
+            } else {
+                let rows_to_delete = self
+                    .iter_by_col_eq(table_id, 0, &sender.into())?
+                    .map(|res| res.pointer())
+                    .collect::<Vec<_>>();
+
+                for row_ptr in rows_to_delete {
+                    self.delete(table_id, row_ptr)?;
+                }
+
+                self.drop_view_with_sender_from_committed_read_set(view_id, sender);
+            }
+
+            // Finally, delete the subscription row
+            self.delete(ST_VIEW_SUB_ID, sub_row_ptr)?;
+            cleaned_count += 1;
+        }
+
+        Ok((cleaned_count, total_expired))
+    }
+
     /// Decrement `num_subscribers` in `st_view_sub` to effectively unsubscribe a caller from a view.
     pub fn unsubscribe_view(&mut self, view_id: ViewId, arg_id: ArgId, sender: Identity) -> Result<()> {
         use StViewSubFields::*;
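The `(cleaned, total_expired)` contract is why the background loop earlier in this diff only logs when the two counts differ: a shortfall just means the budget ran out and the remainder is picked up on the next tick. A caller that needs every expired view gone in one go (say, in a test or on shutdown) could instead repeat passes until the counts agree. A sketch of that pattern; `clear_expired_views_stub` is a stand-in that mimics the return contract with an item-count budget rather than a wall-clock one, and is not the real datastore API:

```rust
/// Simulates clear_expired_views: cleans at most `per_pass_budget` of the
/// currently pending expired views and reports (cleaned, total_expired).
fn clear_expired_views_stub(pending: &mut usize, per_pass_budget: usize) -> (usize, usize) {
    let total_expired = *pending;
    let cleaned = total_expired.min(per_pass_budget); // the budget can cut a pass short
    *pending -= cleaned;
    (cleaned, total_expired)
}

fn main() {
    let mut pending = 25; // pretend 25 views are expired
    let mut passes = 0;
    loop {
        let (cleaned, total_expired) = clear_expired_views_stub(&mut pending, 10);
        passes += 1;
        if cleaned == total_expired {
            // Nothing was left behind by the budget; we're done.
            break;
        }
    }
    assert_eq!(pending, 0);
    assert_eq!(passes, 3); // 10 + 10 + 5
}
```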