Skip to content

Commit ca76264

Browse files
sanityclaude
andauthored
fix: handle Unsubscribed messages by removing peer from subscriber list (#1999)
Co-authored-by: Claude <noreply@anthropic.com>
1 parent bd9ab04 commit ca76264

File tree

3 files changed

+137
-8
lines changed

3 files changed

+137
-8
lines changed

crates/core/src/node/mod.rs

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -802,10 +802,15 @@ async fn process_message_v1<CB>(
802802
)
803803
.await;
804804
}
805-
NetMessageV1::Unsubscribed { ref key, .. } => {
806-
if let Err(error) = subscribe(op_manager, *key, None).await {
807-
tracing::error!(%error, "Failed to subscribe to contract");
808-
}
805+
NetMessageV1::Unsubscribed {
806+
ref key, ref from, ..
807+
} => {
808+
tracing::debug!(
809+
"Received Unsubscribed message for contract {} from peer {}",
810+
key,
811+
from
812+
);
813+
op_manager.ring.remove_subscriber(key, from);
809814
break;
810815
}
811816
_ => break, // Exit the loop if no applicable message type is found
@@ -1018,10 +1023,15 @@ where
10181023
)
10191024
.await;
10201025
}
1021-
NetMessageV1::Unsubscribed { ref key, .. } => {
1022-
if let Err(error) = subscribe(op_manager, *key, None).await {
1023-
tracing::error!(%error, "Failed to subscribe to contract");
1024-
}
1026+
NetMessageV1::Unsubscribed {
1027+
ref key, ref from, ..
1028+
} => {
1029+
tracing::debug!(
1030+
"Received Unsubscribed message for contract {} from peer {}",
1031+
key,
1032+
from
1033+
);
1034+
op_manager.ring.remove_subscriber(key, from);
10251035
break;
10261036
}
10271037
_ => break, // Exit the loop if no applicable message type is found

crates/core/src/ring/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,12 @@ impl Ring {
302302
self.seeding_manager.add_subscriber(contract, subscriber)
303303
}
304304

305+
/// Remove a subscriber by peer ID from a specific contract
306+
pub fn remove_subscriber(&self, contract: &ContractKey, peer: &PeerId) {
307+
self.seeding_manager
308+
.remove_subscriber_by_peer(contract, peer)
309+
}
310+
305311
pub fn subscribers_of(
306312
&self,
307313
contract: &ContractKey,

crates/core/src/ring/seeding.rs

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,20 @@ impl SeedingManager {
140140
});
141141
}
142142

143+
/// Remove a subscriber by peer ID from a specific contract
144+
pub fn remove_subscriber_by_peer(&self, contract: &ContractKey, peer: &crate::node::PeerId) {
145+
if let Some(mut subs) = self.subscribers.get_mut(contract) {
146+
if let Some(pos) = subs.iter().position(|l| &l.peer == peer) {
147+
subs.swap_remove(pos);
148+
tracing::debug!(
149+
"Removed peer {} from subscriber list for contract {}",
150+
peer,
151+
contract
152+
);
153+
}
154+
}
155+
}
156+
143157
/// Get all subscriptions across all contracts
144158
pub fn all_subscriptions(&self) -> Vec<(ContractKey, Vec<PeerKeyLocation>)> {
145159
self.subscribers
@@ -148,3 +162,102 @@ impl SeedingManager {
148162
.collect()
149163
}
150164
}
165+
166+
#[cfg(test)]
167+
mod tests {
168+
use super::*;
169+
use crate::node::PeerId;
170+
use crate::transport::TransportKeypair;
171+
use freenet_stdlib::prelude::{ContractInstanceId, ContractKey};
172+
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
173+
174+
// Helper to create test PeerIds without expensive key generation
175+
fn test_peer_id(id: u8) -> PeerId {
176+
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, id)), 1000 + id as u16);
177+
let pub_key = TransportKeypair::new().public().clone();
178+
PeerId::new(addr, pub_key)
179+
}
180+
181+
#[test]
182+
fn test_remove_subscriber_by_peer() {
183+
let seeding_manager = SeedingManager::new();
184+
let contract_key = ContractKey::from(ContractInstanceId::new([1u8; 32]));
185+
186+
// Create test peers
187+
let peer1 = test_peer_id(1);
188+
let peer2 = test_peer_id(2);
189+
let peer3 = test_peer_id(3);
190+
191+
let peer_loc1 = PeerKeyLocation {
192+
peer: peer1.clone(),
193+
location: Some(Location::try_from(0.1).unwrap()),
194+
};
195+
let peer_loc2 = PeerKeyLocation {
196+
peer: peer2.clone(),
197+
location: Some(Location::try_from(0.2).unwrap()),
198+
};
199+
let peer_loc3 = PeerKeyLocation {
200+
peer: peer3.clone(),
201+
location: Some(Location::try_from(0.3).unwrap()),
202+
};
203+
204+
// Add subscribers
205+
assert!(seeding_manager
206+
.add_subscriber(&contract_key, peer_loc1.clone())
207+
.is_ok());
208+
assert!(seeding_manager
209+
.add_subscriber(&contract_key, peer_loc2.clone())
210+
.is_ok());
211+
assert!(seeding_manager
212+
.add_subscriber(&contract_key, peer_loc3.clone())
213+
.is_ok());
214+
215+
// Verify all subscribers are present
216+
{
217+
let subs = seeding_manager.subscribers_of(&contract_key).unwrap();
218+
assert_eq!(subs.len(), 3);
219+
}
220+
221+
// Remove peer2
222+
seeding_manager.remove_subscriber_by_peer(&contract_key, &peer2);
223+
224+
// Verify peer2 was removed
225+
{
226+
let subs = seeding_manager.subscribers_of(&contract_key).unwrap();
227+
assert_eq!(subs.len(), 2);
228+
assert!(!subs.iter().any(|p| p.peer == peer2));
229+
assert!(subs.iter().any(|p| p.peer == peer1));
230+
assert!(subs.iter().any(|p| p.peer == peer3));
231+
}
232+
233+
// Remove peer1
234+
seeding_manager.remove_subscriber_by_peer(&contract_key, &peer1);
235+
236+
// Verify peer1 was removed
237+
{
238+
let subs = seeding_manager.subscribers_of(&contract_key).unwrap();
239+
assert_eq!(subs.len(), 1);
240+
assert!(!subs.iter().any(|p| p.peer == peer1));
241+
assert!(subs.iter().any(|p| p.peer == peer3));
242+
}
243+
244+
// Remove non-existent peer (should not error)
245+
seeding_manager.remove_subscriber_by_peer(&contract_key, &peer2);
246+
247+
// Verify count unchanged
248+
{
249+
let subs = seeding_manager.subscribers_of(&contract_key).unwrap();
250+
assert_eq!(subs.len(), 1);
251+
}
252+
}
253+
254+
#[test]
255+
fn test_remove_subscriber_from_nonexistent_contract() {
256+
let seeding_manager = SeedingManager::new();
257+
let contract_key = ContractKey::from(ContractInstanceId::new([2u8; 32]));
258+
let peer = test_peer_id(1);
259+
260+
// Should not panic when removing from non-existent contract
261+
seeding_manager.remove_subscriber_by_peer(&contract_key, &peer);
262+
}
263+
}

0 commit comments

Comments
 (0)