Skip to content

Commit 9346731

Browse files
sanityclaude
andcommitted
feat: add proximity-based update forwarding
Implements proximity cache to track which neighbors have cached contracts. UPDATE operations now forward to neighbors who have the contract cached, not just explicit subscribers. This reduces update propagation failures in the network. Key changes: - New ProximityCacheManager tracks neighbor cache states - Immediate cache addition announcements - Batched cache removal announcements to reduce network traffic - UPDATE operation combines subscribers with proximity-based neighbors - PUT/GET operations announce cache additions after seeding Addresses #1848 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
1 parent 538d0ef commit 9346731

File tree

9 files changed

+677
-6
lines changed

9 files changed

+677
-6
lines changed

crates/core/src/message.rs

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use std::{
1010

1111
use crate::{
1212
client_events::{ClientId, HostResult},
13-
node::PeerId,
13+
node::{proximity_cache::ProximityCacheMessage, PeerId},
1414
operations::{
1515
connect::ConnectMsg, get::GetMsg, put::PutMsg, subscribe::SubscribeMsg, update::UpdateMsg,
1616
},
@@ -255,6 +255,10 @@ pub(crate) enum NetMessageV1 {
255255
},
256256
Update(UpdateMsg),
257257
Aborted(Transaction),
258+
ProximityCache {
259+
from: PeerId,
260+
message: ProximityCacheMessage,
261+
},
258262
}
259263

260264
trait Versioned {
@@ -279,6 +283,7 @@ impl Versioned for NetMessageV1 {
279283
NetMessageV1::Unsubscribed { .. } => semver::Version::new(1, 0, 0),
280284
NetMessageV1::Update(_) => semver::Version::new(1, 0, 0),
281285
NetMessageV1::Aborted(_) => semver::Version::new(1, 0, 0),
286+
NetMessageV1::ProximityCache { .. } => semver::Version::new(1, 0, 0),
282287
}
283288
}
284289
}
@@ -339,6 +344,11 @@ pub(crate) enum NodeEvent {
339344
target: PeerId,
340345
msg: Box<NetMessage>,
341346
},
347+
#[allow(dead_code)] // Reserved for future proximity cache broadcasting
348+
BroadcastProximityCache {
349+
from: PeerId,
350+
message: crate::node::proximity_cache::ProximityCacheMessage,
351+
},
342352
}
343353

344354
#[derive(Debug, Clone)]
@@ -418,6 +428,12 @@ impl Display for NodeEvent {
418428
NodeEvent::SendMessage { target, msg } => {
419429
write!(f, "SendMessage (to {target}, tx: {})", msg.id())
420430
}
431+
NodeEvent::BroadcastProximityCache { from, message } => {
432+
write!(
433+
f,
434+
"BroadcastProximityCache (from {from}, message: {message:?})"
435+
)
436+
}
421437
}
422438
}
423439
}
@@ -452,6 +468,7 @@ impl MessageStats for NetMessageV1 {
452468
NetMessageV1::Update(op) => op.id(),
453469
NetMessageV1::Aborted(tx) => tx,
454470
NetMessageV1::Unsubscribed { transaction, .. } => transaction,
471+
NetMessageV1::ProximityCache { .. } => Transaction::NULL,
455472
}
456473
}
457474

@@ -464,6 +481,7 @@ impl MessageStats for NetMessageV1 {
464481
NetMessageV1::Update(op) => op.target().as_ref().map(|b| b.borrow().clone()),
465482
NetMessageV1::Aborted(_) => None,
466483
NetMessageV1::Unsubscribed { .. } => None,
484+
NetMessageV1::ProximityCache { .. } => None,
467485
}
468486
}
469487

@@ -476,6 +494,7 @@ impl MessageStats for NetMessageV1 {
476494
NetMessageV1::Update(op) => op.requested_location(),
477495
NetMessageV1::Aborted(_) => None,
478496
NetMessageV1::Unsubscribed { .. } => None,
497+
NetMessageV1::ProximityCache { .. } => None,
479498
}
480499
}
481500
}
@@ -495,6 +514,9 @@ impl Display for NetMessage {
495514
Unsubscribed { key, from, .. } => {
496515
write!(f, "Unsubscribed {{ key: {key}, from: {from} }}")?;
497516
}
517+
ProximityCache { from, message } => {
518+
write!(f, "ProximityCache {{ from: {from}, message: {message:?} }}")?;
519+
}
498520
},
499521
};
500522
write!(f, "}}")

crates/core/src/node/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ mod message_processor;
7171
mod network_bridge;
7272
mod op_state_manager;
7373
mod p2p_impl;
74+
pub(crate) mod proximity_cache;
7475
mod request_router;
7576
pub(crate) mod testing_impl;
7677

crates/core/src/node/network_bridge/p2p_protoc.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -815,6 +815,30 @@ impl P2pConnManager {
815815
Err(e) => tracing::error!("Failed to send local subscribe response to result router: {}", e),
816816
}
817817
}
818+
NodeEvent::BroadcastProximityCache { from, message } => {
819+
// Broadcast ProximityCache message to all connected peers
820+
tracing::debug!(
821+
%from,
822+
?message,
823+
peer_count = ctx.connections.len(),
824+
"Broadcasting ProximityCache message to connected peers"
825+
);
826+
827+
use crate::message::{NetMessage, NetMessageV1};
828+
let msg = NetMessage::V1(NetMessageV1::ProximityCache {
829+
from: from.clone(),
830+
message: message.clone(),
831+
});
832+
833+
for peer in ctx.connections.keys() {
834+
if peer != &from {
835+
tracing::debug!(%peer, "Sending ProximityCache to peer");
836+
if let Err(e) = ctx.bridge.send(peer, msg.clone()).await {
837+
tracing::warn!(%peer, "Failed to send ProximityCache: {}", e);
838+
}
839+
}
840+
}
841+
}
818842
NodeEvent::Disconnect { cause } => {
819843
tracing::info!(
820844
"Disconnecting from network{}",

crates/core/src/node/op_state_manager.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,10 @@ use crate::{
3232
ring::{ConnectionManager, LiveTransactionTracker, Ring},
3333
};
3434

35-
use super::{network_bridge::EventLoopNotificationsSender, NetEventRegister, NodeConfig};
35+
use super::{
36+
network_bridge::EventLoopNotificationsSender, proximity_cache::ProximityCacheManager,
37+
NetEventRegister, NodeConfig,
38+
};
3639

3740
#[cfg(debug_assertions)]
3841
macro_rules! check_id_op {
@@ -77,6 +80,8 @@ pub(crate) struct OpManager {
7780
pub peer_ready: Arc<AtomicBool>,
7881
/// Whether this node is a gateway
7982
pub is_gateway: bool,
83+
/// Proximity cache manager for tracking which neighbors have which contracts
84+
pub proximity_cache: Arc<ProximityCacheManager>,
8085
}
8186

8287
impl OpManager {
@@ -126,6 +131,8 @@ impl OpManager {
126131
tracing::debug!("Regular peer node: peer_ready will be set after first handshake");
127132
}
128133

134+
let proximity_cache = Arc::new(ProximityCacheManager::new());
135+
129136
Ok(Self {
130137
ring,
131138
ops,
@@ -135,6 +142,7 @@ impl OpManager {
135142
result_router_tx,
136143
peer_ready,
137144
is_gateway,
145+
proximity_cache,
138146
})
139147
}
140148

0 commit comments

Comments
 (0)