From 66110e0daf6bac29cf72f2b8f55ae8e20792a811 Mon Sep 17 00:00:00 2001
From: Ian Clarke
Date: Fri, 7 Nov 2025 16:36:04 +0100
Subject: [PATCH 01/10] fix: add update fallback propagation

---
 crates/core/src/contract/executor/runtime.rs |   5 +
 crates/core/src/operations/update.rs         | 213 ++++++++++++++++--
 crates/core/src/wasm_runtime/store.rs        |  19 +-
 .../wasm_runtime/tests/contract_metering.rs  |   9 +-
 crates/core/src/wasm_runtime/tests/mod.rs    |   5 +-
 5 files changed, 221 insertions(+), 30 deletions(-)

diff --git a/crates/core/src/contract/executor/runtime.rs b/crates/core/src/contract/executor/runtime.rs
index dc9cdabc1..ff152c69e 100644
--- a/crates/core/src/contract/executor/runtime.rs
+++ b/crates/core/src/contract/executor/runtime.rs
@@ -871,6 +871,11 @@ impl Executor {
             .await
             .map_err(ExecutorError::other)?;

+        tracing::info!(
+            "Contract state updated for {key}, new_size_bytes={}",
+            new_state.as_ref().len()
+        );
+
         if let Err(err) = self
             .send_update_notification(key, parameters, &new_state)
             .await
diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs
index 4b21ccc72..b6ba487a4 100644
--- a/crates/core/src/operations/update.rs
+++ b/crates/core/src/operations/update.rs
@@ -4,7 +4,7 @@ use freenet_stdlib::client_api::{ErrorKind, HostResponse};
 use freenet_stdlib::prelude::*;

 pub(crate) use self::messages::UpdateMsg;
-use super::{OpEnum, OpError, OpInitialization, OpOutcome, Operation, OperationResult};
+use super::{get, OpEnum, OpError, OpInitialization, OpOutcome, Operation, OperationResult};
 use crate::contract::{ContractHandlerEvent, StoreResponse};
 use crate::message::{InnerMessage, NetMessage, NodeEvent, Transaction};
 use crate::node::IsOperationCompleted;
@@ -13,6 +13,7 @@ use crate::{
     client_events::HostResult,
     node::{NetworkBridge, OpManager, PeerId},
 };
+use std::collections::HashSet;

 pub(crate) struct UpdateOp {
     pub id: Transaction,
@@ -248,9 +249,14 @@ impl Operation for UpdateOp {
                             return_msg = None;
                         } else {
                             // Get broadcast targets for propagating UPDATE to subscribers
-                            let broadcast_to = op_manager
+                            let mut broadcast_to = op_manager
                                 .get_broadcast_targets_update(key, &request_sender.peer);

+                            if broadcast_to.is_empty() {
+                                broadcast_to = op_manager
+                                    .compute_update_fallback_targets(key, &request_sender.peer);
+                            }
+
                             if broadcast_to.is_empty() {
                                 tracing::debug!(
                                     tx = %id,
@@ -292,10 +298,21 @@ impl Operation for UpdateOp {
                             }
                         } else {
                             // Contract not found locally - forward to another peer
-                            let next_target = op_manager.ring.closest_potentially_caching(
-                                key,
-                                [&self_location.peer, &request_sender.peer].as_slice(),
-                            );
+                            let skip_peers = [&self_location.peer, &request_sender.peer];
+                            let next_target = op_manager
+                                .ring
+                                .closest_potentially_caching(key, skip_peers.as_slice())
+                                .or_else(|| {
+                                    op_manager
+                                        .ring
+                                        .k_closest_potentially_caching(
+                                            key,
+                                            skip_peers.as_slice(),
+                                            5,
+                                        )
+                                        .into_iter()
+                                        .next()
+                                });

                             if let Some(forward_target) = next_target {
                                 tracing::debug!(
@@ -316,10 +333,33 @@ impl Operation for UpdateOp {
                                 });
                                 new_state = None;
                             } else {
+                                let skip_list = [&self_location.peer, &request_sender.peer];
+                                let subscribers = op_manager
+                                    .ring
+                                    .subscribers_of(key)
+                                    .map(|subs| {
+                                        subs.value()
+                                            .iter()
+                                            .map(|loc| format!("{:.8}", loc.peer))
+                                            .collect::<Vec<_>>()
+                                    })
+                                    .unwrap_or_default();
+                                let candidates = op_manager
+                                    .ring
+                                    .k_closest_potentially_caching(key, skip_list.as_slice(), 5)
+                                    .into_iter()
+                                    .map(|loc| format!("{:.8}", loc.peer))
+                                    .collect::<Vec<_>>();
+                                let connection_count =
op_manager.ring.connection_manager.num_connections(); // No peers available and we don't have the contract - error tracing::error!( tx = %id, %key, + subscribers = ?subscribers, + candidates = ?candidates, + connection_count, + request_sender = %request_sender.peer, "Cannot handle UPDATE: contract not found locally and no peers to forward to" ); return Err(OpError::RingError(RingError::NoCachingPeers(*key))); @@ -386,10 +426,14 @@ impl Operation for UpdateOp { return_msg = None; } else { // Get broadcast targets - let broadcast_to = + let mut broadcast_to = op_manager.get_broadcast_targets_update(key, &sender.peer); - // If no peers to broadcast to, nothing else to do + if broadcast_to.is_empty() { + broadcast_to = + op_manager.compute_update_fallback_targets(key, &sender.peer); + } + if broadcast_to.is_empty() { tracing::debug!( tx = %id, @@ -447,13 +491,100 @@ impl Operation for UpdateOp { }); new_state = None; } else { - // No more peers to try - error - tracing::error!( + tracing::warn!( tx = %id, %key, - "Cannot handle UPDATE SeekNode: contract not found and no peers to forward to" + "No forwarding targets for UPDATE SeekNode - attempting local fetch" ); - return Err(OpError::RingError(RingError::NoCachingPeers(*key))); + + let mut fetch_skip = HashSet::new(); + fetch_skip.insert(sender.peer.clone()); + + let get_op = get::start_op(*key, true, false); + if let Err(fetch_err) = + get::request_get(op_manager, get_op, fetch_skip).await + { + tracing::warn!( + tx = %id, + %key, + error = %fetch_err, + "Failed to fetch contract while handling UPDATE SeekNode" + ); + return Err(OpError::RingError(RingError::NoCachingPeers(*key))); + } + + if super::has_contract(op_manager, *key).await? { + tracing::info!( + tx = %id, + %key, + "Successfully fetched contract locally, applying UPDATE" + ); + let UpdateExecution { + value: updated_value, + summary: _summary, + changed, + } = update_contract( + op_manager, + *key, + value.clone(), + related_contracts.clone(), + ) + .await?; + + if !changed { + tracing::debug!( + tx = %id, + %key, + "Fetched contract apply produced no change during SeekNode fallback" + ); + new_state = None; + return_msg = None; + } else { + let mut broadcast_to = + op_manager.get_broadcast_targets_update(key, &sender.peer); + + if broadcast_to.is_empty() { + broadcast_to = op_manager + .compute_update_fallback_targets(key, &sender.peer); + } + + if broadcast_to.is_empty() { + tracing::debug!( + tx = %id, + %key, + "No broadcast targets after SeekNode fallback apply; finishing locally" + ); + new_state = None; + return_msg = None; + } else { + match try_to_broadcast( + *id, + true, + op_manager, + self.state, + (broadcast_to, sender.clone()), + *key, + updated_value.clone(), + false, + ) + .await + { + Ok((state, msg)) => { + new_state = state; + return_msg = msg; + } + Err(err) => return Err(err), + } + } + } + } else { + tracing::error!( + tx = %id, + %key, + "Contract still unavailable after fetch attempt during UPDATE SeekNode" + ); + return Err(OpError::RingError(RingError::NoCachingPeers(*key))); + } } } } @@ -487,9 +618,14 @@ impl Operation for UpdateOp { new_state = None; return_msg = None; } else { - let broadcast_to = + let mut broadcast_to = op_manager.get_broadcast_targets_update(key, &sender.peer); + if broadcast_to.is_empty() { + broadcast_to = + op_manager.compute_update_fallback_targets(key, &sender.peer); + } + tracing::debug!( "Successfully updated a value for contract {} @ {:?} - BroadcastTo - update", key, @@ -649,9 +785,11 @@ impl OpManager { .ring 
             .subscribers_of(key)
             .map(|subs| {
+                let self_peer = self.ring.connection_manager.get_peer_key();
                 subs.value()
                     .iter()
                     .filter(|pk| &pk.peer != sender)
+                    .filter(|pk| self_peer.as_ref().map(|me| &pk.peer != me).unwrap_or(true))
                     .cloned()
                     .collect::<Vec<_>>()
             })
@@ -671,15 +809,60 @@ impl OpManager {
                 subscribers.len()
             );
         } else {
+            let own_peer = self.ring.connection_manager.get_peer_key();
+            let skip_slice = std::slice::from_ref(sender);
+            let fallback_candidates = self
+                .ring
+                .k_closest_potentially_caching(key, skip_slice, 5)
+                .into_iter()
+                .map(|candidate| format!("{:.8}", candidate.peer))
+                .collect::<Vec<_>>();
+
             tracing::warn!(
-                "UPDATE_PROPAGATION: contract={:.8} from={} NO_TARGETS - update will not propagate",
+                "UPDATE_PROPAGATION: contract={:.8} from={} NO_TARGETS - update will not propagate (self={:?}, fallback_candidates={:?})",
                 key,
-                sender
+                sender,
+                own_peer.map(|p| format!("{:.8}", p)),
+                fallback_candidates
             );
         }

         subscribers
     }
+
+    fn compute_update_fallback_targets(
+        &self,
+        key: &ContractKey,
+        sender: &PeerId,
+    ) -> Vec<PeerKeyLocation> {
+        let mut skip: HashSet<PeerId> = HashSet::new();
+        skip.insert(sender.clone());
+        if let Some(self_peer) = self.ring.connection_manager.get_peer_key() {
+            skip.insert(self_peer);
+        }
+
+        let candidates = self
+            .ring
+            .k_closest_potentially_caching(key, &skip, 3)
+            .into_iter()
+            .filter(|candidate| &candidate.peer != sender)
+            .collect::<Vec<_>>();
+
+        if !candidates.is_empty() {
+            tracing::info!(
+                "UPDATE_PROPAGATION: contract={:.8} from={} using fallback targets={}",
+                key,
+                sender,
+                candidates
+                    .iter()
+                    .map(|c| format!("{:.8}", c.peer))
+                    .collect::<Vec<_>>()
+                    .join(",")
+            );
+        }
+
+        candidates
+    }
 }

 fn build_op_result(
diff --git a/crates/core/src/wasm_runtime/store.rs b/crates/core/src/wasm_runtime/store.rs
index 15c701cbe..07044f377 100644
--- a/crates/core/src/wasm_runtime/store.rs
+++ b/crates/core/src/wasm_runtime/store.rs
@@ -7,6 +7,7 @@ use std::io::{self, BufReader, BufWriter, Seek, Write};
 use std::path::{Path, PathBuf};
 use std::time::Duration;
 use std::{fs::File, io::Read};
+use tracing::error;

 const INTERNAL_KEY: usize = 32;
 const TOMBSTONE_MARKER: usize = 1;
@@ -325,7 +326,7 @@ fn compact_index_file<S: StoreFsManagement>(key_file_path: &Path) -> std::io::Re
     let mut original_reader = BufReader::new(original_file);
     let mut temp_writer = SafeWriter::<S>::new(&temp_file_path, true).inspect_err(|_| {
         if let Err(e) = fs::remove_file(&lock_file_path) {
-            eprintln!("{}:{}: Failed to remove lock file: {e}", file!(), line!());
+            error!("{}:{}: Failed to remove lock file: {e}", file!(), line!());
         }
     })?;

@@ -340,7 +341,7 @@ fn compact_index_file<S: StoreFsManagement>(key_file_path: &Path) -> std::io::Re
         };
         if let Err(err) = temp_writer.insert_record(store_key, value) {
             if let Err(e) = fs::remove_file(&lock_file_path) {
-                eprintln!("{}:{}: Failed to remove lock file: {e}", file!(), line!());
+                error!("{}:{}: Failed to remove lock file: {e}", file!(), line!());
             }
             return Err(err);
         }
@@ -356,7 +357,7 @@ fn compact_index_file<S: StoreFsManagement>(key_file_path: &Path) -> std::io::Re
             Err(other) => {
                 // Handle other errors gracefully
                 if let Err(e) = fs::remove_file(&lock_file_path) {
-                    eprintln!("{}:{}: Failed to remove lock file: {e}", file!(), line!());
+                    error!("{}:{}: Failed to remove lock file: {e}", file!(), line!());
                 }
                 return Err(other);
             }
@@ -366,7 +367,7 @@ fn compact_index_file<S: StoreFsManagement>(key_file_path: &Path) -> std::io::Re
     // Check if any deleted records were found; if not, skip compaction
     if !any_deleted {
         if let Err(e) = fs::remove_file(&lock_file_path) {
-            eprintln!("{}:{}: Failed to remove lock file: {e}", file!(), line!());
+            error!("{}:{}: Failed to remove lock file: {e}", file!(), line!());
         }
         return Ok(());
     }
@@ -374,7 +375,7 @@ fn compact_index_file<S: StoreFsManagement>(key_file_path: &Path) -> std::io::Re
     // Clean up and finalize the compaction process
     if let Err(e) = temp_writer.flush() {
         if let Err(e) = fs::remove_file(&lock_file_path) {
-            eprintln!("{}:{}: Failed to remove lock file: {e}", file!(), line!());
+            error!("{}:{}: Failed to remove lock file: {e}", file!(), line!());
         }
         return Err(e);
     }
@@ -382,14 +383,14 @@ fn compact_index_file<S: StoreFsManagement>(key_file_path: &Path) -> std::io::Re
     // Replace the original file with the temporary file
     if let Err(e) = fs::rename(&temp_file_path, key_file_path) {
         if let Err(e) = fs::remove_file(&lock_file_path) {
-            eprintln!("{}:{}: Failed to remove lock file: {e}", file!(), line!());
+            error!("{}:{}: Failed to remove lock file: {e}", file!(), line!());
         }
         return Err(e);
     }

     // Remove the lock file
     fs::remove_file(&lock_file_path).map_err(|e| {
-        eprintln!("{}:{}: Failed to remove lock file: {e}", file!(), line!());
+        error!("{}:{}: Failed to remove lock file: {e}", file!(), line!());
         e
     })?;

@@ -589,13 +590,13 @@ mod tests {
                     create_test_data(&mut file, &key_file_path, shared_data, i);
                 } else if let Err(err) = super::compact_index_file::(&key_file_path)
                 {
-                    eprintln!("Thread encountered an error during compaction: {err}");
+                    error!("Thread encountered an error during compaction: {err}");
                     return Err(err);
                 }
                 barrier.wait();
                 // compact a last time so we know what data to compare against
                 super::compact_index_file::(&key_file_path).map_err(|err| {
-                    eprintln!("Thread encountered an error during compaction: {err}");
+                    error!("Thread encountered an error during compaction: {err}");
                     err
                 })
             })
diff --git a/crates/core/src/wasm_runtime/tests/contract_metering.rs b/crates/core/src/wasm_runtime/tests/contract_metering.rs
index 88c63a857..c4d849ebe 100644
--- a/crates/core/src/wasm_runtime/tests/contract_metering.rs
+++ b/crates/core/src/wasm_runtime/tests/contract_metering.rs
@@ -5,6 +5,7 @@ use crate::wasm_runtime::tests::TestSetup;
 use crate::wasm_runtime::{ContractExecError, RuntimeInnerError};
 use freenet_stdlib::prelude::*;
 use std::time::Instant;
+use tracing::info;

 const TEST_CONTRACT_METERING: &str = "test_contract_metering";

@@ -52,7 +53,7 @@ fn validate_state_metering() -> Result<(), Box<dyn std::error::Error>> {
     );

     let duration = time.elapsed().as_secs_f64();
-    println!("Duration: {duration:.2}s");
+    info!("Duration: {duration:.2}s");

     assert!(duration < 5.0, "Should not timeout");
     assert!(
@@ -103,7 +104,7 @@ fn test_update_state_metering() -> Result<(), Box<dyn std::error::Error>> {
     );

     let duration = time.elapsed().as_secs_f64();
-    println!("Duration: {duration:.2}s");
+    info!("Duration: {duration:.2}s");

     assert!(duration < 5.0, "Should not timeout");
     assert!(
@@ -150,7 +151,7 @@ fn test_summarize_state_metering() -> Result<(), Box<dyn std::error::Error>> {
     let result = runtime.summarize_state(&contract_key, &Parameters::from([].as_ref()), &state);

     let duration = time.elapsed().as_secs_f64();
-    println!("Duration: {duration:.2}s");
+    info!("Duration: {duration:.2}s");

     assert!(duration < 5.0, "Should not timeout");
     assert!(
@@ -202,7 +203,7 @@ fn test_get_state_delta_metering() -> Result<(), Box<dyn std::error::Error>> {
     );

     let duration = time.elapsed().as_secs_f64();
-    println!("Duration: {duration:.2}s");
+    info!("Duration: {duration:.2}s");

     assert!(duration < 5.0, "Should not timeout");
     assert!(
diff --git a/crates/core/src/wasm_runtime/tests/mod.rs b/crates/core/src/wasm_runtime/tests/mod.rs
index 955c4062e..110c49a0c 100644
--- a/crates/core/src/wasm_runtime/tests/mod.rs
+++ b/crates/core/src/wasm_runtime/tests/mod.rs
@@ -6,6 +6,7 @@ use freenet_stdlib::prelude::{

 use crate::util::tests::get_temp_dir;
 use crate::util::workspace::get_workspace_target_dir;
+use tracing::info;

 use super::{ContractStore, DelegateStore, SecretsStore};

@@ -22,7 +23,7 @@ pub(crate) fn get_test_module(name: &str) -> Result<Vec<u8>, Box<dyn std::error
 Result<Vec<u8>, Box<dyn std::error

From: Ian Clarke
Date: Fri, 7 Nov 2025 22:11:03 +0100
Subject: [PATCH 02/10] fix(update): keep local subscribers and add PUT ack
 tracing

---
 crates/core/src/operations/put.rs    | 16 ++++++++++++++++
 crates/core/src/operations/update.rs | 23 +++++++++++++++++++++--
 2 files changed, 37 insertions(+), 2 deletions(-)

diff --git a/crates/core/src/operations/put.rs b/crates/core/src/operations/put.rs
index 2996bab9a..07dc0accc 100644
--- a/crates/core/src/operations/put.rs
+++ b/crates/core/src/operations/put.rs
@@ -570,6 +570,11 @@ impl Operation for PutOp {
                 return_msg = None;
             }
             PutMsg::SuccessfulPut { id, .. } => {
+                tracing::debug!(
+                    tx = %id,
+                    current_state = ?self.state,
+                    "PutOp::process_message: handling SuccessfulPut"
+                );
                 match self.state {
                     Some(PutState::AwaitingResponse {
                         key,
@@ -654,6 +659,12 @@ impl Operation for PutOp {

                         // Forward success message upstream if needed
                         if let Some(upstream) = upstream {
+                            tracing::trace!(
+                                tx = %id,
+                                %key,
+                                upstream = %upstream.peer,
+                                "PutOp::process_message: Forwarding SuccessfulPut upstream"
+                            );
                             return_msg = Some(PutMsg::SuccessfulPut {
                                 id: *id,
                                 target: upstream,
@@ -661,6 +672,11 @@ impl Operation for PutOp {
                                 sender: op_manager.ring.connection_manager.own_location(),
                             });
                         } else {
+                            tracing::trace!(
+                                tx = %id,
+                                %key,
+                                "PutOp::process_message: SuccessfulPut originated locally; no upstream"
+                            );
                             return_msg = None;
                         }
diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs
index b6ba487a4..7e1f31921 100644
--- a/crates/core/src/operations/update.rs
+++ b/crates/core/src/operations/update.rs
@@ -786,10 +786,29 @@ impl OpManager {
             .subscribers_of(key)
             .map(|subs| {
                 let self_peer = self.ring.connection_manager.get_peer_key();
+                let allow_self = self_peer.as_ref().map(|me| me == sender).unwrap_or(false);
                 subs.value()
                     .iter()
-                    .filter(|pk| &pk.peer != sender)
-                    .filter(|pk| self_peer.as_ref().map(|me| &pk.peer != me).unwrap_or(true))
+                    .filter(|pk| {
+                        // Allow the sender to remain in the broadcast list when we're the sender,
+                        // so local auto-subscribe via GET/PUT still receives notifications.
+                        if &pk.peer == sender {
+                            allow_self
+                        } else {
+                            true
+                        }
+                    })
+                    .filter(|pk| {
+                        if let Some(self_peer) = &self_peer {
+                            if &pk.peer == self_peer {
+                                allow_self
+                            } else {
+                                true
+                            }
+                        } else {
+                            true
+                        }
+                    })
                     .cloned()
                     .collect::<Vec<_>>()
             })

From 4a6938745728e1c3e8d297648f14dd62c7cfd0b0 Mon Sep 17 00:00:00 2001
From: Ian Clarke
Date: Sun, 9 Nov 2025 10:54:48 -0600
Subject: [PATCH 03/10] fix(update): remove GET fallback propagation

---
 crates/core/src/operations/update.rs | 217 ++++++---------------------
 1 file changed, 42 insertions(+), 175 deletions(-)

diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs
index 7e1f31921..d5e45319d 100644
--- a/crates/core/src/operations/update.rs
+++ b/crates/core/src/operations/update.rs
@@ -4,7 +4,7 @@ use freenet_stdlib::client_api::{ErrorKind, HostResponse};
 use freenet_stdlib::prelude::*;

 pub(crate) use self::messages::UpdateMsg;
-use super::{get, OpEnum, OpError, OpInitialization, OpOutcome, Operation, OperationResult};
+use super::{OpEnum, OpError, OpInitialization, OpOutcome, Operation, OperationResult};
 use crate::contract::{ContractHandlerEvent, StoreResponse};
 use crate::message::{InnerMessage, NetMessage, NodeEvent, Transaction};
 use crate::node::IsOperationCompleted;
@@ -13,7 +13,6 @@ use crate::{
     client_events::HostResult,
     node::{NetworkBridge, OpManager, PeerId},
 };
-use std::collections::HashSet;

 pub(crate) struct UpdateOp {
     pub id: Transaction,
@@ -249,14 +248,9 @@ impl Operation for UpdateOp {
                             return_msg = None;
                         } else {
                             // Get broadcast targets for propagating UPDATE to subscribers
-                            let mut broadcast_to = op_manager
+                            let broadcast_to = op_manager
                                 .get_broadcast_targets_update(key, &request_sender.peer);

-                            if broadcast_to.is_empty() {
-                                broadcast_to = op_manager
-                                    .compute_update_fallback_targets(key, &request_sender.peer);
-                            }
-
                             if broadcast_to.is_empty() {
                                 tracing::debug!(
                                     tx = %id,
@@ -298,21 +292,10 @@ impl Operation for UpdateOp {
                             }
                         } else {
                             // Contract not found locally - forward to another peer
-                            let skip_peers = [&self_location.peer, &request_sender.peer];
-                            let next_target = op_manager
-                                .ring
-                                .closest_potentially_caching(key, skip_peers.as_slice())
-                                .or_else(|| {
-                                    op_manager
-                                        .ring
-                                        .k_closest_potentially_caching(
-                                            key,
-                                            skip_peers.as_slice(),
-                                            5,
-                                        )
-                                        .into_iter()
-                                        .next()
-                                });
+                            let next_target = op_manager.ring.closest_potentially_caching(
+                                key,
+                                [&self_location.peer, &request_sender.peer].as_slice(),
+                            );

                             if let Some(forward_target) = next_target {
                                 tracing::debug!(
@@ -333,6 +316,7 @@ impl Operation for UpdateOp {
                                 });
                                 new_state = None;
                             } else {
+                                // No peers available and we don't have the contract - capture context
                                 let skip_list = [&self_location.peer, &request_sender.peer];
                                 let subscribers = op_manager
                                     .ring
                                     .subscribers_of(key)
                                     .map(|subs| {
                                         subs.value()
                                             .iter()
                                             .map(|loc| format!("{:.8}", loc.peer))
                                             .collect::<Vec<_>>()
                                     })
                                     .unwrap_or_default();
                                 let candidates = op_manager
                                     .ring
                                     .k_closest_potentially_caching(key, skip_list.as_slice(), 5)
                                     .into_iter()
                                     .map(|loc| format!("{:.8}", loc.peer))
                                     .collect::<Vec<_>>();
                                 let connection_count =
                                     op_manager.ring.connection_manager.num_connections();
-                                // No peers available and we don't have the contract - error
                                 tracing::error!(
                                     tx = %id,
                                     %key,
@@ -426,14 +409,10 @@ impl Operation for UpdateOp {
                             return_msg = None;
                         } else {
                             // Get broadcast targets
-                            let mut broadcast_to =
+                            let broadcast_to =
                                 op_manager.get_broadcast_targets_update(key, &sender.peer);

-                            if broadcast_to.is_empty() {
-                                broadcast_to =
-                                    op_manager.compute_update_fallback_targets(key, &sender.peer);
-                            }
-
+                            // If no peers to broadcast to, nothing else to do
                             if broadcast_to.is_empty() {
                                 tracing::debug!(
                                     tx = %id,
@@ -491,100 +470,36 @@ impl Operation for UpdateOp {
                                 });
                                 new_state = None;
                             } else {
-                                tracing::warn!(
+                                // No more peers to try - capture context for diagnostics
+                                let skip_list = [&sender.peer, &self_location.peer];
+                                let subscribers = op_manager
+                                    .ring
+                                    .subscribers_of(key)
+                                    .map(|subs| {
+                                        subs.value()
+                                            .iter()
+                                            .map(|loc| format!("{:.8}", loc.peer))
+                                            .collect::<Vec<_>>()
+                                    })
+                                    .unwrap_or_default();
+                                let candidates = op_manager
+                                    .ring
+                                    .k_closest_potentially_caching(key, skip_list.as_slice(), 5)
+                                    .into_iter()
+                                    .map(|loc| format!("{:.8}", loc.peer))
+                                    .collect::<Vec<_>>();
+                                let connection_count =
+                                    op_manager.ring.connection_manager.num_connections();
+                                tracing::error!(
                                     tx = %id,
                                     %key,
-                                    "No forwarding targets for UPDATE SeekNode - attempting local fetch"
+                                    subscribers = ?subscribers,
+                                    candidates = ?candidates,
+                                    connection_count,
+                                    sender = %sender.peer,
+                                    "Cannot handle UPDATE SeekNode: contract not found and no peers to forward to"
                                 );
-
-                                let mut fetch_skip = HashSet::new();
-                                fetch_skip.insert(sender.peer.clone());
-
-                                let get_op = get::start_op(*key, true, false);
-                                if let Err(fetch_err) =
-                                    get::request_get(op_manager, get_op, fetch_skip).await
-                                {
-                                    tracing::warn!(
-                                        tx = %id,
-                                        %key,
-                                        error = %fetch_err,
-                                        "Failed to fetch contract while handling UPDATE SeekNode"
-                                    );
-                                    return Err(OpError::RingError(RingError::NoCachingPeers(*key)));
-                                }
-
-                                if super::has_contract(op_manager, *key).await? {
-                                    tracing::info!(
-                                        tx = %id,
-                                        %key,
-                                        "Successfully fetched contract locally, applying UPDATE"
-                                    );
-                                    let UpdateExecution {
-                                        value: updated_value,
-                                        summary: _summary,
-                                        changed,
-                                    } = update_contract(
-                                        op_manager,
-                                        *key,
-                                        value.clone(),
-                                        related_contracts.clone(),
-                                    )
-                                    .await?;
-
-                                    if !changed {
-                                        tracing::debug!(
-                                            tx = %id,
-                                            %key,
-                                            "Fetched contract apply produced no change during SeekNode fallback"
-                                        );
-                                        new_state = None;
-                                        return_msg = None;
-                                    } else {
-                                        let mut broadcast_to =
-                                            op_manager.get_broadcast_targets_update(key, &sender.peer);
-
-                                        if broadcast_to.is_empty() {
-                                            broadcast_to = op_manager
-                                                .compute_update_fallback_targets(key, &sender.peer);
-                                        }
-
-                                        if broadcast_to.is_empty() {
-                                            tracing::debug!(
-                                                tx = %id,
-                                                %key,
-                                                "No broadcast targets after SeekNode fallback apply; finishing locally"
-                                            );
-                                            new_state = None;
-                                            return_msg = None;
-                                        } else {
-                                            match try_to_broadcast(
-                                                *id,
-                                                true,
-                                                op_manager,
-                                                self.state,
-                                                (broadcast_to, sender.clone()),
-                                                *key,
-                                                updated_value.clone(),
-                                                false,
-                                            )
-                                            .await
-                                            {
-                                                Ok((state, msg)) => {
-                                                    new_state = state;
-                                                    return_msg = msg;
-                                                }
-                                                Err(err) => return Err(err),
-                                            }
-                                        }
-                                    }
-                                } else {
-                                    tracing::error!(
-                                        tx = %id,
-                                        %key,
-                                        "Contract still unavailable after fetch attempt during UPDATE SeekNode"
-                                    );
-                                    return Err(OpError::RingError(RingError::NoCachingPeers(*key)));
-                                }
+                                return Err(OpError::RingError(RingError::NoCachingPeers(*key)));
                             }
                         }
                     }
@@ -618,14 +533,9 @@ impl Operation for UpdateOp {
                             new_state = None;
                             return_msg = None;
                         } else {
-                            let mut broadcast_to =
+                            let broadcast_to =
                                 op_manager.get_broadcast_targets_update(key, &sender.peer);

-                            if broadcast_to.is_empty() {
-                                broadcast_to =
-                                    op_manager.compute_update_fallback_targets(key, &sender.peer);
-                            }
-
                             tracing::debug!(
                                 "Successfully updated a value for contract {} @ {:?} - BroadcastTo - update",
                                 key,
@@ -789,25 +700,16 @@ impl OpManager {
                 subs.value()
                     .iter()
                     .filter(|pk| {
-                        // Allow the sender to remain in the broadcast list when we're the sender,
-                        // so local auto-subscribe via GET/PUT still receives notifications.
-                        if &pk.peer == sender {
+                        // Allow the sender (or ourselves) to stay in the broadcast list when we're
+                        // originating the UPDATE so local auto-subscribes still receive events.
+                        let is_sender = &pk.peer == sender;
+                        let is_self = self_peer.as_ref().map_or(false, |me| &pk.peer == me);
+                        if is_sender || is_self {
                             allow_self
                         } else {
                             true
                         }
                     })
-                    .filter(|pk| {
-                        if let Some(self_peer) = &self_peer {
-                            if &pk.peer == self_peer {
-                                allow_self
-                            } else {
-                                true
-                            }
-                        } else {
-                            true
-                        }
-                    })
                     .cloned()
                     .collect::<Vec<_>>()
             })
@@ -847,40 +749,6 @@ impl OpManager {

         subscribers
     }
-
-    fn compute_update_fallback_targets(
-        &self,
-        key: &ContractKey,
-        sender: &PeerId,
-    ) -> Vec<PeerKeyLocation> {
-        let mut skip: HashSet<PeerId> = HashSet::new();
-        skip.insert(sender.clone());
-        if let Some(self_peer) = self.ring.connection_manager.get_peer_key() {
-            skip.insert(self_peer);
-        }
-
-        let candidates = self
-            .ring
-            .k_closest_potentially_caching(key, &skip, 3)
-            .into_iter()
-            .filter(|candidate| &candidate.peer != sender)
-            .collect::<Vec<_>>();
-
-        if !candidates.is_empty() {
-            tracing::info!(
-                "UPDATE_PROPAGATION: contract={:.8} from={} using fallback targets={}",
-                key,
-                sender,
-                candidates
-                    .iter()
-                    .map(|c| format!("{:.8}", c.peer))
-                    .collect::<Vec<_>>()
-                    .join(",")
-            );
-        }
-
-        candidates
-    }
 }

 fn build_op_result(

From e16169078f4969334db716f966f5686c80ec4385 Mon Sep 17 00:00:00 2001
From: Ian Clarke
Date: Sun, 9 Nov 2025 10:55:49 -0600
Subject: [PATCH 04/10] style(update): use direct equality in allow-self check

---
 crates/core/src/operations/update.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs
index d5e45319d..7743ce95b 100644
--- a/crates/core/src/operations/update.rs
+++ b/crates/core/src/operations/update.rs
@@ -703,7 +703,7 @@ impl OpManager {
                         // Allow the sender (or ourselves) to stay in the broadcast list when we're
                         // originating the UPDATE so local auto-subscribes still receive events.
                         let is_sender = &pk.peer == sender;
-                        let is_self = self_peer.as_ref().map_or(false, |me| &pk.peer == me);
+                        let is_self = self_peer.as_ref() == Some(&pk.peer);
                         if is_sender || is_self {
                             allow_self
                         } else {

From 88d04dbbcc1f8da2dbeb5a3b3df530badb4fd479 Mon Sep 17 00:00:00 2001
From: Ian Clarke
Date: Fri, 7 Nov 2025 17:17:31 +0100
Subject: [PATCH 05/10] feat: harden subscription routing

---
 apps/freenet-ping/Cargo.toml                 |   2 +-
 apps/freenet-ping/app/Cargo.toml             |   2 +-
 crates/core/src/operations/subscribe.rs      | 582 +++++++++++++++---
 crates/core/src/router/isotonic_estimator.rs |   9 +-
 crates/core/src/router/mod.rs                |  17 +-
 .../src/topology/request_density_tracker.rs  |   5 +-
 scripts/deploy-local-gateway.sh              |   6 +-
 7 files changed, 518 insertions(+), 105 deletions(-)

diff --git a/apps/freenet-ping/Cargo.toml b/apps/freenet-ping/Cargo.toml
index 0c834d5dc..2c0d2e32f 100644
--- a/apps/freenet-ping/Cargo.toml
+++ b/apps/freenet-ping/Cargo.toml
@@ -4,7 +4,7 @@ members = ["contracts/ping", "app", "types"]

 [workspace.dependencies]
 # freenet-stdlib = { path = "./../../stdlib/rust", features = ["contract"] }
-freenet-stdlib = { version = "0.1.24" }
+freenet-stdlib = { version = "0.1.14" }
 freenet-ping-types = { path = "types", default-features = false }
 chrono = { version = "0.4", default-features = false }
 testresult = "0.4"
diff --git a/apps/freenet-ping/app/Cargo.toml b/apps/freenet-ping/app/Cargo.toml
index dd0b05bf8..ef83d63ae 100644
--- a/apps/freenet-ping/app/Cargo.toml
+++ b/apps/freenet-ping/app/Cargo.toml
@@ -10,7 +10,7 @@ testing = ["freenet-stdlib/testing", "freenet/testing"]
 anyhow = "1.0"
 chrono = { workspace = true, features = ["default"] }
 clap = { version = "4.5", features = ["derive"] }
-freenet-stdlib = { version = "0.1.24", features = ["net"] }
+freenet-stdlib = { version = "0.1.22", features = ["net"] }
 freenet-ping-types = { path = "../types", features = ["std", "clap"] }
 futures = "0.3.31"
 rand = "0.9.2"
diff --git a/crates/core/src/operations/subscribe.rs b/crates/core/src/operations/subscribe.rs
index 9963fc8bf..c8fab8952 100644
--- a/crates/core/src/operations/subscribe.rs
+++ b/crates/core/src/operations/subscribe.rs
@@ -3,10 +3,11 @@ use std::future::Future;
 use std::pin::Pin;

 pub(crate) use self::messages::SubscribeMsg;
-use super::{OpEnum, OpError, OpInitialization, OpOutcome, Operation, OperationResult};
+use super::{get, OpEnum, OpError, OpInitialization, OpOutcome, Operation, OperationResult};
 use crate::node::IsOperationCompleted;
 use crate::{
     client_events::HostResult,
+    contract::{ContractHandlerEvent, StoreResponse},
     message::{InnerMessage, NetMessage, Transaction},
     node::{NetworkBridge, OpManager, PeerId},
     ring::{Location, PeerKeyLocation, RingError},
@@ -16,9 +17,79 @@ use freenet_stdlib::{
     prelude::*,
 };
 use serde::{Deserialize, Serialize};
+use tokio::time::{sleep, Duration};

 const MAX_RETRIES: usize = 10;
+const LOCAL_FETCH_TIMEOUT_MS: u64 = 1_500;
+const LOCAL_FETCH_POLL_INTERVAL_MS: u64 = 25;
+
+fn subscribers_snapshot(op_manager: &OpManager, key: &ContractKey) -> Vec<String> {
+    op_manager
+        .ring
+        .subscribers_of(key)
+        .map(|subs| {
+            subs.iter()
+                .map(|loc| format!("{:.8}", loc.peer))
+                .collect::<Vec<_>>()
+        })
+        .unwrap_or_default()
+}
+
+/// Poll local storage for a short period until the fetched contract becomes available.
+async fn wait_for_local_contract(
+    op_manager: &OpManager,
+    key: ContractKey,
+) -> Result<bool, OpError> {
+    let mut elapsed = 0;
+    while elapsed < LOCAL_FETCH_TIMEOUT_MS {
+        if super::has_contract(op_manager, key).await? {
+            return Ok(true);
+        }
+        sleep(Duration::from_millis(LOCAL_FETCH_POLL_INTERVAL_MS)).await;
+        elapsed += LOCAL_FETCH_POLL_INTERVAL_MS;
+    }
+    Ok(false)
+}
+
+async fn fetch_contract_if_missing(
+    op_manager: &OpManager,
+    key: ContractKey,
+) -> Result<(), OpError> {
+    if has_contract_with_code(op_manager, key).await? {
+        return Ok(());
+    }
+    let get_op = get::start_op(key, true, false);
+    get::request_get(op_manager, get_op, HashSet::new()).await?;
+
+    if wait_for_local_contract(op_manager, key).await?
+        && has_contract_with_code(op_manager, key).await?
+    {
+        Ok(())
+    } else {
+        Err(RingError::NoCachingPeers(key).into())
+    }
+}
+
+async fn has_contract_with_code(op_manager: &OpManager, key: ContractKey) -> Result<bool, OpError> {
+    match op_manager
+        .notify_contract_handler(ContractHandlerEvent::GetQuery {
+            key,
+            return_contract_code: true,
+        })
+        .await?
+    {
+        ContractHandlerEvent::GetResponse {
+            response:
+                Ok(StoreResponse {
+                    state: Some(_),
+                    contract: Some(_),
+                }),
+            ..
+        } => Ok(true),
+        _ => Ok(false),
+    }
+}

 #[derive(Debug)]
 enum SubscribeState {
     /// Prepare the request to subscribe.
@@ -72,57 +143,79 @@ pub(crate) async fn request_subscribe(
     sub_op: SubscribeOp,
 ) -> Result<(), OpError> {
     if let Some(SubscribeState::PrepareRequest { id, key }) = &sub_op.state {
+        let own_loc = op_manager.ring.connection_manager.own_location();
+        let local_has_contract = super::has_contract(op_manager, *key).await?;
+
+        tracing::debug!(
+            tx = %id,
+            %key,
+            subscriber_peer = %own_loc.peer,
+            local_has_contract,
+            "subscribe: request_subscribe invoked"
+        );
+
+        let mut skip_list: HashSet<PeerId> = HashSet::new();
+        skip_list.insert(own_loc.peer.clone());
+
         // Use k_closest_potentially_caching to try multiple candidates
-        const EMPTY: &[PeerId] = &[];
         // Try up to 3 candidates
-        let candidates = op_manager.ring.k_closest_potentially_caching(key, EMPTY, 3);
+        let candidates = op_manager
+            .ring
+            .k_closest_potentially_caching(key, &skip_list, 3);
+
+        if tracing::enabled!(tracing::Level::INFO) {
+            let skip_display: Vec<String> = skip_list
+                .iter()
+                .map(|peer| format!("{:.8}", peer))
+                .collect();
+            let candidate_display: Vec<String> = candidates
+                .iter()
+                .map(|cand| format!("{:.8}", cand.peer))
+                .collect();
+            tracing::info!(
+                tx = %id,
+                %key,
+                skip = ?skip_display,
+                candidates = ?candidate_display,
+                "subscribe: k_closest_potentially_caching results"
+            );
+        }

         let target = match candidates.first() {
             Some(peer) => peer.clone(),
             None => {
-                // No remote peers available - check if we have the contract locally
-                tracing::debug!(%key, "No remote peers available for subscription, checking locally");
-
-                if super::has_contract(op_manager, *key).await? {
-                    // We have the contract locally - register subscription and complete immediately
-                    tracing::info!(%key, tx = %id, "Contract available locally, registering local subscription");
-
-                    // CRITICAL FIX for issue #2001: Register subscriber in DashMap before completing
-                    // Without this, UPDATE operations won't find subscribers for locally-cached contracts
-                    let subscriber = op_manager.ring.connection_manager.own_location();
-                    if op_manager
+                // No remote peers available - rely on local contract if present.
+                tracing::debug!(
+                    %key,
+                    "No remote peers available for subscription, checking locally"
+                );
+
+                if local_has_contract {
+                    tracing::info!(
+                        %key,
+                        tx = %id,
+                        "No remote peers, fulfilling subscription locally"
+                    );
+                    return complete_local_subscription(op_manager, *id, *key).await;
+                } else {
+                    let connection_count = op_manager.ring.connection_manager.num_connections();
+                    let subscribers = op_manager
                         .ring
-                        .add_subscriber(key, subscriber.clone())
-                        .is_err()
-                    {
-                        tracing::error!(%key, tx = %id, "Failed to add local subscriber - max subscribers reached");
-                        // Continue anyway - client requested subscription and contract is local
-                    } else {
-                        tracing::debug!(%key, tx = %id, subscriber = %subscriber.peer, "Successfully registered local subscriber");
-                    }
-
-                    match op_manager
-                        .notify_node_event(crate::message::NodeEvent::LocalSubscribeComplete {
-                            tx: *id,
-                            key: *key,
-                            subscribed: true,
+                        .subscribers_of(key)
+                        .map(|subs| {
+                            subs.value()
+                                .iter()
+                                .map(|loc| format!("{:.8}", loc.peer))
+                                .collect::<Vec<_>>()
                         })
-                        .await
-                    {
-                        Ok(()) => {
-                            tracing::debug!(%key, tx = %id, "sent LocalSubscribeComplete event")
-                        }
-                        Err(e) => {
-                            tracing::error!(%key, tx = %id, error = %e, "failed to send LocalSubscribeComplete event")
-                        }
-                    }
-
-                    // Mark subscription as completed for atomicity tracking
-                    op_manager.completed(*id);
-
-                    return Ok(());
-                } else {
-                    tracing::debug!(%key, "Contract not available locally and no remote peers");
+                        .unwrap_or_default();
+                    tracing::warn!(
+                        %key,
+                        tx = %id,
+                        connection_count,
+                        subscribers = ?subscribers,
+                        "Contract not available locally and no remote peers"
+                    );
                     return Err(RingError::NoCachingPeers(*key).into());
                 }
             }
@@ -130,15 +223,23 @@ pub(crate) async fn request_subscribe(

         // Forward to remote peer
         let new_state = Some(SubscribeState::AwaitingResponse {
-            skip_list: vec![].into_iter().collect(),
+            skip_list,
             retries: 0,
             current_hop: op_manager.ring.max_hops_to_live,
             upstream_subscriber: None,
         });
+        tracing::debug!(
+            tx = %id,
+            %key,
+            target_peer = %target.peer,
+            target_location = ?target.location,
+            "subscribe: forwarding RequestSub to target peer"
+        );
         let msg = SubscribeMsg::RequestSub {
             id: *id,
             key: *key,
             target,
+            subscriber: own_loc.clone(),
         };
         let op = SubscribeOp {
             id: *id,
@@ -154,6 +255,38 @@ pub(crate) async fn request_subscribe(
     Ok(())
 }

+async fn complete_local_subscription(
+    op_manager: &OpManager,
+    id: Transaction,
+    key: ContractKey,
+) -> Result<(), OpError> {
+    let subscriber = op_manager.ring.connection_manager.own_location();
+    if let Err(err) = op_manager.ring.add_subscriber(&key, subscriber.clone()) {
+        tracing::warn!(
+            %key,
+            tx = %id,
+            subscriber = %subscriber.peer,
+            error = ?err,
+            "Failed to register local subscriber"
+        );
+    } else {
+        tracing::debug!(
+            %key,
+            tx = %id,
+            subscriber = %subscriber.peer,
+            "Registered local subscriber"
+        );
+    }
+
+    op_manager
+        .notify_node_event(crate::message::NodeEvent::LocalSubscribeComplete {
+            tx: id,
+            key,
+            subscribed: true,
+        })
+        .await
+}
+
 pub(crate) struct SubscribeOp {
     pub id: Transaction,
     state: Option<SubscribeState>,
@@ -240,21 +373,138 @@ impl Operation for SubscribeOp {
         let new_state;

         match input {
-            SubscribeMsg::RequestSub { id, key, target } => {
-                // fast tracked from the request_sub func
-                debug_assert!(matches!(
+            SubscribeMsg::RequestSub {
+                id,
+                key,
+                target: _,
+                subscriber,
+            } => {
+                tracing::debug!(
+                    tx = %id,
+                    %key,
+                    subscriber = %subscriber.peer,
+                    "subscribe: processing RequestSub"
+                );
+                let own_loc = op_manager.ring.connection_manager.own_location();
+
+                if
!matches!( self.state, Some(SubscribeState::AwaitingResponse { .. }) - )); - let sender = op_manager.ring.connection_manager.own_location(); + | Some(SubscribeState::ReceivedRequest) + ) { + tracing::warn!( + tx = %id, + %key, + state = ?self.state, + "subscribe: RequestSub received in unexpected state" + ); + return Err(OpError::invalid_transition(self.id)); + } + + if super::has_contract(op_manager, *key).await? { + let before_direct = subscribers_snapshot(op_manager, key); + tracing::info!( + tx = %id, + %key, + subscriber = %subscriber.peer, + subscribers_before = ?before_direct, + "subscribe: handling RequestSub locally (contract available)" + ); + + if op_manager + .ring + .add_subscriber(key, subscriber.clone()) + .is_err() + { + tracing::warn!( + tx = %id, + %key, + subscriber = %subscriber.peer, + subscribers_before = ?before_direct, + "subscribe: direct registration failed (max subscribers reached)" + ); + return Ok(OperationResult { + return_msg: Some(NetMessage::from(SubscribeMsg::ReturnSub { + id: *id, + key: *key, + sender: own_loc.clone(), + target: subscriber.clone(), + subscribed: false, + })), + state: None, + }); + } + + let after_direct = subscribers_snapshot(op_manager, key); + tracing::info!( + tx = %id, + %key, + subscriber = %subscriber.peer, + subscribers_after = ?after_direct, + "subscribe: registered direct subscriber (RequestSub)" + ); + + if subscriber.peer == own_loc.peer { + tracing::debug!( + tx = %id, + %key, + "RequestSub originated locally; sending LocalSubscribeComplete" + ); + if let Err(err) = op_manager + .notify_node_event( + crate::message::NodeEvent::LocalSubscribeComplete { + tx: *id, + key: *key, + subscribed: true, + }, + ) + .await + { + tracing::error!( + tx = %id, + %key, + error = %err, + "Failed to send LocalSubscribeComplete event for RequestSub" + ); + return Err(err); + } + + return build_op_result(self.id, None, None); + } + + let return_msg = SubscribeMsg::ReturnSub { + id: *id, + key: *key, + sender: own_loc.clone(), + target: subscriber.clone(), + subscribed: true, + }; + + return build_op_result(self.id, None, Some(return_msg)); + } + + let mut skip = HashSet::new(); + skip.insert(subscriber.peer.clone()); + skip.insert(own_loc.peer.clone()); + + let forward_target = op_manager + .ring + .k_closest_potentially_caching(key, &skip, 3) + .into_iter() + .find(|candidate| candidate.peer != own_loc.peer) + .ok_or_else(|| RingError::NoCachingPeers(*key)) + .map_err(OpError::from)?; + + skip.insert(forward_target.peer.clone()); + new_state = self.state; return_msg = Some(SubscribeMsg::SeekNode { id: *id, key: *key, - target: target.clone(), - subscriber: sender.clone(), - skip_list: HashSet::from([sender.peer]), - htl: op_manager.ring.max_hops_to_live, + target: forward_target, + subscriber: subscriber.clone(), + skip_list: skip.clone(), + htl: op_manager.ring.max_hops_to_live.max(1), retries: 0, }); } @@ -267,6 +517,8 @@ impl Operation for SubscribeOp { htl, retries, } => { + let ring_max_htl = op_manager.ring.max_hops_to_live.max(1); + let htl = (*htl).min(ring_max_htl); let this_peer = op_manager.ring.connection_manager.own_location(); let return_not_subbed = || -> OperationResult { OperationResult { @@ -281,6 +533,16 @@ impl Operation for SubscribeOp { } }; + if htl == 0 { + tracing::warn!( + tx = %id, + %key, + subscriber = %subscriber.peer, + "Dropping Subscribe SeekNode with zero HTL" + ); + return Ok(return_not_subbed()); + } + if !super::has_contract(op_manager, *key).await? 
{ tracing::debug!(tx = %id, %key, "Contract not found, trying other peer"); @@ -288,53 +550,133 @@ impl Operation for SubscribeOp { let candidates = op_manager .ring .k_closest_potentially_caching(key, skip_list, 3); - let Some(new_target) = candidates.first() else { - tracing::warn!(tx = %id, %key, "No remote peer available for forwarding"); - return Ok(return_not_subbed()); - }; - let new_target = new_target.clone(); - let new_htl = htl - 1; + if candidates.is_empty() { + let connection_count = + op_manager.ring.connection_manager.num_connections(); + tracing::warn!( + tx = %id, + %key, + skip = ?skip_list, + connection_count, + "No remote peer available for forwarding" + ); + tracing::info!( + tx = %id, + %key, + "Attempting to fetch contract locally before aborting subscribe" + ); - if new_htl == 0 { - tracing::debug!(tx = %id, %key, "Max number of hops reached while trying to get contract"); - return Ok(return_not_subbed()); - } + let get_op = get::start_op(*key, true, false); + if let Err(fetch_err) = + get::request_get(op_manager, get_op, HashSet::new()).await + { + tracing::warn!( + tx = %id, + %key, + error = %fetch_err, + "Failed to fetch contract locally while handling subscribe" + ); + return Ok(return_not_subbed()); + } - let mut new_skip_list = skip_list.clone(); - new_skip_list.insert(target.peer.clone()); - - tracing::debug!(tx = %id, new_target = %new_target.peer, "Forward request to peer"); - // Retry seek node when the contract to subscribe has not been found in this node - return build_op_result( - *id, - Some(SubscribeState::AwaitingResponse { - skip_list: new_skip_list.clone(), - retries: *retries, - current_hop: new_htl, - upstream_subscriber: Some(subscriber.clone()), - }), - (SubscribeMsg::SeekNode { - id: *id, - key: *key, - subscriber: this_peer, - target: new_target, - skip_list: new_skip_list, - htl: new_htl, - retries: *retries, - }) - .into(), - ); + if wait_for_local_contract(op_manager, *key).await? { + tracing::info!( + tx = %id, + %key, + "Fetched contract locally while handling subscribe" + ); + } else { + tracing::warn!( + tx = %id, + %key, + "Contract still unavailable locally after fetch attempt" + ); + return Ok(return_not_subbed()); + } + } else { + let Some(new_target) = candidates.first() else { + return Ok(return_not_subbed()); + }; + let new_target = new_target.clone(); + let new_htl = htl.saturating_sub(1); + + if new_htl == 0 { + tracing::debug!(tx = %id, %key, "Max number of hops reached while trying to get contract"); + return Ok(return_not_subbed()); + } + + let mut new_skip_list = skip_list.clone(); + new_skip_list.insert(target.peer.clone()); + + tracing::info!( + tx = %id, + %key, + new_target = %new_target.peer, + upstream = %subscriber.peer, + "Forward request to peer" + ); + tracing::debug!( + tx = %id, + %key, + candidates = ?candidates, + skip = ?new_skip_list, + "Forwarding seek to next candidate" + ); + // Retry seek node when the contract to subscribe has not been found in this node + return build_op_result( + *id, + Some(SubscribeState::AwaitingResponse { + skip_list: new_skip_list.clone(), + retries: *retries, + current_hop: new_htl, + upstream_subscriber: Some(subscriber.clone()), + }), + (SubscribeMsg::SeekNode { + id: *id, + key: *key, + subscriber: this_peer, + target: new_target, + skip_list: new_skip_list, + htl: new_htl, + retries: *retries, + }) + .into(), + ); + } + // After fetch attempt we should now have the contract locally. 
                }

+                let before_direct = subscribers_snapshot(op_manager, key);
+                tracing::info!(
+                    tx = %id,
+                    %key,
+                    subscriber = %subscriber.peer,
+                    subscribers_before = ?before_direct,
+                    "subscribe: attempting to register direct subscriber"
+                );
                 if op_manager
                     .ring
                     .add_subscriber(key, subscriber.clone())
                     .is_err()
                 {
-                    tracing::debug!(tx = %id, %key, "Max number of subscribers reached for contract");
+                    tracing::warn!(
+                        tx = %id,
+                        %key,
+                        subscriber = %subscriber.peer,
+                        subscribers_before = ?before_direct,
+                        "subscribe: direct registration failed (max subscribers reached)"
+                    );
                     // max number of subscribers for this contract reached
                     return Ok(return_not_subbed());
                 }
+                let after_direct = subscribers_snapshot(op_manager, key);
+                tracing::info!(
+                    tx = %id,
+                    %key,
+                    subscriber = %subscriber.peer,
+                    subscribers_after = ?after_direct,
+                    "subscribe: registered direct subscriber"
+                );

                 match self.state {
                     Some(SubscribeState::ReceivedRequest) => {
@@ -426,6 +768,8 @@ impl Operation for SubscribeOp {
                         upstream_subscriber,
                         ..
                     }) => {
+                        fetch_contract_if_missing(op_manager, *key).await?;
+
                         tracing::info!(
                             tx = %id,
                             %key,
                             provider = %sender.peer,
                             "Subscribed to contract"
                         );
+                        tracing::info!(
+                            tx = %id,
+                            %key,
+                            upstream = upstream_subscriber
+                                .as_ref()
+                                .map(|loc| format!("{:.8}", loc.peer))
+                                .unwrap_or_else(|| "".into()),
+                            "Handling ReturnSub (subscribed=true)"
+                        );
+                        if let Some(upstream_subscriber) = upstream_subscriber.as_ref() {
+                            let before_upstream = subscribers_snapshot(op_manager, key);
+                            tracing::info!(
+                                tx = %id,
+                                %key,
+                                upstream = %upstream_subscriber.peer,
+                                subscribers_before = ?before_upstream,
+                                "subscribe: attempting to register upstream link"
+                            );
+                            if op_manager
+                                .ring
+                                .add_subscriber(key, upstream_subscriber.clone())
+                                .is_err()
+                            {
+                                tracing::warn!(
+                                    tx = %id,
+                                    %key,
+                                    upstream = %upstream_subscriber.peer,
+                                    subscribers_before = ?before_upstream,
+                                    "subscribe: upstream registration failed (max subscribers reached)"
+                                );
+                            } else {
+                                let after_upstream = subscribers_snapshot(op_manager, key);
+                                tracing::info!(
+                                    tx = %id,
+                                    %key,
+                                    upstream = %upstream_subscriber.peer,
+                                    subscribers_after = ?after_upstream,
+                                    "subscribe: registered upstream link"
+                                );
+                            }
+                        }
+
+                        let before_provider = subscribers_snapshot(op_manager, key);
+                        tracing::info!(
+                            tx = %id,
+                            %key,
+                            provider = %sender.peer,
+                            subscribers_before = ?before_provider,
+                            "subscribe: registering provider/subscription source"
+                        );
                         if op_manager.ring.add_subscriber(key, sender.clone()).is_err() {
                             // concurrently it reached max number of subscribers for this contract
                             tracing::debug!(
                                 tx = %id,
                                 %key,
                                 "Max number of subscribers reached for contract"
                             );
                             return Err(OpError::UnexpectedOpState);
                         }
+                        let after_provider = subscribers_snapshot(op_manager, key);
+                        tracing::info!(
+                            tx = %id,
+                            %key,
+                            provider = %sender.peer,
+                            subscribers_after = ?after_provider,
+                            "subscribe: registered provider/subscription source"
+                        );

                         new_state = Some(SubscribeState::Completed { key: *key });
                         if let Some(upstream_subscriber) = upstream_subscriber {
@@ -518,6 +920,7 @@ mod messages {
             id: Transaction,
             key: ContractKey,
             target: PeerKeyLocation,
+            subscriber: PeerKeyLocation,
         },
         SeekNode {
             id: Transaction,
@@ -549,6 +952,7 @@ mod messages {

         fn target(&self) -> Option<&PeerKeyLocation> {
             match self {
+                Self::RequestSub { target, .. } => Some(target),
                 Self::SeekNode { target, .. } => Some(target),
                 Self::ReturnSub { target, .. } => Some(target),
                 _ => None,
diff --git a/crates/core/src/router/isotonic_estimator.rs b/crates/core/src/router/isotonic_estimator.rs
index 9a82ba228..a02cb8034 100644
--- a/crates/core/src/router/isotonic_estimator.rs
+++ b/crates/core/src/router/isotonic_estimator.rs
@@ -217,6 +217,7 @@ impl Adjustment {
 mod tests {

     use super::*;
+    use tracing::debug;

     // This test `test_peer_time_estimator` checks the accuracy of the `RoutingOutcomeEstimator` struct's
     // `estimate_retrieval_time()` method. It generates a list of 100 random events, where each event
@@ -239,7 +240,7 @@
         for _ in 0..100 {
             let peer = PeerKeyLocation::random();
             if peer.location.is_none() {
-                println!("Peer location is none for {peer:?}");
+                debug!("Peer location is none for {peer:?}");
             }
             let contract_location = Location::random();
             events.push(simulate_positive_request(peer, contract_location));
@@ -265,7 +266,7 @@

         // Check that the errors are small
         let average_error = errors.iter().sum::<f64>() / errors.len() as f64;
-        println!("Average error: {average_error}");
+        debug!("Average error: {average_error}");

         assert!(average_error < 0.01);
     }
@@ -276,7 +277,7 @@
         for _ in 0..100 {
             let peer = PeerKeyLocation::random();
             if peer.location.is_none() {
-                println!("Peer location is none for {peer:?}");
+                debug!("Peer location is none for {peer:?}");
             }
             let contract_location = Location::random();
             events.push(simulate_negative_request(peer, contract_location));
@@ -302,7 +303,7 @@

         // Check that the errors are small
         let average_error = errors.iter().sum::<f64>() / errors.len() as f64;
-        println!("Average error: {average_error}");
+        debug!("Average error: {average_error}");

         assert!(average_error < 0.01);
     }
diff --git a/crates/core/src/router/mod.rs b/crates/core/src/router/mod.rs
index ba459df22..f5749154b 100644
--- a/crates/core/src/router/mod.rs
+++ b/crates/core/src/router/mod.rs
@@ -1,7 +1,7 @@
 mod isotonic_estimator;
 mod util;

-use crate::ring::{Location, PeerKeyLocation};
+use crate::ring::{Distance, Location, PeerKeyLocation};
 use isotonic_estimator::{EstimatorType, IsotonicEstimator, IsotonicEvent};
 use serde::{Deserialize, Serialize};
 use std::time::Duration;
@@ -162,9 +162,12 @@ impl Router {

         let mut peer_distances: Vec<_> = peers
             .into_iter()
-            .filter_map(|peer| {
-                peer.location
-                    .map(|loc| (peer, target_location.distance(loc)))
+            .map(|peer| {
+                let distance = peer
+                    .location
+                    .map(|loc| target_location.distance(loc))
+                    .unwrap_or_else(|| Distance::new(0.5));
+                (peer, distance)
             })
             .collect();

@@ -203,8 +206,10 @@ impl Router {
         let mut peer_distances: Vec<_> = peers
             .into_iter()
             .filter_map(|peer| {
-                peer.location
-                    .map(|loc| (peer, target_location.distance(loc)))
+                peer.location.map(|loc| {
+                    let distance = target_location.distance(loc);
+                    (peer, distance)
+                })
             })
             .collect();

diff --git a/crates/core/src/topology/request_density_tracker.rs b/crates/core/src/topology/request_density_tracker.rs
index df56efa01..4820c694c 100644
--- a/crates/core/src/topology/request_density_tracker.rs
+++ b/crates/core/src/topology/request_density_tracker.rs
@@ -248,6 +248,7 @@ pub(crate) enum DensityMapError {
 mod tests {
     use super::*;
     use std::sync::RwLock;
+    use tracing::debug;

     #[test]
     fn test_create_density_map() {
@@ -327,12 +328,12 @@
         let result = result.unwrap();

         // Scan and dumb densities 0.0 to 1.0 at 0.01 intervals
-        println!("Location\tDensity");
+        debug!("Location\tDensity");
         for i in 0..100 {
             let location = Location::new(i as f64 / 100.0);
             let density =
result.get_density_at(location).unwrap(); // Print and round density to 2 decimals - println!( + debug!( "{}\t{}", location.as_f64(), (density * 100.0).round() / 100.0 diff --git a/scripts/deploy-local-gateway.sh b/scripts/deploy-local-gateway.sh index a731dcdd9..3da4c8b30 100755 --- a/scripts/deploy-local-gateway.sh +++ b/scripts/deploy-local-gateway.sh @@ -249,7 +249,8 @@ start_service() { case "$SERVICE_MANAGER" in systemd) - if systemctl list-unit-files | grep -q "^$service_arg.service" 2>/dev/null; then + # Check if unit file exists by querying systemctl directly + if systemctl list-unit-files "$service_arg.service" 2>/dev/null | grep -q "$service_arg.service"; then echo -n " Starting systemd service ($service_arg)... " if [[ "$DRY_RUN" == "true" ]]; then echo "[DRY RUN]" @@ -294,7 +295,8 @@ verify_service() { case "$SERVICE_MANAGER" in systemd) - if systemctl list-unit-files | grep -q "^$service_arg.service" 2>/dev/null; then + # Check if unit file exists by querying systemctl directly + if systemctl list-unit-files "$service_arg.service" 2>/dev/null | grep -q "$service_arg.service"; then echo -n " Verifying service status ($service_arg)... " sleep 2 # Give service time to start if systemctl is-active --quiet "$service_arg.service"; then From 816f22060fc6a8c13cb5dcd3e99ee11f5cc0c4a4 Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Fri, 7 Nov 2025 23:20:24 +0100 Subject: [PATCH 06/10] fix(put): preserve upstream during broadcast --- crates/core/src/operations/put.rs | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/crates/core/src/operations/put.rs b/crates/core/src/operations/put.rs index 07dc0accc..2c7581f3c 100644 --- a/crates/core/src/operations/put.rs +++ b/crates/core/src/operations/put.rs @@ -885,6 +885,14 @@ async fn try_to_broadcast( _ => false, }; + let preserved_upstream = match &state { + Some(PutState::AwaitingResponse { + upstream: Some(existing), + .. 
+ }) => Some(existing.clone()), + _ => None, + }; + match state { // Handle initiating node that's also the target (single node or targeting self) Some(PutState::AwaitingResponse { @@ -923,9 +931,12 @@ async fn try_to_broadcast( key ); // means the whole tx finished so can return early + let upstream_for_completion = preserved_upstream + .clone() + .or_else(|| Some(upstream.clone())); new_state = Some(PutState::AwaitingResponse { key, - upstream: Some(upstream), + upstream: upstream_for_completion, contract: contract.clone(), // No longer optional state: new_value.clone(), subscribe, From c32cf6b5f56992524bad87abe3c80c94ed4c6a43 Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Sat, 8 Nov 2025 01:16:07 +0100 Subject: [PATCH 07/10] fix(put): deliver SuccessfulPut directly to origin --- crates/core/src/operations/put.rs | 78 +++++++++++++++++++++++++++++-- crates/core/src/tracing/mod.rs | 1 + 2 files changed, 74 insertions(+), 5 deletions(-) diff --git a/crates/core/src/operations/put.rs b/crates/core/src/operations/put.rs index 2c7581f3c..aed616fbc 100644 --- a/crates/core/src/operations/put.rs +++ b/crates/core/src/operations/put.rs @@ -164,6 +164,7 @@ impl Operation for PutOp { PutMsg::RequestPut { id, sender, + origin, contract, related_contracts, value, @@ -276,6 +277,7 @@ impl Operation for PutOp { return_msg = Some(PutMsg::SeekNode { id: *id, sender: own_location.clone(), + origin: origin.clone(), target: forward_target, value: modified_value.clone(), contract: contract.clone(), @@ -290,6 +292,7 @@ impl Operation for PutOp { contract: contract.clone(), state: modified_value, subscribe, + origin: origin.clone(), }); } else { // No other peers to forward to - we're the final destination @@ -305,6 +308,7 @@ impl Operation for PutOp { target: sender.clone(), key, sender: own_location.clone(), + origin: origin.clone(), }); // Mark operation as finished @@ -319,6 +323,7 @@ impl Operation for PutOp { htl, target, sender, + origin, } => { // Get the contract key and check if we should handle it let key = contract.key(); @@ -345,6 +350,7 @@ impl Operation for PutOp { *id, new_htl, HashSet::from([sender.peer.clone()]), + origin.clone(), ) .await } else { @@ -406,6 +412,7 @@ impl Operation for PutOp { last_hop, op_manager, self.state, + origin.clone(), (broadcast_to, sender.clone()), key, (contract.clone(), value.clone()), @@ -425,6 +432,7 @@ impl Operation for PutOp { new_value, contract, sender, + origin, .. } => { // Get own location @@ -457,6 +465,7 @@ impl Operation for PutOp { false, op_manager, self.state, + origin.clone(), (broadcast_to, sender.clone()), *key, (contract.clone(), updated_value), @@ -478,6 +487,7 @@ impl Operation for PutOp { new_value, contract, upstream, + origin, .. 
} => { // Get own location and initialize counter @@ -502,6 +512,7 @@ impl Operation for PutOp { target: upstream.clone(), key: *key, sender: sender.clone(), + origin: origin.clone(), }; tracing::trace!( @@ -526,6 +537,7 @@ impl Operation for PutOp { key: *key, new_value: new_value.clone(), sender: sender.clone(), + origin: origin.clone(), contract: contract.clone(), target: peer.clone(), }; @@ -582,6 +594,7 @@ impl Operation for PutOp { contract, state, subscribe, + origin: state_origin, }) => { tracing::debug!( tx = %id, @@ -657,19 +670,22 @@ impl Operation for PutOp { } } + let local_peer = op_manager.ring.connection_manager.own_location(); + // Forward success message upstream if needed - if let Some(upstream) = upstream { + if let Some(upstream_peer) = upstream.clone() { tracing::trace!( tx = %id, %key, - upstream = %upstream.peer, + upstream = %upstream_peer.peer, "PutOp::process_message: Forwarding SuccessfulPut upstream" ); return_msg = Some(PutMsg::SuccessfulPut { id: *id, - target: upstream, + target: upstream_peer, key, - sender: op_manager.ring.connection_manager.own_location(), + sender: local_peer.clone(), + origin: state_origin.clone(), }); } else { tracing::trace!( @@ -679,6 +695,34 @@ impl Operation for PutOp { ); return_msg = None; } + + // Send a direct acknowledgement to the original requester if we are not it + if state_origin.peer != local_peer.peer + && !upstream + .as_ref() + .map(|u| u.peer == state_origin.peer) + .unwrap_or(false) + { + let direct_ack = PutMsg::SuccessfulPut { + id: *id, + target: state_origin.clone(), + key, + sender: local_peer, + origin: state_origin.clone(), + }; + + if let Err(err) = conn_manager + .send(&state_origin.peer, NetMessage::from(direct_ack)) + .await + { + tracing::warn!( + tx = %id, + %key, + origin_peer = %state_origin.peer, + "Failed to send direct SuccessfulPut to origin: {err}" + ); + } + } } Some(PutState::Finished { .. }) => { // Operation already completed - this is a duplicate SuccessfulPut message @@ -700,6 +744,7 @@ impl Operation for PutOp { htl, sender, skip_list, + origin, .. } => { // Get contract key and own location @@ -747,6 +792,7 @@ impl Operation for PutOp { *id, new_htl, new_skip_list.clone(), + origin.clone(), ) .await; @@ -815,6 +861,7 @@ impl Operation for PutOp { last_hop, op_manager, self.state, + origin.clone(), (broadcast_to, sender.clone()), key, (contract.clone(), new_value.clone()), @@ -868,11 +915,13 @@ fn build_op_result( }) } +#[allow(clippy::too_many_arguments)] async fn try_to_broadcast( id: Transaction, last_hop: bool, op_manager: &OpManager, state: Option, + origin: PeerKeyLocation, (broadcast_to, upstream): (Vec, PeerKeyLocation), key: ContractKey, (contract, new_value): (ContractContainer, WrappedState), @@ -940,6 +989,7 @@ async fn try_to_broadcast( contract: contract.clone(), // No longer optional state: new_value.clone(), subscribe, + origin: origin.clone(), }); return_msg = None; } else if !broadcast_to.is_empty() { @@ -954,6 +1004,7 @@ async fn try_to_broadcast( contract, upstream, sender: op_manager.ring.connection_manager.own_location(), + origin: origin.clone(), }); let op = PutOp { @@ -971,6 +1022,7 @@ async fn try_to_broadcast( target: upstream, key, sender: op_manager.ring.connection_manager.own_location(), + origin, }); } } @@ -1030,6 +1082,7 @@ pub(crate) fn start_op_with_id( } #[derive(Debug)] +#[allow(clippy::large_enum_variant)] pub enum PutState { ReceivedRequest, /// Preparing request for put op. 
@@ -1047,6 +1100,7 @@ pub enum PutState {
         contract: ContractContainer,
         state: WrappedState,
         subscribe: bool,
+        origin: PeerKeyLocation,
     },
     /// Broadcasting changes to subscribers.
     BroadcastOngoing,
@@ -1127,6 +1181,7 @@ pub(crate) async fn request_put(op_manager: &OpManager, mut put_op: PutOp) -> Re
                 contract: contract.clone(),
                 state: updated_value.clone(),
                 subscribe,
+                origin: own_location.clone(),
             });

             // Create a SuccessfulPut message to trigger the completion handling
@@ -1135,6 +1190,7 @@
                 target: own_location.clone(),
                 key,
                 sender: own_location.clone(),
+                origin: own_location.clone(),
             };

             // Use notify_op_change to trigger the completion handling
@@ -1153,6 +1209,7 @@
                 false,
                 op_manager,
                 broadcast_state,
+                own_location.clone(),
                 (broadcast_to, sender),
                 key,
                 (contract.clone(), updated_value),
@@ -1217,12 +1274,14 @@
                 contract: contract.clone(),
                 state: updated_value.clone(),
                 subscribe,
+                origin: own_location.clone(),
             });

             // Create RequestPut message and forward to target peer
             let msg = PutMsg::RequestPut {
                 id,
-                sender: own_location,
+                sender: own_location.clone(),
+                origin: own_location,
                 contract,
                 related_contracts,
                 value: updated_value,
@@ -1282,6 +1341,7 @@ async fn put_contract(
 /// It returns whether this peer should be storing the contract or not.
 ///
 /// This operation is "fire and forget" and the node does not keep track if is successful or not.
+#[allow(clippy::too_many_arguments)]
 async fn forward_put<CB>(
     op_manager: &OpManager,
     conn_manager: &CB,
@@ -1290,6 +1350,7 @@
     id: Transaction,
     htl: usize,
     skip_list: HashSet<PeerId>,
+    origin: PeerKeyLocation,
 ) -> bool
 where
     CB: NetworkBridge,
@@ -1347,6 +1408,7 @@ where
             id,
             sender: own_pkloc,
             target: peer.clone(),
+            origin,
             contract: contract.clone(),
             new_value: new_value.clone(),
             htl,
@@ -1386,6 +1448,7 @@ mod messages {
     RequestPut {
         id: Transaction,
         sender: PeerKeyLocation,
+        origin: PeerKeyLocation,
         contract: ContractContainer,
         #[serde(deserialize_with = "RelatedContracts::deser_related_contracts")]
         related_contracts: RelatedContracts<'static>,
@@ -1401,6 +1464,7 @@
         id: Transaction,
         sender: PeerKeyLocation,
         target: PeerKeyLocation,
+        origin: PeerKeyLocation,
         contract: ContractContainer,
         new_value: WrappedState,
         /// current htl, reduced by one at each hop
@@ -1413,12 +1477,14 @@
         target: PeerKeyLocation,
         key: ContractKey,
         sender: PeerKeyLocation,
+        origin: PeerKeyLocation,
     },
     /// Target the node which is closest to the key
     SeekNode {
         id: Transaction,
         sender: PeerKeyLocation,
         target: PeerKeyLocation,
+        origin: PeerKeyLocation,
         value: WrappedState,
         contract: ContractContainer,
         #[serde(deserialize_with = "RelatedContracts::deser_related_contracts")]
@@ -1436,11 +1502,13 @@
         contract: ContractContainer,
         upstream: PeerKeyLocation,
         sender: PeerKeyLocation,
+        origin: PeerKeyLocation,
     },
     /// Broadcasting a change to a peer, which then will relay the changes to other peers.
     BroadcastTo {
         id: Transaction,
         sender: PeerKeyLocation,
+        origin: PeerKeyLocation,
         key: ContractKey,
         new_value: WrappedState,
         contract: ContractContainer,
diff --git a/crates/core/src/tracing/mod.rs b/crates/core/src/tracing/mod.rs
index bde43deda..d2c2b7133 100644
--- a/crates/core/src/tracing/mod.rs
+++ b/crates/core/src/tracing/mod.rs
@@ -241,6 +241,7 @@ impl<'a> NetEventLog<'a> {
                 target,
                 key,
                 sender,
+                ..
             }) => EventKind::Put(PutEvent::PutSuccess {
                 id: *id,
                 requester: sender.clone(),

From e8d6d5752102650716c30dcbf59f064d9f28ceef Mon Sep 17 00:00:00 2001
From: Ian Clarke
Date: Sun, 9 Nov 2025 10:42:01 -0600
Subject: [PATCH 08/10] fix(subscriptions): address review feedback

---
 apps/freenet-ping/Cargo.toml            |   3 +-
 apps/freenet-ping/app/Cargo.toml        |   2 +-
 crates/core/src/operations/put.rs       |   4 +-
 crates/core/src/operations/subscribe.rs | 158 ++++++++++++------------
 crates/core/src/router/mod.rs           |  17 +--
 5 files changed, 87 insertions(+), 97 deletions(-)

diff --git a/apps/freenet-ping/Cargo.toml b/apps/freenet-ping/Cargo.toml
index 2c0d2e32f..79a379bde 100644
--- a/apps/freenet-ping/Cargo.toml
+++ b/apps/freenet-ping/Cargo.toml
@@ -4,7 +4,7 @@ members = ["contracts/ping", "app", "types"]

 [workspace.dependencies]
 # freenet-stdlib = { path = "./../../stdlib/rust", features = ["contract"] }
-freenet-stdlib = { version = "0.1.14" }
+freenet-stdlib = { version = "0.1.24" }
 freenet-ping-types = { path = "types", default-features = false }
 chrono = { version = "0.4", default-features = false }
 testresult = "0.4"
@@ -19,4 +19,3 @@ debug = false
 codegen-units = 1
 panic = 'abort'
 strip = true
-
diff --git a/apps/freenet-ping/app/Cargo.toml b/apps/freenet-ping/app/Cargo.toml
index ef83d63ae..dd0b05bf8 100644
--- a/apps/freenet-ping/app/Cargo.toml
+++ b/apps/freenet-ping/app/Cargo.toml
@@ -10,7 +10,7 @@ testing = ["freenet-stdlib/testing", "freenet/testing"]
 anyhow = "1.0"
 chrono = { workspace = true, features = ["default"] }
 clap = { version = "4.5", features = ["derive"] }
-freenet-stdlib = { version = "0.1.22", features = ["net"] }
+freenet-stdlib = { version = "0.1.24", features = ["net"] }
 freenet-ping-types = { path = "../types", features = ["std", "clap"] }
 futures = "0.3.31"
 rand = "0.9.2"
diff --git a/crates/core/src/operations/put.rs b/crates/core/src/operations/put.rs
index aed616fbc..95c96cf4a 100644
--- a/crates/core/src/operations/put.rs
+++ b/crates/core/src/operations/put.rs
@@ -980,9 +980,7 @@ async fn try_to_broadcast(
                 key
             );
             // means the whole tx finished so can return early
-            let upstream_for_completion = preserved_upstream
-                .clone()
-                .or_else(|| Some(upstream.clone()));
+            let upstream_for_completion = preserved_upstream.clone().or(Some(upstream.clone()));
             new_state = Some(PutState::AwaitingResponse {
                 key,
                 upstream: upstream_for_completion,
diff --git a/crates/core/src/operations/subscribe.rs b/crates/core/src/operations/subscribe.rs
index c8fab8952..c0162f5f1 100644
--- a/crates/core/src/operations/subscribe.rs
+++ b/crates/core/src/operations/subscribe.rs
@@ -20,7 +20,9 @@ use serde::{Deserialize, Serialize};
 use tokio::time::{sleep, Duration};

 const MAX_RETRIES: usize = 10;
+/// Maximum time in milliseconds to wait for a locally fetched contract to become available.
 const LOCAL_FETCH_TIMEOUT_MS: u64 = 1_500;
+/// Polling interval in milliseconds while waiting for a fetched contract to be stored locally.
 const LOCAL_FETCH_POLL_INTERVAL_MS: u64 = 25;

 fn subscribers_snapshot(op_manager: &OpManager, key: &ContractKey) -> Vec<String> {
@@ -35,6 +37,50 @@ fn subscribers_snapshot(op_manager: &OpManager, key: &ContractKey) -> Vec<String>
         .unwrap_or_default()
 }

+fn register_subscriber_with_logging(
+    tx: &Transaction,
+    op_manager: &OpManager,
+    key: &ContractKey,
+    subscriber: &PeerKeyLocation,
+    stage: &str,
+) -> Result<(), ()> {
+    let before = subscribers_snapshot(op_manager, key);
+    tracing::debug!(
+        tx = %tx,
+        %key,
+        subscriber = %subscriber.peer,
+        stage,
+        subscribers_before = ?before,
+        "subscribe: attempting to register subscriber"
+    );
+
+    match op_manager.ring.add_subscriber(key, subscriber.clone()) {
+        Ok(()) => {
+            let after = subscribers_snapshot(op_manager, key);
+            tracing::debug!(
+                tx = %tx,
+                %key,
+                subscriber = %subscriber.peer,
+                stage,
+                subscribers_after = ?after,
+                "subscribe: registered subscriber"
+            );
+            Ok(())
+        }
+        Err(_) => {
+            tracing::warn!(
+                tx = %tx,
+                %key,
+                subscriber = %subscriber.peer,
+                stage,
+                subscribers_before = ?before,
+                "subscribe: subscriber registration failed (max subscribers reached)"
+            );
+            Err(())
+        }
+    }
+}
+
 /// Poll local storage for a short period until the fetched contract becomes available.
 async fn wait_for_local_contract(
     op_manager: &OpManager,
@@ -646,37 +692,18 @@ impl Operation for SubscribeOp {
                     // After fetch attempt we should now have the contract locally.
                 }

-                let before_direct = subscribers_snapshot(op_manager, key);
-                tracing::info!(
-                    tx = %id,
-                    %key,
-                    subscriber = %subscriber.peer,
-                    subscribers_before = ?before_direct,
-                    "subscribe: attempting to register direct subscriber"
-                );
-                if op_manager
-                    .ring
-                    .add_subscriber(key, subscriber.clone())
-                    .is_err()
+                if register_subscriber_with_logging(
+                    id,
+                    op_manager,
+                    key,
+                    subscriber,
+                    "direct subscriber",
+                )
+                .is_err()
                 {
-                    tracing::warn!(
-                        tx = %id,
-                        %key,
-                        subscriber = %subscriber.peer,
-                        subscribers_before = ?before_direct,
-                        "subscribe: direct registration failed (max subscribers reached)"
-                    );
                     // max number of subscribers for this contract reached
                     return Ok(return_not_subbed());
                 }
-                let after_direct = subscribers_snapshot(op_manager, key);
-                tracing::info!(
-                    tx = %id,
-                    %key,
-                    subscriber = %subscriber.peer,
-                    subscribers_after = ?after_direct,
-                    "subscribe: registered direct subscriber"
-                );

                 match self.state {
                     Some(SubscribeState::ReceivedRequest) => {
@@ -768,7 +795,15 @@
                     upstream_subscriber,
                     ..
                 }) => {
-                    fetch_contract_if_missing(op_manager, *key).await?;
+                    if let Err(err) = fetch_contract_if_missing(op_manager, *key).await {
+                        tracing::warn!(
+                            tx = %id,
+                            %key,
+                            error = %err,
+                            "Failed to fetch contract code after successful subscription"
+                        );
+                        return Err(err);
+                    }

                     tracing::info!(
                         tx = %id,
@@ -787,63 +822,26 @@
                         "Handling ReturnSub (subscribed=true)"
                     );
                     if let Some(upstream_subscriber) = upstream_subscriber.as_ref() {
-                        let before_upstream = subscribers_snapshot(op_manager, key);
-                        tracing::info!(
-                            tx = %id,
-                            %key,
-                            upstream = %upstream_subscriber.peer,
-                            subscribers_before = ?before_upstream,
-                            "subscribe: attempting to register upstream link"
+                        let _ = register_subscriber_with_logging(
+                            id,
+                            op_manager,
+                            key,
+                            upstream_subscriber,
+                            "upstream link",
                         );
-                        if op_manager
-                            .ring
-                            .add_subscriber(key, upstream_subscriber.clone())
-                            .is_err()
-                        {
-                            tracing::warn!(
-                                tx = %id,
-                                %key,
-                                upstream = %upstream_subscriber.peer,
-                                subscribers_before = ?before_upstream,
-                                "subscribe: upstream registration failed (max subscribers reached)"
-                            );
-                        } else {
-                            let after_upstream = subscribers_snapshot(op_manager, key);
-                            tracing::info!(
-                                tx = %id,
-                                %key,
-                                upstream = %upstream_subscriber.peer,
-                                subscribers_after = ?after_upstream,
-                                "subscribe: registered upstream link"
-                            );
-                        }
                     }

-                    let before_provider = subscribers_snapshot(op_manager, key);
-                    tracing::info!(
-                        tx = %id,
-                        %key,
-                        provider = %sender.peer,
-                        subscribers_before = ?before_provider,
-                        "subscribe: registering provider/subscription source"
-                    );
-                    if op_manager.ring.add_subscriber(key, sender.clone()).is_err() {
-                        // concurrently it reached max number of subscribers for this contract
-                        tracing::debug!(
-                            tx = %id,
-                            %key,
-                            "Max number of subscribers reached for contract"
-                        );
+                    if register_subscriber_with_logging(
+                        id,
+                        op_manager,
+                        key,
+                        sender,
+                        "provider link",
+                    )
+                    .is_err()
+                    {
                         return Err(OpError::UnexpectedOpState);
                     }
-                    let after_provider = subscribers_snapshot(op_manager, key);
-                    tracing::info!(
-                        tx = %id,
-                        %key,
-                        provider = %sender.peer,
-                        subscribers_after = ?after_provider,
-                        "subscribe: registered provider/subscription source"
-                    );

                     new_state = Some(SubscribeState::Completed { key: *key });
                     if let Some(upstream_subscriber) = upstream_subscriber {
diff --git a/crates/core/src/router/mod.rs b/crates/core/src/router/mod.rs
index f5749154b..ba459df22 100644
--- a/crates/core/src/router/mod.rs
+++ b/crates/core/src/router/mod.rs
@@ -1,7 +1,7 @@
 mod isotonic_estimator;
 mod util;

-use crate::ring::{Distance, Location, PeerKeyLocation};
+use crate::ring::{Location, PeerKeyLocation};
 use isotonic_estimator::{EstimatorType, IsotonicEstimator, IsotonicEvent};
 use serde::{Deserialize, Serialize};
 use std::time::Duration;
@@ -162,12 +162,9 @@
         let mut peer_distances: Vec<_> = peers
             .into_iter()
-            .map(|peer| {
-                let distance = peer
-                    .location
-                    .map(|loc| target_location.distance(loc))
-                    .unwrap_or_else(|| Distance::new(0.5));
-                (peer, distance)
+            .filter_map(|peer| {
+                peer.location
+                    .map(|loc| (peer, target_location.distance(loc)))
             })
             .collect();
@@ -206,10 +203,8 @@
         let mut peer_distances: Vec<_> = peers
             .into_iter()
             .filter_map(|peer| {
-                peer.location.map(|loc| {
-                    let distance = target_location.distance(loc);
-                    (peer, distance)
-                })
+                peer.location
+                    .map(|loc| (peer, target_location.distance(loc)))
             })
             .collect();

From e243c8d0405b1d15ba525ad990b07a970e52f352 Mon Sep 17 00:00:00 2001
From: Ian Clarke
Date: Sun, 9 Nov 2025 11:10:48 -0600
Subject: [PATCH 09/10] fix(router): handle peers without location

---
 crates/core/src/router/mod.rs | 27 ++++++++++++++++++++-------
 1 file changed, 20 insertions(+), 7 deletions(-)

diff --git a/crates/core/src/router/mod.rs b/crates/core/src/router/mod.rs
index ba459df22..3f1f80a4f 100644
--- a/crates/core/src/router/mod.rs
+++ b/crates/core/src/router/mod.rs
@@ -1,7 +1,7 @@
 mod isotonic_estimator;
 mod util;

-use crate::ring::{Location, PeerKeyLocation};
+use crate::ring::{Distance, Location, PeerKeyLocation};
 use isotonic_estimator::{EstimatorType, IsotonicEstimator, IsotonicEvent};
 use serde::{Deserialize, Serialize};
 use std::time::Duration;
@@ -20,6 +20,19 @@ pub(crate) struct Router {
 }

 impl Router {
+    /// Some code paths (bootstrap, tests) hand Router peer entries before the
+    /// remote has published a location. Treat them as midway around the ring so
+    /// we still consider them instead of dropping the candidate set entirely.
+    #[inline]
+    fn peer_distance_or_default(
+        peer: &PeerKeyLocation,
+        target_location: &Location,
+    ) -> Distance {
+        peer.location
+            .map(|loc| target_location.distance(loc))
+            .unwrap_or_else(|| Distance::new(0.5))
+    }
+
     pub fn new(history: &[RouteEvent]) -> Self {
         let failure_outcomes: Vec<IsotonicEvent> = history
             .iter()
@@ -162,9 +175,9 @@
         let mut peer_distances: Vec<_> = peers
             .into_iter()
-            .filter_map(|peer| {
-                peer.location
-                    .map(|loc| (peer, target_location.distance(loc)))
+            .map(|peer| {
+                let distance = Self::peer_distance_or_default(peer, target_location);
+                (peer, distance)
             })
             .collect();
@@ -202,9 +215,9 @@
         let mut peer_distances: Vec<_> = peers
             .into_iter()
-            .filter_map(|peer| {
-                peer.location
-                    .map(|loc| (peer, target_location.distance(loc)))
+            .map(|peer| {
+                let distance = Self::peer_distance_or_default(peer, &target_location);
+                (peer, distance)
             })
             .collect();

From e1f507b3567cd5ea8c3df8264b9d3ca884a66a85 Mon Sep 17 00:00:00 2001
From: Ian Clarke
Date: Sun, 9 Nov 2025 11:34:59 -0600
Subject: [PATCH 10/10] style(router): run rustfmt

---
 crates/core/src/router/mod.rs | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/crates/core/src/router/mod.rs b/crates/core/src/router/mod.rs
index 3f1f80a4f..1687cc8a4 100644
--- a/crates/core/src/router/mod.rs
+++ b/crates/core/src/router/mod.rs
@@ -24,10 +24,7 @@ impl Router {
     /// remote has published a location. Treat them as midway around the ring so
     /// we still consider them instead of dropping the candidate set entirely.
     #[inline]
-    fn peer_distance_or_default(
-        peer: &PeerKeyLocation,
-        target_location: &Location,
-    ) -> Distance {
+    fn peer_distance_or_default(peer: &PeerKeyLocation, target_location: &Location) -> Distance {
         peer.location
             .map(|loc| target_location.distance(loc))
             .unwrap_or_else(|| Distance::new(0.5))
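
The thread running through patch 07 is that SuccessfulPut used to travel back only hop-by-hop through upstream links, so the original requester never heard about a completed put if an intermediate hop dropped out; carrying origin in every message lets the peer that applied the put acknowledge the requester directly. The guard before that direct send is worth reading in isolation. Below is a minimal illustrative sketch of just that condition, using plain &str peer ids as stand-ins for the crate's PeerKeyLocation; it mirrors the logic in the patch but is not the crate's API:

    // Should the peer that applied the put also send a SuccessfulPut straight
    // to the originating requester, besides the normal hop-by-hop reply?
    fn should_send_direct_ack(local: &str, origin: &str, upstream: Option<&str>) -> bool {
        // We are the origin ourselves: local completion handling covers it.
        if origin == local {
            return false;
        }
        // The next hop back already is the origin, so the ordinary reply suffices.
        if upstream.map(|u| u == origin).unwrap_or(false) {
            return false;
        }
        true
    }

    fn main() {
        // Origin two hops away: ack directly too, in case the middle hop dies.
        assert!(should_send_direct_ack("c", "a", Some("b")));
        // Upstream is the origin: avoid a duplicate SuccessfulPut.
        assert!(!should_send_direct_ack("c", "a", Some("a")));
        // We are the origin: nothing to send over the network.
        assert!(!should_send_direct_ack("a", "a", None));
    }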
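
Patches 09 and 10 change a single routing contract: a peer that has not yet published a location is scored at the ring midpoint (distance 0.5) instead of being dropped from the candidate set by filter_map. A self-contained sketch of that behavior, with simplified stand-in Location and Distance types rather than the real ring types:

    #[derive(Clone, Copy)]
    struct Location(f64);

    #[derive(Clone, Copy, Debug, PartialEq)]
    struct Distance(f64);

    impl Location {
        // Ring distance: the shorter arc between two points on [0, 1).
        fn distance(&self, other: Location) -> Distance {
            let d = (self.0 - other.0).abs();
            Distance(d.min(1.0 - d))
        }
    }

    // Mirrors peer_distance_or_default: unknown locations score a neutral 0.5.
    fn distance_or_default(location: Option<Location>, target: &Location) -> Distance {
        location
            .map(|loc| target.distance(loc))
            .unwrap_or(Distance(0.5))
    }

    fn main() {
        let target = Location(0.25);
        // A located peer is scored by its actual ring distance (0.25 here).
        assert_eq!(distance_or_default(Some(Location(0.5)), &target), Distance(0.25));
        // A still-bootstrapping peer stays in the candidate set with the
        // neutral midpoint score instead of being filtered out.
        assert_eq!(distance_or_default(None, &target), Distance(0.5));
    }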