Skip to content

Commit be26d04

Browse files
sanityclaude
andcommitted
refactor: address PR review feedback for proximity cache
Improvements based on review feedback: 1. Use HashSet for O(1) duplicate checking in update.rs - Refactored get_broadcast_targets_update to use HashSet internally - Changed from O(n*m) to O(n+m) complexity when combining subscribers and proximity neighbors 2. Implement Default trait for ProximityCacheManager - Added Default trait implementation following Rust idioms - Made new() method call Self::default() 3. Extract magic constant to module-level constant - Created BATCH_ANNOUNCEMENT_INTERVAL constant (30 seconds) - Replaced hardcoded durations at lines 263 and 365 4. Fix fragile Instant→SystemTime conversion - Changed get_introspection_data return type to use Duration instead of SystemTime - Now returns time-since-last-update (monotonic, clock-change safe) - More useful for debugging purposes Tests: 215 unit tests passing [AI-assisted debugging and comment] 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
1 parent 9346731 commit be26d04

File tree

9 files changed

+706
-96
lines changed

9 files changed

+706
-96
lines changed

crates/core/src/node/mod.rs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -827,6 +827,26 @@ async fn process_message_v1<CB>(
827827
op_manager.ring.remove_subscriber(key, from);
828828
break;
829829
}
830+
NetMessageV1::ProximityCache { from, ref message } => {
831+
tracing::debug!(?from, "Processing proximity cache message");
832+
833+
// Handle the proximity cache message
834+
if let Some(response) = op_manager
835+
.proximity_cache
836+
.handle_message(from.clone(), message.clone())
837+
.await
838+
{
839+
// Send response directly back to the sender
840+
let response_msg = NetMessage::V1(NetMessageV1::ProximityCache {
841+
from: op_manager.ring.connection_manager.get_peer_key().unwrap(),
842+
message: response,
843+
});
844+
if let Err(err) = conn_manager.send(&from, response_msg).await {
845+
tracing::error!(%err, ?from, "Failed to send proximity cache response");
846+
}
847+
}
848+
break;
849+
}
830850
_ => break, // Exit the loop if no applicable message type is found
831851
}
832852
}
@@ -1048,6 +1068,26 @@ where
10481068
op_manager.ring.remove_subscriber(key, from);
10491069
break;
10501070
}
1071+
NetMessageV1::ProximityCache { from, ref message } => {
1072+
tracing::debug!(?from, "Processing proximity cache message (pure network)");
1073+
1074+
// Handle the proximity cache message
1075+
if let Some(response) = op_manager
1076+
.proximity_cache
1077+
.handle_message(from.clone(), message.clone())
1078+
.await
1079+
{
1080+
// Send response directly back to the sender
1081+
let response_msg = NetMessage::V1(NetMessageV1::ProximityCache {
1082+
from: op_manager.ring.connection_manager.get_peer_key().unwrap(),
1083+
message: response,
1084+
});
1085+
if let Err(err) = conn_manager.send(&from, response_msg).await {
1086+
tracing::error!(%err, ?from, "Failed to send proximity cache response");
1087+
}
1088+
}
1089+
break;
1090+
}
10511091
_ => break, // Exit the loop if no applicable message type is found
10521092
}
10531093
}

crates/core/src/node/network_bridge/p2p_protoc.rs

Lines changed: 65 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,39 @@ impl P2pConnManager {
293293
peer = %ctx.bridge.op_manager.ring.connection_manager.get_peer_key().unwrap(),
294294
"Received inbound message from peer - processing"
295295
);
296+
297+
// Handle ProximityCache messages directly before normal processing
298+
if let crate::message::NetMessage::V1(
299+
crate::message::NetMessageV1::ProximityCache { from, message },
300+
) = &msg
301+
{
302+
tracing::info!(?from, ?message, "Processing ProximityCache message directly in InboundMessage handler");
303+
304+
// Handle the proximity cache message
305+
if let Some(response) = op_manager
306+
.proximity_cache
307+
.handle_message(from.clone(), message.clone())
308+
.await
309+
{
310+
// Send response directly back to the sender
311+
let response_msg = crate::message::NetMessage::V1(
312+
crate::message::NetMessageV1::ProximityCache {
313+
from: op_manager
314+
.ring
315+
.connection_manager
316+
.get_peer_key()
317+
.unwrap(),
318+
message: response,
319+
},
320+
);
321+
if let Err(err) = ctx.bridge.send(from, response_msg).await {
322+
tracing::error!(%err, ?from, "Failed to send ProximityCache response");
323+
}
324+
}
325+
// ProximityCache processed, skip normal message handling
326+
continue;
327+
}
328+
296329
ctx.handle_inbound_message(
297330
msg,
298331
&outbound_message,
@@ -301,17 +334,16 @@ impl P2pConnManager {
301334
)
302335
.await?;
303336
}
304-
ConnEvent::OutboundMessage(NetMessage::V1(NetMessageV1::Aborted(tx))) => {
337+
ConnEvent::OutboundMessage {
338+
target,
339+
msg: NetMessage::V1(NetMessageV1::Aborted(tx)),
340+
} => {
305341
// TODO: handle aborted transaction as internal message
306-
tracing::error!(%tx, "Aborted transaction");
342+
tracing::error!(%tx, target_peer = %target, "Aborted transaction");
307343
}
308-
ConnEvent::OutboundMessage(msg) => {
309-
let Some(target_peer) = msg.target() else {
310-
let id = *msg.id();
311-
tracing::error!(%id, %msg, "Target peer not set, must be set for connection outbound message");
312-
ctx.bridge.op_manager.completed(id);
313-
continue;
314-
};
344+
ConnEvent::OutboundMessage { target, msg } => {
345+
// target is the PeerId from the event - this is the authoritative target
346+
// msg.target() may be None (for ProximityCache) or Some (for other messages)
315347

316348
// Check if message targets self - if so, process locally instead of sending over network
317349
let self_peer_id = ctx
@@ -321,11 +353,11 @@ impl P2pConnManager {
321353
.connection_manager
322354
.get_peer_key()
323355
.unwrap();
324-
if target_peer.peer == self_peer_id {
356+
if target == self_peer_id {
325357
tracing::error!(
326358
tx = %msg.id(),
327359
msg_type = %msg,
328-
target_peer = %target_peer,
360+
target_peer = %target,
329361
self_peer = %self_peer_id,
330362
"BUG: OutboundMessage targets self! This indicates a routing logic error - messages should not reach OutboundMessage handler if they target self"
331363
);
@@ -343,17 +375,18 @@ impl P2pConnManager {
343375
tracing::info!(
344376
tx = %msg.id(),
345377
msg_type = %msg,
346-
target_peer = %target_peer,
378+
target_peer = %target,
379+
msg_target = ?msg.target(),
347380
"Sending outbound message to peer"
348381
);
349382
// IMPORTANT: Use a single get() call to avoid TOCTOU race
350383
// between contains_key() and get(). The connection can be
351384
// removed by another task between those two calls.
352-
let peer_connection = ctx.connections.get(&target_peer.peer);
385+
let peer_connection = ctx.connections.get(&target);
353386
tracing::debug!(
354387
tx = %msg.id(),
355388
self_peer = %ctx.bridge.op_manager.ring.connection_manager.pub_key,
356-
target = %target_peer.peer,
389+
target = %target,
357390
conn_map_size = ctx.connections.len(),
358391
has_connection = peer_connection.is_some(),
359392
"[CONN_TRACK] LOOKUP: Checking for existing connection in HashMap"
@@ -368,15 +401,15 @@ impl P2pConnManager {
368401
} else {
369402
tracing::info!(
370403
tx = %msg.id(),
371-
target_peer = %target_peer,
404+
target_peer = %target,
372405
"Message successfully sent to peer connection"
373406
);
374407
}
375408
}
376409
None => {
377410
tracing::warn!(
378411
id = %msg.id(),
379-
target = %target_peer.peer,
412+
target = %target,
380413
"No existing outbound connection, establishing connection first"
381414
);
382415

@@ -388,7 +421,7 @@ impl P2pConnManager {
388421
ctx.bridge
389422
.ev_listener_tx
390423
.send(Right(NodeEvent::ConnectPeer {
391-
peer: target_peer.peer.clone(),
424+
peer: target.clone(),
392425
tx,
393426
callback,
394427
is_gw: false,
@@ -401,11 +434,11 @@ impl P2pConnManager {
401434
// Connection established, try sending again
402435
// IMPORTANT: Use single get() call to avoid TOCTOU race
403436
let peer_connection_retry =
404-
ctx.connections.get(&target_peer.peer);
437+
ctx.connections.get(&target);
405438
tracing::debug!(
406439
tx = %msg.id(),
407440
self_peer = %ctx.bridge.op_manager.ring.connection_manager.pub_key,
408-
target = %target_peer.peer,
441+
target = %target,
409442
conn_map_size = ctx.connections.len(),
410443
has_connection = peer_connection_retry.is_some(),
411444
"[CONN_TRACK] LOOKUP: Retry after connection established - checking for connection in HashMap"
@@ -419,22 +452,22 @@ impl P2pConnManager {
419452
} else {
420453
tracing::error!(
421454
tx = %tx,
422-
target = %target_peer.peer,
455+
target = %target,
423456
"Connection established successfully but not found in HashMap - possible race condition"
424457
);
425458
}
426459
}
427460
Ok(Some(Err(e))) => {
428461
tracing::error!(
429462
"Failed to establish connection to {}: {:?}",
430-
target_peer.peer,
463+
target,
431464
e
432465
);
433466
}
434467
Ok(None) | Err(_) => {
435468
tracing::error!(
436469
"Timeout or error establishing connection to {}",
437-
target_peer.peer
470+
target
438471
);
439472
}
440473
}
@@ -1392,7 +1425,13 @@ impl P2pConnManager {
13921425
target_peer = %target,
13931426
"handle_notification_msg: Message has target peer, routing as OutboundMessage"
13941427
);
1395-
return EventResult::Event(ConnEvent::OutboundMessage(msg).into());
1428+
return EventResult::Event(
1429+
ConnEvent::OutboundMessage {
1430+
target: target.peer,
1431+
msg,
1432+
}
1433+
.into(),
1434+
);
13961435
}
13971436
}
13981437

@@ -1432,8 +1471,8 @@ impl P2pConnManager {
14321471

14331472
fn handle_bridge_msg(&self, msg: Option<P2pBridgeEvent>) -> EventResult {
14341473
match msg {
1435-
Some(Left((_target, msg))) => {
1436-
EventResult::Event(ConnEvent::OutboundMessage(*msg).into())
1474+
Some(Left((target, msg))) => {
1475+
EventResult::Event(ConnEvent::OutboundMessage { target, msg: *msg }.into())
14371476
}
14381477
Some(Right(action)) => EventResult::Event(ConnEvent::NodeAction(action).into()),
14391478
None => EventResult::Event(ConnEvent::ClosedChannel(ChannelCloseReason::Bridge).into()),
@@ -1575,7 +1614,7 @@ enum EventResult {
15751614
#[derive(Debug)]
15761615
pub(super) enum ConnEvent {
15771616
InboundMessage(NetMessage),
1578-
OutboundMessage(NetMessage),
1617+
OutboundMessage { target: PeerId, msg: NetMessage },
15791618
NodeAction(NodeEvent),
15801619
ClosedChannel(ChannelCloseReason),
15811620
}

0 commit comments

Comments
 (0)