diff --git a/crates/core/src/client_events/mod.rs b/crates/core/src/client_events/mod.rs
index c4db9c6a6..894bd1614 100644
--- a/crates/core/src/client_events/mod.rs
+++ b/crates/core/src/client_events/mod.rs
@@ -1155,27 +1155,17 @@ async fn process_open_request(
                         tracing::debug!(
                             peer_id = %peer_id,
                             key = %key,
-                            "Starting direct SUBSCRIBE operation (legacy mode)",
+                            "Starting direct SUBSCRIBE operation",
                         );
-                        // Legacy mode: direct operation without deduplication
-                        let op_id =
-                            crate::node::subscribe(op_manager.clone(), key, Some(client_id))
-                                .await
-                                .inspect_err(|err| {
-                                    tracing::error!("Subscribe error: {}", err);
-                                })?;
-
-                        tracing::debug!(
-                            request_id = %request_id,
-                            transaction_id = %op_id,
-                            operation = "subscribe",
-                            "Request-Transaction correlation"
-                        );
+                        // Generate transaction, register first, then run op
+                        let tx = crate::message::Transaction::new::<
+                            crate::operations::subscribe::SubscribeMsg,
+                        >();
                         op_manager
                             .ch_outbound
-                            .waiting_for_transaction_result(op_id, client_id, request_id)
+                            .waiting_for_transaction_result(tx, client_id, request_id)
                             .await
                             .inspect_err(|err| {
                                 tracing::error!(
@@ -1183,6 +1173,19 @@ async fn process_open_request(
                                     err
                                 );
                             })?;
+
+                        crate::node::subscribe_with_id(op_manager.clone(), key, None, Some(tx))
+                            .await
+                            .inspect_err(|err| {
+                                tracing::error!("Subscribe error: {}", err);
+                            })?;
+
+                        tracing::debug!(
+                            request_id = %request_id,
+                            transaction_id = %tx,
+                            operation = "subscribe",
+                            "Request-Transaction correlation"
+                        );
                     }
                 }
                 _ => {
diff --git a/crates/core/src/client_events/session_actor.rs b/crates/core/src/client_events/session_actor.rs
index 2f0825b4a..a5152f341 100644
--- a/crates/core/src/client_events/session_actor.rs
+++ b/crates/core/src/client_events/session_actor.rs
@@ -2,23 +2,100 @@
 //!
 //! This module provides a simplified session actor that manages client sessions
 //! and handles efficient 1→N result delivery to multiple clients.
+//!
+//! # Cache Eviction Strategy
+//!
+//! The `pending_results` cache uses **lazy evaluation** for cleanup - there is no
+//! background task or periodic timer. Eviction happens **only** as a side effect of
+//! processing incoming messages.
+//!
+//! ## How It Works
+//!
+//! 1. **On every message** (`process_message`): `prune_pending_results()` is called
+//! 2. **TTL-based pruning**: Removes entries older than `PENDING_RESULT_TTL` (60s)
+//! 3. **Capacity enforcement**: When cache reaches `MAX_PENDING_RESULTS` (2048),
+//!    uses LRU eviction to remove the oldest entry
+//!
+//! ## Tradeoffs
+//!
+//! **Advantages:**
+//! - Simpler implementation - no separate task management required
+//! - Cleanup cost is amortized across normal message processing
+//! - No overhead when actor is idle
+//!
+//! **Limitations:**
+//! - **Idle memory retention**: During idle periods (no incoming messages), stale
+//!   entries remain in memory indefinitely until the next message arrives
+//! - **Temporary overflow**: Cache size can temporarily exceed limits between messages
+//! - **Burst accumulation**: After a burst of activity, cache may sit at max capacity
+//!   until next message triggers pruning
+//! - **Memory pressure**: With large `HostResult` payloads, 2048 entries could consume
+//!   significant memory during idle periods
+//!
+//! ## Future Considerations
+//!
+//! If idle memory retention becomes problematic in production:
+//! - Add a background tokio task with periodic cleanup (e.g., every 30s)
+//! - Implement memory-based limits in addition to count-based limits
+//! - Add metrics/monitoring for cache size to detect accumulation patterns
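+//!
+//! A sketch of the per-entry removal test used by the TTL sweep (illustrative
+//! pseudocode only, not a doctest):
+//!
+//! ```ignore
+//! // inactive (no waiting clients or request correlations) AND expired => prune;
+//! // if the cache is still at capacity afterwards, evict the LRU entry.
+//! let prune = !active_txs.contains(&tx)
+//!     && now.duration_since(entry.last_accessed) > PENDING_RESULT_TTL;
+//! ```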
 
-use crate::client_events::{ClientId, HostResult, RequestId};
+use crate::client_events::{ClientId, HostResponse, HostResult, RequestId};
 use crate::contract::{ClientResponsesSender, SessionMessage};
 use crate::message::Transaction;
+use freenet_stdlib::client_api::ContractResponse;
 use std::collections::{HashMap, HashSet};
+use std::time::{Duration, Instant};
 use tokio::sync::mpsc;
 use tracing::debug;
 
+/// Time-to-live for cached pending results. Entries older than this duration are
+/// eligible for removal during pruning (triggered on message processing).
+///
+/// Note: Due to lazy evaluation, stale entries may persist beyond TTL during idle periods.
+const PENDING_RESULT_TTL: Duration = Duration::from_secs(60);
+
+/// Maximum number of cached pending results. When this limit is reached, LRU eviction
+/// removes the oldest entry to make room for new ones.
+///
+/// Note: Cache may temporarily exceed this limit between messages since enforcement
+/// is lazy (triggered only during message processing).
+const MAX_PENDING_RESULTS: usize = 2048;
+
 /// Simple session actor for client connection refactor
 pub struct SessionActor {
     message_rx: mpsc::Receiver<SessionMessage>,
     client_transactions: HashMap<Transaction, HashSet<ClientId>>,
     // Track RequestId correlation for each (Transaction, ClientId) pair
     client_request_ids: HashMap<(Transaction, ClientId), RequestId>,
+    /// Cache of pending results for late-arriving subscribers.
+    ///
+    /// Uses lazy evaluation for cleanup - entries are pruned only during message processing.
+    /// See module-level documentation for detailed cache eviction strategy and limitations.
+    pending_results: HashMap<Transaction, PendingResult>,
     client_responses: ClientResponsesSender,
 }
 
+#[derive(Clone)]
+struct PendingResult {
+    result: std::sync::Arc<HostResult>,
+    delivered_clients: HashSet<ClientId>,
+    last_accessed: Instant,
+}
+
+impl PendingResult {
+    fn new(result: std::sync::Arc<HostResult>) -> Self {
+        Self {
+            result,
+            delivered_clients: HashSet::new(),
+            last_accessed: Instant::now(),
+        }
+    }
+
+    fn touch(&mut self) {
+        self.last_accessed = Instant::now();
+    }
+}
+
 impl SessionActor {
     /// Create a new session actor
     pub fn new(
@@ -29,6 +106,7 @@ impl SessionActor {
             message_rx,
             client_transactions: HashMap::new(),
             client_request_ids: HashMap::new(),
+            pending_results: HashMap::new(),
             client_responses,
         }
     }
@@ -40,8 +118,12 @@
         }
     }
 
-    /// Process a single message
+    /// Process a single message.
+    ///
+    /// Note: This method triggers cache pruning on EVERY message via `prune_pending_results()`.
+    /// This is the only mechanism for cache cleanup (lazy evaluation - no background task).
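+    ///
+    /// Illustrative sketch of the call order (not a doctest; dispatch details elided):
+    ///
+    /// ```ignore
+    /// self.prune_pending_results(); // lazy cleanup: TTL sweep, then LRU capacity check
+    /// match msg { /* DeliverHostResponse, RegisterTransaction, ClientDisconnect, ... */ }
+    /// ```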
     async fn process_message(&mut self, msg: SessionMessage) {
+        self.prune_pending_results();
         match msg {
             SessionMessage::DeliverHostResponse { tx, response } => {
                 self.handle_result_delivery(tx, response).await;
@@ -74,6 +156,20 @@
                     request_id,
                     self.client_transactions.get(&tx).map_or(0, |s| s.len())
                 );
+
+                if let Some(result_arc) = self.pending_results.get_mut(&tx).and_then(|pending| {
+                    pending.touch();
+                    if pending.delivered_clients.insert(client_id) {
+                        Some(pending.result.clone())
+                    } else {
+                        None
+                    }
+                }) {
+                    let mut recipients = HashSet::new();
+                    recipients.insert(client_id);
+                    self.deliver_result_to_clients(tx, recipients, result_arc);
+                    self.cleanup_transaction_entry(tx, client_id);
+                }
             }
             SessionMessage::ClientDisconnect { client_id } => {
                 self.cleanup_client_transactions(client_id);
@@ -96,62 +192,122 @@
         }
     }
 
-    /// CORE: 1→N Result Delivery with RequestId correlation
-    /// Optimized with Arc to minimize cloning overhead in 1→N delivery
-    async fn handle_result_delivery(
+    fn deliver_result_to_clients(
         &mut self,
         tx: Transaction,
-        result: std::sync::Arc<HostResult>,
+        waiting_clients: HashSet<ClientId>,
+        result: std::sync::Arc<HostResult>,
     ) {
-        tracing::info!("Session actor attempting to deliver result for transaction {}, registered transactions: {}", tx, self.client_transactions.len());
-        if let Some(waiting_clients) = self.client_transactions.remove(&tx) {
-            let client_count = waiting_clients.len();
+        let client_count = waiting_clients.len();
+        tracing::info!(
+            "Delivering result for transaction {} to {} clients",
+            tx,
+            client_count
+        );
+
+        if let Ok(HostResponse::ContractResponse(ContractResponse::GetResponse {
+            key,
+            state,
+            ..
+        })) = result.as_ref()
+        {
             tracing::info!(
-                "Delivering result for transaction {} to {} clients",
-                tx,
-                client_count
+                "Contract GET response ready for delivery: contract={} bytes={}",
+                key,
+                state.as_ref().len()
             );
+        }
 
-            // Optimized 1→N delivery with RequestId correlation
-            for client_id in waiting_clients {
-                // Look up the RequestId for this (transaction, client) pair
-                let request_id =
-                    self.client_request_ids
-                        .remove(&(tx, client_id))
-                        .unwrap_or_else(|| {
-                            tracing::warn!(
-                                "No RequestId found for transaction {} and client {}, using default",
-                                tx, client_id
-                            );
-                            RequestId::new()
-                        });
-
-                if let Err(e) =
-                    self.client_responses
-                        .send((client_id, request_id, (*result).clone()))
-                {
+        // Optimized 1→N delivery with RequestId correlation
+        for client_id in waiting_clients {
+            // Look up the RequestId for this (transaction, client) pair
+            let request_id = self
+                .client_request_ids
+                .remove(&(tx, client_id))
+                .unwrap_or_else(|| {
                     tracing::warn!(
-                        "Failed to deliver result to client {} (request {}): {}",
-                        client_id,
-                        request_id,
-                        e
-                    );
-                } else {
-                    tracing::debug!(
-                        "Delivered result for transaction {} to client {} with request correlation {}",
-                        tx, client_id, request_id
+                        "No RequestId found for transaction {} and client {}, using default",
+                        tx,
+                        client_id
                     );
-                }
-            }
+                    RequestId::new()
+                });
 
-            if client_count > 1 {
+            if let Err(e) = self
+                .client_responses
+                .send((client_id, request_id, (*result).clone()))
+            {
+                tracing::warn!(
+                    "Failed to deliver result to client {} (request {}): {}",
+                    client_id,
+                    request_id,
+                    e
+                );
+            } else {
                 tracing::debug!(
-                    "Successfully delivered result for transaction {} to {} clients via optimized 1→N fanout with RequestId correlation",
-                    tx, client_count
+                    "Delivered result for transaction {} to client {} with request correlation {}",
+                    tx,
+                    client_id,
+                    request_id
                 );
             }
+        }
+
+        if client_count > 1 {
+            tracing::debug!(
+                "Successfully delivered result for transaction {} to {} clients via optimized 1→N fanout with RequestId correlation",
+                tx,
+                client_count
+            );
+        }
+    }
+
+    /// CORE: 1→N Result Delivery with RequestId correlation
+    /// Optimized with Arc to minimize cloning overhead in 1→N delivery
+    async fn handle_result_delivery(
+        &mut self,
+        tx: Transaction,
+        result: std::sync::Arc<HostResult>,
+    ) {
+        tracing::info!(
+            "Session actor attempting to deliver result for transaction {}, registered transactions: {}",
+            tx,
+            self.client_transactions.len()
+        );
+
+        let mut recipients = HashSet::new();
+        let result_to_deliver = {
+            if !self.pending_results.contains_key(&tx)
+                && self.pending_results.len() >= MAX_PENDING_RESULTS
+            {
+                self.enforce_pending_capacity();
+            }
+
+            let entry = self
+                .pending_results
+                .entry(tx)
+                .or_insert_with(|| PendingResult::new(result.clone()));
+            entry.result = result.clone();
+            entry.touch();
+
+            if let Some(waiting_clients) = self.client_transactions.remove(&tx) {
+                for client_id in waiting_clients {
+                    if entry.delivered_clients.insert(client_id) {
+                        recipients.insert(client_id);
+                    }
+                }
+            }
+
+            entry.result.clone()
+        };
+
+        if !recipients.is_empty() {
+            self.deliver_result_to_clients(tx, recipients, result_to_deliver);
         } else {
-            tracing::debug!("No clients waiting for transaction result: {}", tx);
+            tracing::debug!(
+                "No clients waiting for transaction result: {}, caching response for deferred delivery",
+                tx
+            );
         }
     }
 
@@ -199,6 +355,20 @@
                 e
             );
         } else {
+            if !self.pending_results.contains_key(&tx)
+                && self.pending_results.len() >= MAX_PENDING_RESULTS
+            {
+                self.enforce_pending_capacity();
+            }
+
+            let entry = self
+                .pending_results
+                .entry(tx)
+                .or_insert_with(|| PendingResult::new(result.clone()));
+            entry.delivered_clients.insert(client_id);
+            entry.result = result.clone();
+            entry.touch();
+
             tracing::debug!(
                 "Delivered result for transaction {} to specific client {} with request correlation {}",
                 tx, client_id, request_id
@@ -229,6 +399,93 @@
         // Clean up RequestId mappings for this client across all transactions
         self.client_request_ids.retain(|(_, c), _| *c != client_id);
     }
+
+    /// Prune stale pending results based on TTL and enforce capacity limits.
+    ///
+    /// This is the **only** cache cleanup mechanism - there is no background task.
+    /// Called on every message in `process_message()`.
+    ///
+    /// # Cleanup Strategy (Lazy Evaluation)
+    ///
+    /// 1. **Skip if empty**: Early return if no cached results
+    /// 2. **Identify active transactions**: Collect all transactions that still have waiting clients
+    /// 3. **TTL-based removal**: Remove inactive entries older than `PENDING_RESULT_TTL`
+    /// 4. **Capacity enforcement**: If still at/over `MAX_PENDING_RESULTS`, trigger LRU eviction
+    ///
+    /// # Lazy Evaluation Implications
+    ///
+    /// - During idle periods (no messages), stale entries persist in memory
+    /// - Cache cleanup happens only when actor receives messages
+    /// - Stale entries may remain beyond TTL until next message arrives
+    fn prune_pending_results(&mut self) {
+        if self.pending_results.is_empty() {
+            return;
+        }
+
+        let mut active_txs: HashSet<Transaction> =
+            self.client_transactions.keys().copied().collect();
+        active_txs.extend(self.client_request_ids.keys().map(|(tx, _)| *tx));
+
+        let now = Instant::now();
+        let stale: Vec<Transaction> = self
+            .pending_results
+            .iter()
+            .filter_map(|(tx, pending)| {
+                if active_txs.contains(tx) {
+                    return None;
+                }
+                if now.duration_since(pending.last_accessed) > PENDING_RESULT_TTL {
+                    Some(*tx)
+                } else {
+                    None
+                }
+            })
+            .collect();
+
+        for tx in stale {
+            self.pending_results.remove(&tx);
+        }
+
+        if self.pending_results.len() >= MAX_PENDING_RESULTS {
+            self.enforce_pending_capacity();
+        }
+    }
+
+    fn cleanup_transaction_entry(&mut self, tx: Transaction, client_id: ClientId) {
+        if let Some(waiting_clients) = self.client_transactions.get_mut(&tx) {
+            waiting_clients.remove(&client_id);
+            if waiting_clients.is_empty() {
+                self.client_transactions.remove(&tx);
+            }
+        }
+    }
+
+    /// Enforce capacity limits using LRU (Least Recently Used) eviction.
+    ///
+    /// Removes the entry with the oldest `last_accessed` timestamp when the cache
+    /// reaches or exceeds `MAX_PENDING_RESULTS`.
+    ///
+    /// # Lazy Evaluation Note
+    ///
+    /// This is only called:
+    /// 1. At the end of `prune_pending_results()` if still at capacity
+    /// 2. Before inserting new entries when already at capacity
+    ///
+    /// Between messages, cache size may temporarily exceed the limit.
+    fn enforce_pending_capacity(&mut self) {
+        if self.pending_results.len() < MAX_PENDING_RESULTS {
+            return;
+        }
+
+        if let Some(oldest_tx) = self
+            .pending_results
+            .iter()
+            .min_by_key(|(_, pending)| pending.last_accessed)
+            .map(|(tx, _)| *tx)
+        {
+            self.pending_results.remove(&oldest_tx);
+        }
+    }
 }
 
 #[cfg(test)]
@@ -358,6 +615,172 @@ mod tests {
         actor_handle.await.unwrap();
     }
 
+    #[tokio::test]
+    async fn test_pending_result_reaches_late_registered_clients() {
+        use crate::contract::client_responses_channel;
+        use crate::operations::subscribe::SubscribeMsg;
+        use freenet_stdlib::client_api::{ContractResponse, HostResponse};
+        use freenet_stdlib::prelude::{ContractInstanceId, ContractKey};
+
+        let (session_tx, session_rx) = mpsc::channel(100);
+        let (mut client_responses_rx, client_responses_tx) = client_responses_channel();
+        let actor = SessionActor::new(session_rx, client_responses_tx);
+
+        let actor_handle = tokio::spawn(async move {
+            actor.run().await;
+        });
+
+        let tx = Transaction::new::<SubscribeMsg>();
+        let contract_key = ContractKey::from(ContractInstanceId::new([7u8; 32]));
+        let host_result = Ok(HostResponse::ContractResponse(
+            ContractResponse::SubscribeResponse {
+                key: contract_key,
+                subscribed: true,
+            },
+        ));
+
+        // Deliver result before any clients register; this models LocalSubscribeComplete firing
+        // before the session actor processes the pending subscription registration.
+        session_tx
+            .send(SessionMessage::DeliverHostResponse {
+                tx,
+                response: std::sync::Arc::new(host_result.clone()),
+            })
+            .await
+            .unwrap();
+
+        // First client registers and should receive the cached result.
+        let client_one = ClientId::FIRST;
+        let request_one = RequestId::new();
+        session_tx
+            .send(SessionMessage::RegisterTransaction {
+                tx,
+                client_id: client_one,
+                request_id: request_one,
+            })
+            .await
+            .unwrap();
+
+        let (delivered_client_one, delivered_request_one, delivered_result_one) =
+            tokio::time::timeout(
+                tokio::time::Duration::from_millis(200),
+                client_responses_rx.recv(),
+            )
+            .await
+            .expect("session actor failed to deliver cached result to first client")
+            .expect("client response channel closed unexpectedly");
+        assert_eq!(delivered_client_one, client_one);
+        assert_eq!(delivered_request_one, request_one);
+        match delivered_result_one {
+            Ok(HostResponse::ContractResponse(ContractResponse::SubscribeResponse {
+                key,
+                subscribed,
+            })) => {
+                assert_eq!(key, contract_key);
+                assert!(subscribed);
+            }
+            other => panic!("unexpected result delivered to first client: {:?}", other),
+        }
+
+        // Second client registers later; we expect the cached result to still be available.
+        let client_two = ClientId::next();
+        let request_two = RequestId::new();
+        session_tx
+            .send(SessionMessage::RegisterTransaction {
+                tx,
+                client_id: client_two,
+                request_id: request_two,
+            })
+            .await
+            .unwrap();
+
+        let (delivered_client_two, delivered_request_two, delivered_result_two) =
+            tokio::time::timeout(
+                tokio::time::Duration::from_millis(200),
+                client_responses_rx.recv(),
+            )
+            .await
+            .expect("pending result was not delivered to late-registered client")
+            .expect("client response channel closed unexpectedly for late registrant");
+        assert_eq!(delivered_client_two, client_two);
+        assert_eq!(delivered_request_two, request_two);
+        match delivered_result_two {
+            Ok(HostResponse::ContractResponse(ContractResponse::SubscribeResponse {
+                key,
+                subscribed,
+            })) => {
+                assert_eq!(key, contract_key);
+                assert!(subscribed);
+            }
+            other => panic!(
+                "unexpected result delivered to late-registered client: {:?}",
+                other
+            ),
+        }
+
+        actor_handle.abort();
+    }
+
+    #[tokio::test]
+    async fn test_pending_result_delivered_after_registration() {
+        use crate::contract::client_responses_channel;
+
+        let (session_tx, session_rx) = mpsc::channel(100);
+        let (mut client_responses_rx, client_responses_tx) = client_responses_channel();
+        let actor = SessionActor::new(session_rx, client_responses_tx);
+
+        let actor_handle = tokio::spawn(async move {
+            actor.run().await;
+        });
+
+        let tx = Transaction::new::<crate::operations::subscribe::SubscribeMsg>();
+        let client_id = ClientId::FIRST;
+        let request_id = RequestId::new();
+        let host_result = Arc::new(Ok(HostResponse::Ok));
+
+        session_tx
+            .send(SessionMessage::DeliverHostResponse {
+                tx,
+                response: host_result.clone(),
+            })
+            .await
+            .unwrap();
+
+        // Ensure the actor processes the pending result before registration.
+        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
+
+        session_tx
+            .send(SessionMessage::RegisterTransaction {
+                tx,
+                client_id,
+                request_id,
+            })
+            .await
+            .unwrap();
+
+        let delivered = tokio::time::timeout(
+            tokio::time::Duration::from_millis(200),
+            client_responses_rx.recv(),
+        )
+        .await
+        .expect("Timed out waiting for pending result delivery")
+        .expect("Client response channel closed unexpectedly");
+
+        let (returned_client, returned_request, returned_result) = delivered;
+        assert_eq!(returned_client, client_id);
+        assert_eq!(returned_request, request_id);
+        match returned_result {
+            Ok(HostResponse::Ok) => {}
+            other => panic!(
+                "Unexpected result delivered. got={:?}, expected=Ok(HostResponse::Ok)",
+                other
+            ),
+        }
+
+        drop(session_tx);
+        actor_handle.await.unwrap();
+    }
+
     #[tokio::test]
     async fn test_session_actor_client_disconnect_cleanup() {
         use crate::contract::client_responses_channel;
diff --git a/crates/core/src/contract/handler.rs b/crates/core/src/contract/handler.rs
index 4acdba4a6..699f59c73 100644
--- a/crates/core/src/contract/handler.rs
+++ b/crates/core/src/contract/handler.rs
@@ -343,6 +343,33 @@ impl ContractHandlerChannel {
         Ok(())
     }
+
+    pub async fn waiting_for_subscription_result(
+        &self,
+        tx: Transaction,
+        contract_key: ContractInstanceId,
+        client_id: ClientId,
+        request_id: RequestId,
+    ) -> Result<(), ContractError> {
+        self.end
+            .wait_for_res_tx
+            .send((client_id, WaitingTransaction::Subscription { contract_key }))
+            .await
+            .map_err(|_| ContractError::NoEvHandlerResponse)?;
+
+        if let Some(session_tx) = &self.session_adapter_tx {
+            let msg = SessionMessage::RegisterTransaction {
+                tx,
+                client_id,
+                request_id,
+            };
+            if let Err(e) = session_tx.try_send(msg) {
+                tracing::warn!("Failed to notify session actor: {}", e);
+            }
+        }
+
+        Ok(())
+    }
 }
 
 impl ContractHandlerChannel {
diff --git a/crates/core/src/node/mod.rs b/crates/core/src/node/mod.rs
index f3a4b165a..c50ac8be1 100644
--- a/crates/core/src/node/mod.rs
+++ b/crates/core/src/node/mod.rs
@@ -37,10 +37,7 @@ use self::p2p_impl::NodeP2P;
 use crate::{
     client_events::{BoxedClient, ClientEventsProxy, ClientId, OpenRequest},
     config::{Address, GatewayConfig, WebsocketApiConfig},
-    contract::{
-        Callback, ExecutorError, ExecutorToEventLoopChannel, NetworkContractHandler,
-        WaitingTransaction,
-    },
+    contract::{Callback, ExecutorError, ExecutorToEventLoopChannel, NetworkContractHandler},
     local_node::Executor,
     message::{InnerMessage, NetMessage, Transaction, TransactionType},
     operations::{
@@ -1105,6 +1102,7 @@ async fn handle_pure_network_result(
 }
 
 /// Attempts to subscribe to a contract
+#[allow(dead_code)]
 pub async fn subscribe(
     op_manager: Arc<OpManager>,
     key: ContractKey,
@@ -1131,13 +1129,7 @@ pub async fn subscribe_with_id(
         let request_id = RequestId::new();
         let _ = op_manager
            .ch_outbound
-            .waiting_for_transaction_result(
-                WaitingTransaction::Subscription {
-                    contract_key: *key.id(),
-                },
-                client_id,
-                request_id,
-            )
+            .waiting_for_subscription_result(id, *key.id(), client_id, request_id)
             .await;
     }
     // Initialize a subscribe op.
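Note on the pattern: the mod.rs change above works because the transaction ID is allocated first, the waiting client is registered against it, and only then is the operation started. A completion that arrives immediately therefore still finds its waiter, and a completion with no waiters lands in the session actor's `pending_results` cache. A minimal self-contained sketch of this register-before-run idea (illustrative only; `Waiters`, `Tx`, and `Client` are hypothetical stand-ins for the real `Transaction`, `ClientId`, and session-actor state):

```rust
use std::collections::HashMap;

type Tx = u64; // stand-in for Transaction
type Client = u32; // stand-in for ClientId

/// Hypothetical stand-in for the session actor's waiter table plus
/// the pending-result cache keyed by transaction.
#[derive(Default)]
struct Waiters {
    by_tx: HashMap<Tx, Vec<Client>>,
    pending: HashMap<Tx, String>, // cached result, if it beat registration
}

impl Waiters {
    /// Register BEFORE starting the operation. If the result already
    /// arrived (cached), deliver it to the late registrant immediately.
    fn register(&mut self, tx: Tx, client: Client) -> Option<String> {
        if let Some(result) = self.pending.get(&tx) {
            return Some(result.clone());
        }
        self.by_tx.entry(tx).or_default().push(client);
        None
    }

    /// Operation completed: deliver to registered waiters, or cache
    /// the result for late registrants (mirrors `pending_results`).
    fn complete(&mut self, tx: Tx, result: String) -> Vec<Client> {
        let waiters = self.by_tx.remove(&tx).unwrap_or_default();
        if waiters.is_empty() {
            self.pending.insert(tx, result);
        }
        waiters
    }
}

fn main() {
    let mut w = Waiters::default();

    // Register first, then complete: a fast completion still finds its waiter.
    let tx: Tx = 42; // generated up front, like Transaction::new::<SubscribeMsg>()
    assert!(w.register(tx, 1).is_none());
    assert_eq!(w.complete(tx, "ok".into()), vec![1]);

    // Completion beats registration: the result is cached for the late client.
    let tx2: Tx = 43;
    assert!(w.complete(tx2, "ok".into()).is_empty());
    assert_eq!(w.register(tx2, 2).as_deref(), Some("ok"));
}
```

The real `pending_results` additionally tracks `delivered_clients` and bounds the cache with a TTL (`PENDING_RESULT_TTL`) and LRU eviction (`MAX_PENDING_RESULTS`), so results cached for clients that never register cannot accumulate without limit.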