@@ -175,7 +175,10 @@ use crate::{
175175 vector:: vector_search_with_retries_timer,
176176 verify_invariants_timer,
177177 } ,
178- retention:: LeaderRetentionManager ,
178+ retention:: {
179+ LeaderRetentionManager ,
180+ LeaderRetentionWorkers ,
181+ } ,
179182 schema_registry:: SchemaRegistry ,
180183 search_index_bootstrap:: SearchIndexBootstrapWorker ,
181184 snapshot_manager:: {
@@ -217,7 +220,6 @@ use crate::{
217220 BootstrapComponentsModel ,
218221 ComponentRegistry ,
219222 ComponentsTable ,
220- FollowerRetentionManager ,
221223 SchemasTable ,
222224 TableIterator ,
223225 Transaction ,
@@ -274,7 +276,8 @@ pub struct Database<RT: Runtime> {
274276 pub ( crate ) runtime : RT ,
275277 reader : Arc < dyn PersistenceReader > ,
276278 write_commits_since_load : Arc < AtomicUsize > ,
277- retention_manager : LeaderRetentionManager < RT > ,
279+ retention_manager : Arc < LeaderRetentionManager < RT > > ,
280+ retention_workers : LeaderRetentionWorkers ,
278281 pub searcher : Arc < dyn Searcher > ,
279282 pub search_storage : Arc < OnceLock < Arc < dyn Storage > > > ,
280283 usage_counter : UsageCounter ,
@@ -960,18 +963,14 @@ impl<RT: Runtime> Database<RT> {
960963 // the latest timestamp to perform the load at.
961964 let snapshot_ts = new_idle_repeatable_ts ( persistence. as_ref ( ) , & runtime) . await ?;
962965
963- let follower_retention_manager = FollowerRetentionManager :: new_with_repeatable_ts (
964- runtime. clone ( ) ,
965- persistence. reader ( ) ,
966- snapshot_ts,
967- )
968- . await ?;
966+ let ( retention_manager, retention_worker_seed) =
967+ LeaderRetentionManager :: new ( runtime. clone ( ) , persistence. clone ( ) , snapshot_ts) . await ?;
969968
970969 let db_snapshot = DatabaseSnapshot :: load (
971970 runtime. clone ( ) ,
972971 reader. clone ( ) ,
973972 snapshot_ts,
974- Arc :: new ( follower_retention_manager . clone ( ) ) ,
973+ retention_manager . clone ( ) ,
975974 )
976975 . await ?;
977976 let max_ts = DatabaseSnapshot :: < RT > :: max_ts ( & * reader) . await ?;
@@ -992,16 +991,14 @@ impl<RT: Runtime> Database<RT> {
992991 let snapshot_manager = SnapshotManager :: new ( * ts, snapshot) ;
993992 let ( snapshot_reader, snapshot_writer) = new_split_rw_lock ( snapshot_manager) ;
994993
995- let retention_manager = LeaderRetentionManager :: new (
996- runtime. clone ( ) ,
997- persistence. clone ( ) ,
998- bootstrap_metadata. clone ( ) ,
999- snapshot_reader. clone ( ) ,
1000- follower_retention_manager,
1001- shutdown. clone ( ) ,
1002- retention_rate_limiter,
1003- )
1004- . await ?;
994+ let retention_workers = retention_worker_seed
995+ . start_retention (
996+ bootstrap_metadata. clone ( ) ,
997+ snapshot_reader. clone ( ) ,
998+ shutdown. clone ( ) ,
999+ retention_rate_limiter,
1000+ )
1001+ . await ?;
10051002
10061003 let persistence_reader = persistence. reader ( ) ;
10071004 let ( log_owner, log_reader, log_writer) = new_write_log ( * ts) ;
@@ -1012,7 +1009,7 @@ impl<RT: Runtime> Database<RT> {
10121009 snapshot_writer,
10131010 persistence,
10141011 runtime. clone ( ) ,
1015- Arc :: new ( retention_manager. clone ( ) ) ,
1012+ retention_manager. clone ( ) ,
10161013 shutdown,
10171014 ) ;
10181015 let table_mapping_snapshot_cache =
@@ -1028,6 +1025,7 @@ impl<RT: Runtime> Database<RT> {
10281025 runtime,
10291026 log : log_reader,
10301027 retention_manager,
1028+ retention_workers,
10311029 snapshot_manager : snapshot_reader,
10321030 reader : persistence_reader. clone ( ) ,
10331031 write_commits_since_load : Arc :: new ( AtomicUsize :: new ( 0 ) ) ,
@@ -1088,13 +1086,13 @@ impl<RT: Runtime> Database<RT> {
10881086 pub async fn shutdown ( & self ) -> anyhow:: Result < ( ) > {
10891087 self . committer . shutdown ( ) ;
10901088 self . subscriptions . shutdown ( ) ;
1091- self . retention_manager . shutdown ( ) . await ?;
1089+ self . retention_workers . shutdown ( ) . await ?;
10921090 tracing:: info!( "Database shutdown" ) ;
10931091 Ok ( ( ) )
10941092 }
10951093
10961094 pub fn retention_validator ( & self ) -> Arc < dyn RetentionValidator > {
1097- Arc :: new ( self . retention_manager . clone ( ) )
1095+ self . retention_manager . clone ( )
10981096 }
10991097
11001098 /// Load the set of documents and tombstones in the given table between
@@ -1696,7 +1694,7 @@ impl<RT: Runtime> Database<RT> {
16961694 RepeatablePersistence :: new (
16971695 self . reader . clone ( ) ,
16981696 repeatable_ts,
1699- Arc :: new ( self . retention_manager . clone ( ) ) ,
1697+ self . retention_manager . clone ( ) ,
17001698 )
17011699 . read_snapshot ( repeatable_ts) ?,
17021700 ) ,
@@ -1719,7 +1717,7 @@ impl<RT: Runtime> Database<RT> {
17191717 count_snapshot,
17201718 self . runtime . clone ( ) ,
17211719 usage_tracker,
1722- Arc :: new ( self . retention_manager . clone ( ) ) ,
1720+ self . retention_manager . clone ( ) ,
17231721 self . virtual_system_mapping . clone ( ) ,
17241722 ) ;
17251723 Ok ( tx)
0 commit comments