Skip to content

Commit 78c4c96

Browse files
sanityclaude
andcommitted
refactor: address PR review feedback for proximity cache
Improvements based on review feedback: 1. Use HashSet for O(1) duplicate checking in update.rs - Refactored get_broadcast_targets_update to use HashSet internally - Changed from O(n*m) to O(n+m) complexity when combining subscribers and proximity neighbors 2. Implement Default trait for ProximityCacheManager - Added Default trait implementation following Rust idioms - Made new() method call Self::default() 3. Extract magic constant to module-level constant - Created BATCH_ANNOUNCEMENT_INTERVAL constant (30 seconds) - Replaced hardcoded durations at lines 263 and 365 4. Fix fragile Instant→SystemTime conversion - Changed get_introspection_data return type to use Duration instead of SystemTime - Now returns time-since-last-update (monotonic, clock-change safe) - More useful for debugging purposes Tests: 215 unit tests passing [AI-assisted debugging and comment] 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
1 parent 8ce1edf commit 78c4c96

File tree

2 files changed

+39
-20
lines changed

2 files changed

+39
-20
lines changed

crates/core/src/node/proximity_cache.rs

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@ use tracing::{debug, info, trace};
1010

1111
use super::PeerId;
1212

13+
/// Batch announcement interval - how often to send batched removal announcements
14+
const BATCH_ANNOUNCEMENT_INTERVAL: Duration = Duration::from_secs(30);
15+
1316
/// Proximity cache manager - tracks what contracts this node and its neighbors are caching
1417
pub struct ProximityCacheManager {
1518
/// Contracts we are caching locally
@@ -66,9 +69,8 @@ pub enum ProximityCacheMessage {
6669
CacheStateResponse { contracts: Vec<ContractInstanceId> },
6770
}
6871

69-
#[allow(dead_code)] // Some methods reserved for future use (stats, introspection, lifecycle management)
70-
impl ProximityCacheManager {
71-
pub fn new() -> Self {
72+
impl Default for ProximityCacheManager {
73+
fn default() -> Self {
7274
Self {
7375
my_cache: Arc::new(RwLock::new(HashSet::new())),
7476
neighbor_caches: Arc::new(DashMap::new()),
@@ -77,6 +79,13 @@ impl ProximityCacheManager {
7779
pending_removals: Arc::new(RwLock::new(HashSet::new())),
7880
}
7981
}
82+
}
83+
84+
#[allow(dead_code)] // Some methods reserved for future use (stats, introspection, lifecycle management)
85+
impl ProximityCacheManager {
86+
pub fn new() -> Self {
87+
Self::default()
88+
}
8089

8190
/// Called when we cache a new contract (PUT or successful GET)
8291
pub async fn on_contract_cached(
@@ -251,7 +260,7 @@ impl ProximityCacheManager {
251260
let mut last_announce = self.last_batch_announce.write().await;
252261

253262
// Only send batch announcements every 30 seconds
254-
if last_announce.elapsed() < Duration::from_secs(30) {
263+
if last_announce.elapsed() < BATCH_ANNOUNCEMENT_INTERVAL {
255264
return None;
256265
}
257266

@@ -289,24 +298,25 @@ impl ProximityCacheManager {
289298
}
290299

291300
/// Get introspection data for debugging
301+
/// Returns (my_cache, neighbor_data) where neighbor_data maps peer IDs to
302+
/// (cached contracts, time since last update)
292303
pub async fn get_introspection_data(
293304
&self,
294305
) -> (
295306
Vec<ContractInstanceId>,
296-
HashMap<String, (Vec<ContractInstanceId>, std::time::SystemTime)>,
307+
HashMap<String, (Vec<ContractInstanceId>, Duration)>,
297308
) {
298309
let my_cache = self.my_cache.read().await.iter().cloned().collect();
299-
let now = std::time::SystemTime::now();
300310

301311
let mut neighbor_data = HashMap::new();
302312
for entry in self.neighbor_caches.iter() {
303-
// Calculate SystemTime from Instant by subtracting elapsed time from now
304-
let last_update_system_time = now - entry.value().last_update.elapsed();
313+
// Use elapsed time since last update (monotonic, not affected by system clock changes)
314+
let time_since_update = entry.value().last_update.elapsed();
305315
neighbor_data.insert(
306316
entry.key().to_string(), // Convert PeerId to String for introspection
307317
(
308318
entry.value().contracts.iter().cloned().collect(),
309-
last_update_system_time,
319+
time_since_update,
310320
),
311321
);
312322
}
@@ -353,7 +363,7 @@ impl ProximityCacheManager {
353363
use crate::config::GlobalExecutor;
354364

355365
GlobalExecutor::spawn(async move {
356-
let mut interval = tokio::time::interval(Duration::from_secs(30));
366+
let mut interval = tokio::time::interval(BATCH_ANNOUNCEMENT_INTERVAL);
357367
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
358368

359369
info!("PROXIMITY_PROPAGATION: Periodic batch announcement task started");

crates/core/src/operations/update.rs

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -715,32 +715,41 @@ impl OpManager {
715715
key: &ContractKey,
716716
sender: &PeerId,
717717
) -> Vec<PeerKeyLocation> {
718-
let subscribers = self
718+
use std::collections::HashSet;
719+
720+
// Collect subscribers into HashSet for O(1) duplicate checking
721+
let subscriber_peers: HashSet<PeerId> = self
719722
.ring
720723
.subscribers_of(key)
721724
.map(|subs| {
722725
subs.value()
723726
.iter()
724727
.filter(|pk| &pk.peer != sender)
725-
.cloned()
726-
.collect::<Vec<_>>()
728+
.map(|pk| pk.peer.clone())
729+
.collect()
727730
})
728731
.unwrap_or_default();
729732

730733
// Get neighbors who have this contract cached (proximity-based targeting)
731734
let interested_neighbors = self.proximity_cache.neighbors_with_contract(key);
732735

733-
// Combine subscribers and interested neighbors, removing duplicates and sender
734-
let mut targets = subscribers;
736+
// Combine subscribers and interested neighbors using HashSet for deduplication
737+
let mut all_peers: HashSet<PeerId> = subscriber_peers;
735738
for neighbor in interested_neighbors {
736-
if &neighbor != sender && !targets.iter().any(|t| t.peer == neighbor) {
737-
targets.push(PeerKeyLocation {
738-
peer: neighbor,
739-
location: None,
740-
});
739+
if &neighbor != sender {
740+
all_peers.insert(neighbor);
741741
}
742742
}
743743

744+
// Convert to Vec<PeerKeyLocation> for return
745+
let targets: Vec<PeerKeyLocation> = all_peers
746+
.into_iter()
747+
.map(|peer| PeerKeyLocation {
748+
peer,
749+
location: None,
750+
})
751+
.collect();
752+
744753
// Trace update propagation for debugging
745754
if !targets.is_empty() {
746755
tracing::info!(

0 commit comments

Comments
 (0)