Skip to content

Commit 53044c3

Browse files
authored
Merge branch 'stack/update-fallback' into stack/subscription-routing
2 parents de0aaa9 + 3070c4a commit 53044c3

File tree

1 file changed

+85
-0
lines changed

1 file changed

+85
-0
lines changed

crates/core/src/client_events/session_actor.rs

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,13 @@ use crate::contract::{ClientResponsesSender, SessionMessage};
88
use crate::message::Transaction;
99
use freenet_stdlib::client_api::ContractResponse;
1010
use std::collections::{HashMap, HashSet};
11+
use std::time::{Duration, Instant};
1112
use tokio::sync::mpsc;
1213
use tracing::debug;
1314

15+
const PENDING_RESULT_TTL: Duration = Duration::from_secs(60);
16+
const MAX_PENDING_RESULTS: usize = 2048;
17+
1418
/// Simple session actor for client connection refactor
1519
pub struct SessionActor {
1620
message_rx: mpsc::Receiver<SessionMessage>,
@@ -25,15 +29,21 @@ pub struct SessionActor {
2529
struct PendingResult {
2630
result: std::sync::Arc<HostResult>,
2731
delivered_clients: HashSet<ClientId>,
32+
last_accessed: Instant,
2833
}
2934

3035
impl PendingResult {
3136
fn new(result: std::sync::Arc<HostResult>) -> Self {
3237
Self {
3338
result,
3439
delivered_clients: HashSet::new(),
40+
last_accessed: Instant::now(),
3541
}
3642
}
43+
44+
fn touch(&mut self) {
45+
self.last_accessed = Instant::now();
46+
}
3747
}
3848

3949
impl SessionActor {
@@ -60,6 +70,7 @@ impl SessionActor {
6070

6171
/// Process a single message
6272
async fn process_message(&mut self, msg: SessionMessage) {
73+
self.prune_pending_results();
6374
match msg {
6475
SessionMessage::DeliverHostResponse { tx, response } => {
6576
self.handle_result_delivery(tx, response).await;
@@ -94,6 +105,7 @@ impl SessionActor {
94105
);
95106

96107
if let Some(result_arc) = self.pending_results.get_mut(&tx).and_then(|pending| {
108+
pending.touch();
97109
if pending.delivered_clients.insert(client_id) {
98110
Some(pending.result.clone())
99111
} else {
@@ -103,6 +115,7 @@ impl SessionActor {
103115
let mut recipients = HashSet::new();
104116
recipients.insert(client_id);
105117
self.deliver_result_to_clients(tx, recipients, result_arc);
118+
self.cleanup_transaction_entry(tx, client_id);
106119
}
107120
}
108121
SessionMessage::ClientDisconnect { client_id } => {
@@ -211,11 +224,18 @@ impl SessionActor {
211224

212225
let mut recipients = HashSet::new();
213226
let result_to_deliver = {
227+
if !self.pending_results.contains_key(&tx)
228+
&& self.pending_results.len() >= MAX_PENDING_RESULTS
229+
{
230+
self.enforce_pending_capacity();
231+
}
232+
214233
let entry = self
215234
.pending_results
216235
.entry(tx)
217236
.or_insert_with(|| PendingResult::new(result.clone()));
218237
entry.result = result.clone();
238+
entry.touch();
219239

220240
if let Some(waiting_clients) = self.client_transactions.remove(&tx) {
221241
for client_id in waiting_clients {
@@ -282,12 +302,19 @@ impl SessionActor {
282302
e
283303
);
284304
} else {
305+
if !self.pending_results.contains_key(&tx)
306+
&& self.pending_results.len() >= MAX_PENDING_RESULTS
307+
{
308+
self.enforce_pending_capacity();
309+
}
310+
285311
let entry = self
286312
.pending_results
287313
.entry(tx)
288314
.or_insert_with(|| PendingResult::new(result.clone()));
289315
entry.delivered_clients.insert(client_id);
290316
entry.result = result.clone();
317+
entry.touch();
291318

292319
tracing::debug!(
293320
"Delivered result for transaction {} to specific client {} with request correlation {}",
@@ -319,6 +346,64 @@ impl SessionActor {
319346
// Clean up RequestId mappings for this client across all transactions
320347
self.client_request_ids.retain(|(_, c), _| *c != client_id);
321348
}
349+
350+
fn prune_pending_results(&mut self) {
351+
if self.pending_results.is_empty() {
352+
return;
353+
}
354+
355+
let mut active_txs: HashSet<Transaction> =
356+
self.client_transactions.keys().copied().collect();
357+
active_txs.extend(self.client_request_ids.keys().map(|(tx, _)| *tx));
358+
359+
let now = Instant::now();
360+
let stale: Vec<Transaction> = self
361+
.pending_results
362+
.iter()
363+
.filter_map(|(tx, pending)| {
364+
if active_txs.contains(tx) {
365+
return None;
366+
}
367+
if now.duration_since(pending.last_accessed) > PENDING_RESULT_TTL {
368+
Some(*tx)
369+
} else {
370+
None
371+
}
372+
})
373+
.collect();
374+
375+
for tx in stale {
376+
self.pending_results.remove(&tx);
377+
}
378+
379+
if self.pending_results.len() >= MAX_PENDING_RESULTS {
380+
self.enforce_pending_capacity();
381+
}
382+
}
383+
384+
fn cleanup_transaction_entry(&mut self, tx: Transaction, client_id: ClientId) {
385+
if let Some(waiting_clients) = self.client_transactions.get_mut(&tx) {
386+
waiting_clients.remove(&client_id);
387+
if waiting_clients.is_empty() {
388+
self.client_transactions.remove(&tx);
389+
}
390+
}
391+
}
392+
393+
fn enforce_pending_capacity(&mut self) {
394+
if self.pending_results.len() < MAX_PENDING_RESULTS {
395+
return;
396+
}
397+
398+
if let Some(oldest_tx) = self
399+
.pending_results
400+
.iter()
401+
.min_by_key(|(_, pending)| pending.last_accessed)
402+
.map(|(tx, _)| *tx)
403+
{
404+
self.pending_results.remove(&oldest_tx);
405+
}
406+
}
322407
}
323408

324409
#[cfg(test)]

0 commit comments

Comments
 (0)