From 90032876306c785540e42f04ac65d37af56a0bd8 Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Fri, 7 Nov 2025 16:30:17 +0100 Subject: [PATCH 1/3] fix: stabilize session actor fanout --- crates/core/src/client_events/mod.rs | 35 +- .../core/src/client_events/session_actor.rs | 344 +++++++++++++++--- crates/core/src/contract/handler.rs | 27 ++ crates/core/src/node/mod.rs | 14 +- 4 files changed, 349 insertions(+), 71 deletions(-) diff --git a/crates/core/src/client_events/mod.rs b/crates/core/src/client_events/mod.rs index c4db9c6a6..894bd1614 100644 --- a/crates/core/src/client_events/mod.rs +++ b/crates/core/src/client_events/mod.rs @@ -1155,27 +1155,17 @@ async fn process_open_request( tracing::debug!( peer_id = %peer_id, key = %key, - "Starting direct SUBSCRIBE operation (legacy mode)", + "Starting direct SUBSCRIBE operation", ); - // Legacy mode: direct operation without deduplication - let op_id = - crate::node::subscribe(op_manager.clone(), key, Some(client_id)) - .await - .inspect_err(|err| { - tracing::error!("Subscribe error: {}", err); - })?; - - tracing::debug!( - request_id = %request_id, - transaction_id = %op_id, - operation = "subscribe", - "Request-Transaction correlation" - ); + // Generate transaction, register first, then run op + let tx = crate::message::Transaction::new::< + crate::operations::subscribe::SubscribeMsg, + >(); op_manager .ch_outbound - .waiting_for_transaction_result(op_id, client_id, request_id) + .waiting_for_transaction_result(tx, client_id, request_id) .await .inspect_err(|err| { tracing::error!( @@ -1183,6 +1173,19 @@ async fn process_open_request( err ); })?; + + crate::node::subscribe_with_id(op_manager.clone(), key, None, Some(tx)) + .await + .inspect_err(|err| { + tracing::error!("Subscribe error: {}", err); + })?; + + tracing::debug!( + request_id = %request_id, + transaction_id = %tx, + operation = "subscribe", + "Request-Transaction correlation" + ); } } _ => { diff --git a/crates/core/src/client_events/session_actor.rs b/crates/core/src/client_events/session_actor.rs index 2f0825b4a..6961659f3 100644 --- a/crates/core/src/client_events/session_actor.rs +++ b/crates/core/src/client_events/session_actor.rs @@ -3,9 +3,10 @@ //! This module provides a simplified session actor that manages client sessions //! and handles efficient 1→N result delivery to multiple clients. -use crate::client_events::{ClientId, HostResult, RequestId}; +use crate::client_events::{ClientId, HostResponse, HostResult, RequestId}; use crate::contract::{ClientResponsesSender, SessionMessage}; use crate::message::Transaction; +use freenet_stdlib::client_api::ContractResponse; use std::collections::{HashMap, HashSet}; use tokio::sync::mpsc; use tracing::debug; @@ -16,9 +17,25 @@ pub struct SessionActor { client_transactions: HashMap>, // Track RequestId correlation for each (Transaction, ClientId) pair client_request_ids: HashMap<(Transaction, ClientId), RequestId>, + pending_results: HashMap, client_responses: ClientResponsesSender, } +#[derive(Clone)] +struct PendingResult { + result: std::sync::Arc, + delivered_clients: HashSet, +} + +impl PendingResult { + fn new(result: std::sync::Arc) -> Self { + Self { + result, + delivered_clients: HashSet::new(), + } + } +} + impl SessionActor { /// Create a new session actor pub fn new( @@ -29,6 +46,7 @@ impl SessionActor { message_rx, client_transactions: HashMap::new(), client_request_ids: HashMap::new(), + pending_results: HashMap::new(), client_responses, } } @@ -74,6 +92,18 @@ impl SessionActor { request_id, self.client_transactions.get(&tx).map_or(0, |s| s.len()) ); + + if let Some(result_arc) = self.pending_results.get_mut(&tx).and_then(|pending| { + if pending.delivered_clients.insert(client_id) { + Some(pending.result.clone()) + } else { + None + } + }) { + let mut recipients = HashSet::new(); + recipients.insert(client_id); + self.deliver_result_to_clients(tx, recipients, result_arc); + } } SessionMessage::ClientDisconnect { client_id } => { self.cleanup_client_transactions(client_id); @@ -96,62 +126,115 @@ impl SessionActor { } } - /// CORE: 1→N Result Delivery with RequestId correlation - /// Optimized with Arc to minimize cloning overhead in 1→N delivery - async fn handle_result_delivery( + fn deliver_result_to_clients( &mut self, tx: Transaction, - result: std::sync::Arc, + waiting_clients: HashSet, + result: std::sync::Arc, ) { - tracing::info!("Session actor attempting to deliver result for transaction {}, registered transactions: {}", tx, self.client_transactions.len()); - if let Some(waiting_clients) = self.client_transactions.remove(&tx) { - let client_count = waiting_clients.len(); + let client_count = waiting_clients.len(); + tracing::info!( + "Delivering result for transaction {} to {} clients", + tx, + client_count + ); + + if let Ok(HostResponse::ContractResponse(ContractResponse::GetResponse { + key, + state, + .. + })) = result.as_ref() + { tracing::info!( - "Delivering result for transaction {} to {} clients", - tx, - client_count + "Contract GET response ready for delivery: contract={} bytes={}", + key, + state.as_ref().len() ); + } - // Optimized 1→N delivery with RequestId correlation - for client_id in waiting_clients { - // Look up the RequestId for this (transaction, client) pair - let request_id = - self.client_request_ids - .remove(&(tx, client_id)) - .unwrap_or_else(|| { - tracing::warn!( - "No RequestId found for transaction {} and client {}, using default", - tx, client_id - ); - RequestId::new() - }); - - if let Err(e) = - self.client_responses - .send((client_id, request_id, (*result).clone())) - { + // Optimized 1→N delivery with RequestId correlation + for client_id in waiting_clients { + // Look up the RequestId for this (transaction, client) pair + let request_id = self + .client_request_ids + .remove(&(tx, client_id)) + .unwrap_or_else(|| { tracing::warn!( - "Failed to deliver result to client {} (request {}): {}", - client_id, - request_id, - e - ); - } else { - tracing::debug!( - "Delivered result for transaction {} to client {} with request correlation {}", - tx, client_id, request_id + "No RequestId found for transaction {} and client {}, using default", + tx, + client_id ); - } - } + RequestId::new() + }); - if client_count > 1 { + if let Err(e) = self + .client_responses + .send((client_id, request_id, (*result).clone())) + { + tracing::warn!( + "Failed to deliver result to client {} (request {}): {}", + client_id, + request_id, + e + ); + } else { tracing::debug!( - "Successfully delivered result for transaction {} to {} clients via optimized 1→N fanout with RequestId correlation", - tx, client_count + "Delivered result for transaction {} to client {} with request correlation {}", + tx, + client_id, + request_id ); } + } + + if client_count > 1 { + tracing::debug!( + "Successfully delivered result for transaction {} to {} clients via optimized 1→N fanout with RequestId correlation", + tx, + client_count + ); + } + } + + /// CORE: 1→N Result Delivery with RequestId correlation + /// Optimized with Arc to minimize cloning overhead in 1→N delivery + async fn handle_result_delivery( + &mut self, + tx: Transaction, + result: std::sync::Arc, + ) { + tracing::info!( + "Session actor attempting to deliver result for transaction {}, registered transactions: {}", + tx, + self.client_transactions.len() + ); + + let mut recipients = HashSet::new(); + let result_to_deliver = { + let entry = self + .pending_results + .entry(tx) + .or_insert_with(|| PendingResult::new(result.clone())); + entry.result = result.clone(); + + if let Some(waiting_clients) = self.client_transactions.remove(&tx) { + for client_id in waiting_clients { + if entry.delivered_clients.insert(client_id) { + recipients.insert(client_id); + } + } + } + + entry.result.clone() + }; + + if !recipients.is_empty() { + self.deliver_result_to_clients(tx, recipients, result_to_deliver); } else { - tracing::debug!("No clients waiting for transaction result: {}", tx); + tracing::debug!( + "No clients waiting for transaction result: {}, caching response for deferred delivery", + tx + ); } } @@ -199,6 +282,13 @@ impl SessionActor { e ); } else { + let entry = self + .pending_results + .entry(tx) + .or_insert_with(|| PendingResult::new(result.clone())); + entry.delivered_clients.insert(client_id); + entry.result = result.clone(); + tracing::debug!( "Delivered result for transaction {} to specific client {} with request correlation {}", tx, client_id, request_id @@ -358,6 +448,172 @@ mod tests { actor_handle.await.unwrap(); } + #[tokio::test] + async fn test_pending_result_reaches_late_registered_clients() { + use crate::contract::client_responses_channel; + use crate::operations::subscribe::SubscribeMsg; + use freenet_stdlib::client_api::{ContractResponse, HostResponse}; + use freenet_stdlib::prelude::{ContractInstanceId, ContractKey}; + + let (session_tx, session_rx) = mpsc::channel(100); + let (mut client_responses_rx, client_responses_tx) = client_responses_channel(); + let actor = SessionActor::new(session_rx, client_responses_tx); + + let actor_handle = tokio::spawn(async move { + actor.run().await; + }); + + let tx = Transaction::new::(); + let contract_key = ContractKey::from(ContractInstanceId::new([7u8; 32])); + let host_result = Ok(HostResponse::ContractResponse( + ContractResponse::SubscribeResponse { + key: contract_key, + subscribed: true, + }, + )); + + // Deliver result before any clients register; this models LocalSubscribeComplete firing + // before the session actor processes the pending subscription registration. + session_tx + .send(SessionMessage::DeliverHostResponse { + tx, + response: std::sync::Arc::new(host_result.clone()), + }) + .await + .unwrap(); + + // First client registers and should receive the cached result. + let client_one = ClientId::FIRST; + let request_one = RequestId::new(); + session_tx + .send(SessionMessage::RegisterTransaction { + tx, + client_id: client_one, + request_id: request_one, + }) + .await + .unwrap(); + + let (delivered_client_one, delivered_request_one, delivered_result_one) = + tokio::time::timeout( + tokio::time::Duration::from_millis(200), + client_responses_rx.recv(), + ) + .await + .expect("session actor failed to deliver cached result to first client") + .expect("client response channel closed unexpectedly"); + assert_eq!(delivered_client_one, client_one); + assert_eq!(delivered_request_one, request_one); + match delivered_result_one { + Ok(HostResponse::ContractResponse(ContractResponse::SubscribeResponse { + key, + subscribed, + })) => { + assert_eq!(key, contract_key); + assert!(subscribed); + } + other => panic!("unexpected result delivered to first client: {:?}", other), + } + + // Second client registers later; we expect the cached result to still be available. + let client_two = ClientId::next(); + let request_two = RequestId::new(); + session_tx + .send(SessionMessage::RegisterTransaction { + tx, + client_id: client_two, + request_id: request_two, + }) + .await + .unwrap(); + + let (delivered_client_two, delivered_request_two, delivered_result_two) = + tokio::time::timeout( + tokio::time::Duration::from_millis(200), + client_responses_rx.recv(), + ) + .await + .expect("pending result was not delivered to late-registered client") + .expect("client response channel closed unexpectedly for late registrant"); + assert_eq!(delivered_client_two, client_two); + assert_eq!(delivered_request_two, request_two); + match delivered_result_two { + Ok(HostResponse::ContractResponse(ContractResponse::SubscribeResponse { + key, + subscribed, + })) => { + assert_eq!(key, contract_key); + assert!(subscribed); + } + other => panic!( + "unexpected result delivered to late-registered client: {:?}", + other + ), + } + + actor_handle.abort(); + } + + #[tokio::test] + async fn test_pending_result_delivered_after_registration() { + use crate::contract::client_responses_channel; + + let (session_tx, session_rx) = mpsc::channel(100); + let (mut client_responses_rx, client_responses_tx) = client_responses_channel(); + let actor = SessionActor::new(session_rx, client_responses_tx); + + let actor_handle = tokio::spawn(async move { + actor.run().await; + }); + + let tx = Transaction::new::(); + let client_id = ClientId::FIRST; + let request_id = RequestId::new(); + let host_result = Arc::new(Ok(HostResponse::Ok)); + + session_tx + .send(SessionMessage::DeliverHostResponse { + tx, + response: host_result.clone(), + }) + .await + .unwrap(); + + // Ensure the actor processes the pending result before registration. + tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; + + session_tx + .send(SessionMessage::RegisterTransaction { + tx, + client_id, + request_id, + }) + .await + .unwrap(); + + let delivered = tokio::time::timeout( + tokio::time::Duration::from_millis(200), + client_responses_rx.recv(), + ) + .await + .expect("Timed out waiting for pending result delivery") + .expect("Client response channel closed unexpectedly"); + + let (returned_client, returned_request, returned_result) = delivered; + assert_eq!(returned_client, client_id); + assert_eq!(returned_request, request_id); + match returned_result { + Ok(HostResponse::Ok) => {} + other => panic!( + "Unexpected result delivered. got={:?}, expected=Ok(HostResponse::Ok)", + other + ), + } + + drop(session_tx); + actor_handle.await.unwrap(); + } + #[tokio::test] async fn test_session_actor_client_disconnect_cleanup() { use crate::contract::client_responses_channel; diff --git a/crates/core/src/contract/handler.rs b/crates/core/src/contract/handler.rs index 4acdba4a6..699f59c73 100644 --- a/crates/core/src/contract/handler.rs +++ b/crates/core/src/contract/handler.rs @@ -343,6 +343,33 @@ impl ContractHandlerChannel { Ok(()) } + + pub async fn waiting_for_subscription_result( + &self, + tx: Transaction, + contract_key: ContractInstanceId, + client_id: ClientId, + request_id: RequestId, + ) -> Result<(), ContractError> { + self.end + .wait_for_res_tx + .send((client_id, WaitingTransaction::Subscription { contract_key })) + .await + .map_err(|_| ContractError::NoEvHandlerResponse)?; + + if let Some(session_tx) = &self.session_adapter_tx { + let msg = SessionMessage::RegisterTransaction { + tx, + client_id, + request_id, + }; + if let Err(e) = session_tx.try_send(msg) { + tracing::warn!("Failed to notify session actor: {}", e); + } + } + + Ok(()) + } } impl ContractHandlerChannel { diff --git a/crates/core/src/node/mod.rs b/crates/core/src/node/mod.rs index f3a4b165a..c50ac8be1 100644 --- a/crates/core/src/node/mod.rs +++ b/crates/core/src/node/mod.rs @@ -37,10 +37,7 @@ use self::p2p_impl::NodeP2P; use crate::{ client_events::{BoxedClient, ClientEventsProxy, ClientId, OpenRequest}, config::{Address, GatewayConfig, WebsocketApiConfig}, - contract::{ - Callback, ExecutorError, ExecutorToEventLoopChannel, NetworkContractHandler, - WaitingTransaction, - }, + contract::{Callback, ExecutorError, ExecutorToEventLoopChannel, NetworkContractHandler}, local_node::Executor, message::{InnerMessage, NetMessage, Transaction, TransactionType}, operations::{ @@ -1105,6 +1102,7 @@ async fn handle_pure_network_result( } /// Attempts to subscribe to a contract +#[allow(dead_code)] pub async fn subscribe( op_manager: Arc, key: ContractKey, @@ -1131,13 +1129,7 @@ pub async fn subscribe_with_id( let request_id = RequestId::new(); let _ = op_manager .ch_outbound - .waiting_for_transaction_result( - WaitingTransaction::Subscription { - contract_key: *key.id(), - }, - client_id, - request_id, - ) + .waiting_for_subscription_result(id, *key.id(), client_id, request_id) .await; } // Initialize a subscribe op. From 7fdad4aeb3e12f754a13847fc86447d01c313e95 Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Fri, 7 Nov 2025 22:10:40 +0100 Subject: [PATCH 2/3] fix(session): prune pending results --- .../core/src/client_events/session_actor.rs | 85 +++++++++++++++++++ 1 file changed, 85 insertions(+) diff --git a/crates/core/src/client_events/session_actor.rs b/crates/core/src/client_events/session_actor.rs index 6961659f3..4de891016 100644 --- a/crates/core/src/client_events/session_actor.rs +++ b/crates/core/src/client_events/session_actor.rs @@ -8,9 +8,13 @@ use crate::contract::{ClientResponsesSender, SessionMessage}; use crate::message::Transaction; use freenet_stdlib::client_api::ContractResponse; use std::collections::{HashMap, HashSet}; +use std::time::{Duration, Instant}; use tokio::sync::mpsc; use tracing::debug; +const PENDING_RESULT_TTL: Duration = Duration::from_secs(60); +const MAX_PENDING_RESULTS: usize = 2048; + /// Simple session actor for client connection refactor pub struct SessionActor { message_rx: mpsc::Receiver, @@ -25,6 +29,7 @@ pub struct SessionActor { struct PendingResult { result: std::sync::Arc, delivered_clients: HashSet, + last_accessed: Instant, } impl PendingResult { @@ -32,8 +37,13 @@ impl PendingResult { Self { result, delivered_clients: HashSet::new(), + last_accessed: Instant::now(), } } + + fn touch(&mut self) { + self.last_accessed = Instant::now(); + } } impl SessionActor { @@ -60,6 +70,7 @@ impl SessionActor { /// Process a single message async fn process_message(&mut self, msg: SessionMessage) { + self.prune_pending_results(); match msg { SessionMessage::DeliverHostResponse { tx, response } => { self.handle_result_delivery(tx, response).await; @@ -94,6 +105,7 @@ impl SessionActor { ); if let Some(result_arc) = self.pending_results.get_mut(&tx).and_then(|pending| { + pending.touch(); if pending.delivered_clients.insert(client_id) { Some(pending.result.clone()) } else { @@ -103,6 +115,7 @@ impl SessionActor { let mut recipients = HashSet::new(); recipients.insert(client_id); self.deliver_result_to_clients(tx, recipients, result_arc); + self.cleanup_transaction_entry(tx, client_id); } } SessionMessage::ClientDisconnect { client_id } => { @@ -211,11 +224,18 @@ impl SessionActor { let mut recipients = HashSet::new(); let result_to_deliver = { + if !self.pending_results.contains_key(&tx) + && self.pending_results.len() >= MAX_PENDING_RESULTS + { + self.enforce_pending_capacity(); + } + let entry = self .pending_results .entry(tx) .or_insert_with(|| PendingResult::new(result.clone())); entry.result = result.clone(); + entry.touch(); if let Some(waiting_clients) = self.client_transactions.remove(&tx) { for client_id in waiting_clients { @@ -282,12 +302,19 @@ impl SessionActor { e ); } else { + if !self.pending_results.contains_key(&tx) + && self.pending_results.len() >= MAX_PENDING_RESULTS + { + self.enforce_pending_capacity(); + } + let entry = self .pending_results .entry(tx) .or_insert_with(|| PendingResult::new(result.clone())); entry.delivered_clients.insert(client_id); entry.result = result.clone(); + entry.touch(); tracing::debug!( "Delivered result for transaction {} to specific client {} with request correlation {}", @@ -319,6 +346,64 @@ impl SessionActor { // Clean up RequestId mappings for this client across all transactions self.client_request_ids.retain(|(_, c), _| *c != client_id); } + + fn prune_pending_results(&mut self) { + if self.pending_results.is_empty() { + return; + } + + let mut active_txs: HashSet = + self.client_transactions.keys().copied().collect(); + active_txs.extend(self.client_request_ids.keys().map(|(tx, _)| *tx)); + + let now = Instant::now(); + let stale: Vec = self + .pending_results + .iter() + .filter_map(|(tx, pending)| { + if active_txs.contains(tx) { + return None; + } + if now.duration_since(pending.last_accessed) > PENDING_RESULT_TTL { + Some(*tx) + } else { + None + } + }) + .collect(); + + for tx in stale { + self.pending_results.remove(&tx); + } + + if self.pending_results.len() >= MAX_PENDING_RESULTS { + self.enforce_pending_capacity(); + } + } + + fn cleanup_transaction_entry(&mut self, tx: Transaction, client_id: ClientId) { + if let Some(waiting_clients) = self.client_transactions.get_mut(&tx) { + waiting_clients.remove(&client_id); + if waiting_clients.is_empty() { + self.client_transactions.remove(&tx); + } + } + } + + fn enforce_pending_capacity(&mut self) { + if self.pending_results.len() < MAX_PENDING_RESULTS { + return; + } + + if let Some(oldest_tx) = self + .pending_results + .iter() + .min_by_key(|(_, pending)| pending.last_accessed) + .map(|(tx, _)| *tx) + { + self.pending_results.remove(&oldest_tx); + } + } } #[cfg(test)] From 10cb7f6ee478c6db8faa837ff43ce9bc2aa24bd6 Mon Sep 17 00:00:00 2001 From: "claude[bot]" <41898282+claude[bot]@users.noreply.github.com> Date: Sun, 9 Nov 2025 11:38:03 +0000 Subject: [PATCH 3/3] docs(session): document lazy cache eviction strategy and limitations Add comprehensive documentation explaining the lazy evaluation approach used for cache cleanup in the session actor's pending_results cache: - Module-level documentation describing the eviction strategy - Detailed explanation of lazy evaluation vs preemptive cleanup - Tradeoffs: advantages (simplicity, amortized cost) and limitations (idle memory retention, temporary overflow, burst accumulation) - Future considerations for production monitoring and improvements - Function-level comments for prune_pending_results and enforce_pending_capacity - Constant documentation explaining TTL and capacity limits This addresses the need for future maintainers to understand the design decision to use lazy evaluation and its implications for memory usage during idle periods. Co-authored-by: nacho.d.g --- .../core/src/client_events/session_actor.rs | 84 ++++++++++++++++++- 1 file changed, 83 insertions(+), 1 deletion(-) diff --git a/crates/core/src/client_events/session_actor.rs b/crates/core/src/client_events/session_actor.rs index 4de891016..a5152f341 100644 --- a/crates/core/src/client_events/session_actor.rs +++ b/crates/core/src/client_events/session_actor.rs @@ -2,6 +2,42 @@ //! //! This module provides a simplified session actor that manages client sessions //! and handles efficient 1→N result delivery to multiple clients. +//! +//! # Cache Eviction Strategy +//! +//! The `pending_results` cache uses **lazy evaluation** for cleanup - there is no +//! background task or periodic timer. Eviction happens **only** as a side effect of +//! processing incoming messages. +//! +//! ## How It Works +//! +//! 1. **On every message** (`process_message`): `prune_pending_results()` is called +//! 2. **TTL-based pruning**: Removes entries older than `PENDING_RESULT_TTL` (60s) +//! 3. **Capacity enforcement**: When cache reaches `MAX_PENDING_RESULTS` (2048), +//! uses LRU eviction to remove the oldest entry +//! +//! ## Tradeoffs +//! +//! **Advantages:** +//! - Simpler implementation - no separate task management required +//! - Cleanup cost is amortized across normal message processing +//! - No overhead when actor is idle +//! +//! **Limitations:** +//! - **Idle memory retention**: During idle periods (no incoming messages), stale +//! entries remain in memory indefinitely until the next message arrives +//! - **Temporary overflow**: Cache size can temporarily exceed limits between messages +//! - **Burst accumulation**: After a burst of activity, cache may sit at max capacity +//! until next message triggers pruning +//! - **Memory pressure**: With large `HostResult` payloads, 2048 entries could consume +//! significant memory during idle periods +//! +//! ## Future Considerations +//! +//! If idle memory retention becomes problematic in production: +//! - Add a background tokio task with periodic cleanup (e.g., every 30s) +//! - Implement memory-based limits in addition to count-based limits +//! - Add metrics/monitoring for cache size to detect accumulation patterns use crate::client_events::{ClientId, HostResponse, HostResult, RequestId}; use crate::contract::{ClientResponsesSender, SessionMessage}; @@ -12,7 +48,17 @@ use std::time::{Duration, Instant}; use tokio::sync::mpsc; use tracing::debug; +/// Time-to-live for cached pending results. Entries older than this duration are +/// eligible for removal during pruning (triggered on message processing). +/// +/// Note: Due to lazy evaluation, stale entries may persist beyond TTL during idle periods. const PENDING_RESULT_TTL: Duration = Duration::from_secs(60); + +/// Maximum number of cached pending results. When this limit is reached, LRU eviction +/// removes the oldest entry to make room for new ones. +/// +/// Note: Cache may temporarily exceed this limit between messages since enforcement +/// is lazy (triggered only during message processing). const MAX_PENDING_RESULTS: usize = 2048; /// Simple session actor for client connection refactor @@ -21,6 +67,10 @@ pub struct SessionActor { client_transactions: HashMap>, // Track RequestId correlation for each (Transaction, ClientId) pair client_request_ids: HashMap<(Transaction, ClientId), RequestId>, + /// Cache of pending results for late-arriving subscribers. + /// + /// Uses lazy evaluation for cleanup - entries are pruned only during message processing. + /// See module-level documentation for detailed cache eviction strategy and limitations. pending_results: HashMap, client_responses: ClientResponsesSender, } @@ -68,7 +118,10 @@ impl SessionActor { } } - /// Process a single message + /// Process a single message. + /// + /// Note: This method triggers cache pruning on EVERY message via `prune_pending_results()`. + /// This is the only mechanism for cache cleanup (lazy evaluation - no background task). async fn process_message(&mut self, msg: SessionMessage) { self.prune_pending_results(); match msg { @@ -347,6 +400,23 @@ impl SessionActor { self.client_request_ids.retain(|(_, c), _| *c != client_id); } + /// Prune stale pending results based on TTL and enforce capacity limits. + /// + /// This is the **only** cache cleanup mechanism - there is no background task. + /// Called on every message in `process_message()`. + /// + /// # Cleanup Strategy (Lazy Evaluation) + /// + /// 1. **Skip if empty**: Early return if no cached results + /// 2. **Identify active transactions**: Collect all transactions that still have waiting clients + /// 3. **TTL-based removal**: Remove inactive entries older than `PENDING_RESULT_TTL` + /// 4. **Capacity enforcement**: If still at/over `MAX_PENDING_RESULTS`, trigger LRU eviction + /// + /// # Lazy Evaluation Implications + /// + /// - During idle periods (no messages), stale entries persist in memory + /// - Cache cleanup happens only when actor receives messages + /// - Stale entries may remain beyond TTL until next message arrives fn prune_pending_results(&mut self) { if self.pending_results.is_empty() { return; @@ -390,6 +460,18 @@ impl SessionActor { } } + /// Enforce capacity limits using LRU (Least Recently Used) eviction. + /// + /// Removes the entry with the oldest `last_accessed` timestamp when the cache + /// reaches or exceeds `MAX_PENDING_RESULTS`. + /// + /// # Lazy Evaluation Note + /// + /// This is only called: + /// 1. At the end of `prune_pending_results()` if still at capacity + /// 2. Before inserting new entries when already at capacity + /// + /// Between messages, cache size may temporarily exceed the limit. fn enforce_pending_capacity(&mut self) { if self.pending_results.len() < MAX_PENDING_RESULTS { return;