diff --git a/apps/freenet-ping/Cargo.toml b/apps/freenet-ping/Cargo.toml index 0c834d5dc..2c0d2e32f 100644 --- a/apps/freenet-ping/Cargo.toml +++ b/apps/freenet-ping/Cargo.toml @@ -4,7 +4,7 @@ members = ["contracts/ping", "app", "types"] [workspace.dependencies] # freenet-stdlib = { path = "./../../stdlib/rust", features = ["contract"] } -freenet-stdlib = { version = "0.1.24" } +freenet-stdlib = { version = "0.1.14" } freenet-ping-types = { path = "types", default-features = false } chrono = { version = "0.4", default-features = false } testresult = "0.4" diff --git a/apps/freenet-ping/app/Cargo.toml b/apps/freenet-ping/app/Cargo.toml index dd0b05bf8..ef83d63ae 100644 --- a/apps/freenet-ping/app/Cargo.toml +++ b/apps/freenet-ping/app/Cargo.toml @@ -10,7 +10,7 @@ testing = ["freenet-stdlib/testing", "freenet/testing"] anyhow = "1.0" chrono = { workspace = true, features = ["default"] } clap = { version = "4.5", features = ["derive"] } -freenet-stdlib = { version = "0.1.24", features = ["net"] } +freenet-stdlib = { version = "0.1.22", features = ["net"] } freenet-ping-types = { path = "../types", features = ["std", "clap"] } futures = "0.3.31" rand = "0.9.2" diff --git a/crates/core/src/operations/put.rs b/crates/core/src/operations/put.rs index 07dc0accc..aed616fbc 100644 --- a/crates/core/src/operations/put.rs +++ b/crates/core/src/operations/put.rs @@ -164,6 +164,7 @@ impl Operation for PutOp { PutMsg::RequestPut { id, sender, + origin, contract, related_contracts, value, @@ -276,6 +277,7 @@ impl Operation for PutOp { return_msg = Some(PutMsg::SeekNode { id: *id, sender: own_location.clone(), + origin: origin.clone(), target: forward_target, value: modified_value.clone(), contract: contract.clone(), @@ -290,6 +292,7 @@ impl Operation for PutOp { contract: contract.clone(), state: modified_value, subscribe, + origin: origin.clone(), }); } else { // No other peers to forward to - we're the final destination @@ -305,6 +308,7 @@ impl Operation for PutOp { target: sender.clone(), key, sender: own_location.clone(), + origin: origin.clone(), }); // Mark operation as finished @@ -319,6 +323,7 @@ impl Operation for PutOp { htl, target, sender, + origin, } => { // Get the contract key and check if we should handle it let key = contract.key(); @@ -345,6 +350,7 @@ impl Operation for PutOp { *id, new_htl, HashSet::from([sender.peer.clone()]), + origin.clone(), ) .await } else { @@ -406,6 +412,7 @@ impl Operation for PutOp { last_hop, op_manager, self.state, + origin.clone(), (broadcast_to, sender.clone()), key, (contract.clone(), value.clone()), @@ -425,6 +432,7 @@ impl Operation for PutOp { new_value, contract, sender, + origin, .. } => { // Get own location @@ -457,6 +465,7 @@ impl Operation for PutOp { false, op_manager, self.state, + origin.clone(), (broadcast_to, sender.clone()), *key, (contract.clone(), updated_value), @@ -478,6 +487,7 @@ impl Operation for PutOp { new_value, contract, upstream, + origin, .. 
} => { // Get own location and initialize counter @@ -502,6 +512,7 @@ impl Operation for PutOp { target: upstream.clone(), key: *key, sender: sender.clone(), + origin: origin.clone(), }; tracing::trace!( @@ -526,6 +537,7 @@ impl Operation for PutOp { key: *key, new_value: new_value.clone(), sender: sender.clone(), + origin: origin.clone(), contract: contract.clone(), target: peer.clone(), }; @@ -582,6 +594,7 @@ impl Operation for PutOp { contract, state, subscribe, + origin: state_origin, }) => { tracing::debug!( tx = %id, @@ -657,19 +670,22 @@ impl Operation for PutOp { } } + let local_peer = op_manager.ring.connection_manager.own_location(); + // Forward success message upstream if needed - if let Some(upstream) = upstream { + if let Some(upstream_peer) = upstream.clone() { tracing::trace!( tx = %id, %key, - upstream = %upstream.peer, + upstream = %upstream_peer.peer, "PutOp::process_message: Forwarding SuccessfulPut upstream" ); return_msg = Some(PutMsg::SuccessfulPut { id: *id, - target: upstream, + target: upstream_peer, key, - sender: op_manager.ring.connection_manager.own_location(), + sender: local_peer.clone(), + origin: state_origin.clone(), }); } else { tracing::trace!( @@ -679,6 +695,34 @@ impl Operation for PutOp { ); return_msg = None; } + + // Send a direct acknowledgement to the original requester if we are not it + if state_origin.peer != local_peer.peer + && !upstream + .as_ref() + .map(|u| u.peer == state_origin.peer) + .unwrap_or(false) + { + let direct_ack = PutMsg::SuccessfulPut { + id: *id, + target: state_origin.clone(), + key, + sender: local_peer, + origin: state_origin.clone(), + }; + + if let Err(err) = conn_manager + .send(&state_origin.peer, NetMessage::from(direct_ack)) + .await + { + tracing::warn!( + tx = %id, + %key, + origin_peer = %state_origin.peer, + "Failed to send direct SuccessfulPut to origin: {err}" + ); + } + } } Some(PutState::Finished { .. }) => { // Operation already completed - this is a duplicate SuccessfulPut message @@ -700,6 +744,7 @@ impl Operation for PutOp { htl, sender, skip_list, + origin, .. } => { // Get contract key and own location @@ -747,6 +792,7 @@ impl Operation for PutOp { *id, new_htl, new_skip_list.clone(), + origin.clone(), ) .await; @@ -815,6 +861,7 @@ impl Operation for PutOp { last_hop, op_manager, self.state, + origin.clone(), (broadcast_to, sender.clone()), key, (contract.clone(), new_value.clone()), @@ -868,11 +915,13 @@ fn build_op_result( }) } +#[allow(clippy::too_many_arguments)] async fn try_to_broadcast( id: Transaction, last_hop: bool, op_manager: &OpManager, state: Option<PutState>, + origin: PeerKeyLocation, (broadcast_to, upstream): (Vec<PeerKeyLocation>, PeerKeyLocation), key: ContractKey, (contract, new_value): (ContractContainer, WrappedState), @@ -885,6 +934,14 @@ async fn try_to_broadcast( _ => false, }; + let preserved_upstream = match &state { + Some(PutState::AwaitingResponse { + upstream: Some(existing), + ..
+ }) => Some(existing.clone()), + _ => None, + }; + match state { // Handle initiating node that's also the target (single node or targeting self) Some(PutState::AwaitingResponse { @@ -923,12 +980,16 @@ async fn try_to_broadcast( key ); // means the whole tx finished so can return early + let upstream_for_completion = preserved_upstream + .clone() + .or_else(|| Some(upstream.clone())); new_state = Some(PutState::AwaitingResponse { key, - upstream: Some(upstream), + upstream: upstream_for_completion, contract: contract.clone(), // No longer optional state: new_value.clone(), subscribe, + origin: origin.clone(), }); return_msg = None; } else if !broadcast_to.is_empty() { @@ -943,6 +1004,7 @@ async fn try_to_broadcast( contract, upstream, sender: op_manager.ring.connection_manager.own_location(), + origin: origin.clone(), }); let op = PutOp { @@ -960,6 +1022,7 @@ async fn try_to_broadcast( target: upstream, key, sender: op_manager.ring.connection_manager.own_location(), + origin, }); } } @@ -1019,6 +1082,7 @@ pub(crate) fn start_op_with_id( } #[derive(Debug)] +#[allow(clippy::large_enum_variant)] pub enum PutState { ReceivedRequest, /// Preparing request for put op. @@ -1036,6 +1100,7 @@ pub enum PutState { contract: ContractContainer, state: WrappedState, subscribe: bool, + origin: PeerKeyLocation, }, /// Broadcasting changes to subscribers. BroadcastOngoing, @@ -1116,6 +1181,7 @@ pub(crate) async fn request_put(op_manager: &OpManager, mut put_op: PutOp) -> Re contract: contract.clone(), state: updated_value.clone(), subscribe, + origin: own_location.clone(), }); // Create a SuccessfulPut message to trigger the completion handling @@ -1124,6 +1190,7 @@ pub(crate) async fn request_put(op_manager: &OpManager, mut put_op: PutOp) -> Re target: own_location.clone(), key, sender: own_location.clone(), + origin: own_location.clone(), }; // Use notify_op_change to trigger the completion handling @@ -1142,6 +1209,7 @@ pub(crate) async fn request_put(op_manager: &OpManager, mut put_op: PutOp) -> Re false, op_manager, broadcast_state, + own_location.clone(), (broadcast_to, sender), key, (contract.clone(), updated_value), @@ -1206,12 +1274,14 @@ pub(crate) async fn request_put(op_manager: &OpManager, mut put_op: PutOp) -> Re contract: contract.clone(), state: updated_value.clone(), subscribe, + origin: own_location.clone(), }); // Create RequestPut message and forward to target peer let msg = PutMsg::RequestPut { id, - sender: own_location, + sender: own_location.clone(), + origin: own_location, contract, related_contracts, value: updated_value, @@ -1271,6 +1341,7 @@ async fn put_contract( /// It returns whether this peer should be storing the contract or not. /// /// This operation is "fire and forget" and the node does not keep track if is successful or not. 
+#[allow(clippy::too_many_arguments)] async fn forward_put<CB>( op_manager: &OpManager, conn_manager: &CB, @@ -1279,6 +1350,7 @@ async fn forward_put<CB>( id: Transaction, htl: usize, skip_list: HashSet<PeerId>, + origin: PeerKeyLocation, ) -> bool where CB: NetworkBridge, { @@ -1336,6 +1408,7 @@ where id, sender: own_pkloc, target: peer.clone(), + origin, contract: contract.clone(), new_value: new_value.clone(), htl, @@ -1375,6 +1448,7 @@ mod messages { RequestPut { id: Transaction, sender: PeerKeyLocation, + origin: PeerKeyLocation, contract: ContractContainer, #[serde(deserialize_with = "RelatedContracts::deser_related_contracts")] related_contracts: RelatedContracts<'static>, @@ -1390,6 +1464,7 @@ mod messages { id: Transaction, sender: PeerKeyLocation, target: PeerKeyLocation, + origin: PeerKeyLocation, contract: ContractContainer, new_value: WrappedState, /// current htl, reduced by one at each hop @@ -1402,12 +1477,14 @@ mod messages { target: PeerKeyLocation, key: ContractKey, sender: PeerKeyLocation, + origin: PeerKeyLocation, }, /// Target the node which is closest to the key SeekNode { id: Transaction, sender: PeerKeyLocation, target: PeerKeyLocation, + origin: PeerKeyLocation, value: WrappedState, contract: ContractContainer, #[serde(deserialize_with = "RelatedContracts::deser_related_contracts")] @@ -1425,11 +1502,13 @@ mod messages { contract: ContractContainer, upstream: PeerKeyLocation, sender: PeerKeyLocation, + origin: PeerKeyLocation, }, /// Broadcasting a change to a peer, which then will relay the changes to other peers. BroadcastTo { id: Transaction, sender: PeerKeyLocation, + origin: PeerKeyLocation, key: ContractKey, new_value: WrappedState, contract: ContractContainer, diff --git a/crates/core/src/operations/subscribe.rs b/crates/core/src/operations/subscribe.rs index 9963fc8bf..c8fab8952 100644 --- a/crates/core/src/operations/subscribe.rs +++ b/crates/core/src/operations/subscribe.rs @@ -3,10 +3,11 @@ use std::future::Future; use std::pin::Pin; pub(crate) use self::messages::SubscribeMsg; -use super::{OpEnum, OpError, OpInitialization, OpOutcome, Operation, OperationResult}; +use super::{get, OpEnum, OpError, OpInitialization, OpOutcome, Operation, OperationResult}; use crate::node::IsOperationCompleted; use crate::{ client_events::HostResult, + contract::{ContractHandlerEvent, StoreResponse}, message::{InnerMessage, NetMessage, Transaction}, node::{NetworkBridge, OpManager, PeerId}, ring::{Location, PeerKeyLocation, RingError}, @@ -16,9 +17,79 @@ use freenet_stdlib::{ prelude::*, }; use serde::{Deserialize, Serialize}; +use tokio::time::{sleep, Duration}; const MAX_RETRIES: usize = 10; +const LOCAL_FETCH_TIMEOUT_MS: u64 = 1_500; +const LOCAL_FETCH_POLL_INTERVAL_MS: u64 = 25; + +fn subscribers_snapshot(op_manager: &OpManager, key: &ContractKey) -> Vec<String> { + op_manager + .ring + .subscribers_of(key) + .map(|subs| { + subs.iter() + .map(|loc| format!("{:.8}", loc.peer)) + .collect::<Vec<_>>() + }) + .unwrap_or_default() +} + +/// Poll local storage for a short period until the fetched contract becomes available. +async fn wait_for_local_contract( + op_manager: &OpManager, + key: ContractKey, +) -> Result<bool, OpError> { + let mut elapsed = 0; + while elapsed < LOCAL_FETCH_TIMEOUT_MS { + if super::has_contract(op_manager, key).await?
{ + return Ok(true); + } + sleep(Duration::from_millis(LOCAL_FETCH_POLL_INTERVAL_MS)).await; + elapsed += LOCAL_FETCH_POLL_INTERVAL_MS; + } + Ok(false) +} + +async fn fetch_contract_if_missing( + op_manager: &OpManager, + key: ContractKey, +) -> Result<(), OpError> { + if has_contract_with_code(op_manager, key).await? { + return Ok(()); + } + let get_op = get::start_op(key, true, false); + get::request_get(op_manager, get_op, HashSet::new()).await?; + + if wait_for_local_contract(op_manager, key).await? + && has_contract_with_code(op_manager, key).await? + { + Ok(()) + } else { + Err(RingError::NoCachingPeers(key).into()) + } +} + +async fn has_contract_with_code(op_manager: &OpManager, key: ContractKey) -> Result<bool, OpError> { + match op_manager + .notify_contract_handler(ContractHandlerEvent::GetQuery { + key, + return_contract_code: true, + }) + .await? + { + ContractHandlerEvent::GetResponse { + response: + Ok(StoreResponse { + state: Some(_), + contract: Some(_), + }), + .. + } => Ok(true), + _ => Ok(false), + } +} #[derive(Debug)] enum SubscribeState { /// Prepare the request to subscribe. @@ -72,57 +143,79 @@ pub(crate) async fn request_subscribe( sub_op: SubscribeOp, ) -> Result<(), OpError> { if let Some(SubscribeState::PrepareRequest { id, key }) = &sub_op.state { + let own_loc = op_manager.ring.connection_manager.own_location(); + let local_has_contract = super::has_contract(op_manager, *key).await?; + + tracing::debug!( + tx = %id, + %key, + subscriber_peer = %own_loc.peer, + local_has_contract, + "subscribe: request_subscribe invoked" + ); + + let mut skip_list: HashSet<PeerId> = HashSet::new(); + skip_list.insert(own_loc.peer.clone()); + // Use k_closest_potentially_caching to try multiple candidates - const EMPTY: &[PeerId] = &[]; // Try up to 3 candidates - let candidates = op_manager.ring.k_closest_potentially_caching(key, EMPTY, 3); + let candidates = op_manager + .ring + .k_closest_potentially_caching(key, &skip_list, 3); + + if tracing::enabled!(tracing::Level::INFO) { + let skip_display: Vec<String> = skip_list + .iter() + .map(|peer| format!("{:.8}", peer)) + .collect(); + let candidate_display: Vec<String> = candidates + .iter() + .map(|cand| format!("{:.8}", cand.peer)) + .collect(); + tracing::info!( + tx = %id, + %key, + skip = ?skip_display, + candidates = ?candidate_display, + "subscribe: k_closest_potentially_caching results" + ); + } let target = match candidates.first() { Some(peer) => peer.clone(), None => { - // No remote peers available - check if we have the contract locally - tracing::debug!(%key, "No remote peers available for subscription, checking locally"); - - if super::has_contract(op_manager, *key).await? { - // We have the contract locally - register subscription and complete immediately - tracing::info!(%key, tx = %id, "Contract available locally, registering local subscription"); - - // CRITICAL FIX for issue #2001: Register subscriber in DashMap before completing - // Without this, UPDATE operations won't find subscribers for locally-cached contracts - let subscriber = op_manager.ring.connection_manager.own_location(); - if op_manager + // No remote peers available - rely on local contract if present.
+ tracing::debug!( + %key, + "No remote peers available for subscription, checking locally" + ); + + if local_has_contract { + tracing::info!( + %key, + tx = %id, + "No remote peers, fulfilling subscription locally" + ); + return complete_local_subscription(op_manager, *id, *key).await; + } else { + let connection_count = op_manager.ring.connection_manager.num_connections(); + let subscribers = op_manager .ring - .add_subscriber(key, subscriber.clone()) - .is_err() - { - tracing::error!(%key, tx = %id, "Failed to add local subscriber - max subscribers reached"); - // Continue anyway - client requested subscription and contract is local - } else { - tracing::debug!(%key, tx = %id, subscriber = %subscriber.peer, "Successfully registered local subscriber"); - } - - match op_manager - .notify_node_event(crate::message::NodeEvent::LocalSubscribeComplete { - tx: *id, - key: *key, - subscribed: true, + .subscribers_of(key) + .map(|subs| { + subs.value() + .iter() + .map(|loc| format!("{:.8}", loc.peer)) + .collect::<Vec<_>>() }) - .await - { - Ok(()) => { - tracing::debug!(%key, tx = %id, "sent LocalSubscribeComplete event") - } - Err(e) => { - tracing::error!(%key, tx = %id, error = %e, "failed to send LocalSubscribeComplete event") - } - } - - // Mark subscription as completed for atomicity tracking - op_manager.completed(*id); - - return Ok(()); - } else { - tracing::debug!(%key, "Contract not available locally and no remote peers"); + .unwrap_or_default(); + tracing::warn!( + %key, + tx = %id, + connection_count, + subscribers = ?subscribers, + "Contract not available locally and no remote peers" + ); return Err(RingError::NoCachingPeers(*key).into()); } } @@ -130,15 +223,23 @@ pub(crate) async fn request_subscribe( // Forward to remote peer let new_state = Some(SubscribeState::AwaitingResponse { - skip_list: vec![].into_iter().collect(), + skip_list, retries: 0, current_hop: op_manager.ring.max_hops_to_live, upstream_subscriber: None, }); + tracing::debug!( + tx = %id, + %key, + target_peer = %target.peer, + target_location = ?target.location, + "subscribe: forwarding RequestSub to target peer" + ); let msg = SubscribeMsg::RequestSub { id: *id, key: *key, target, + subscriber: own_loc.clone(), }; let op = SubscribeOp { id: *id, @@ -154,6 +255,38 @@ pub(crate) async fn request_subscribe( Ok(()) } +async fn complete_local_subscription( + op_manager: &OpManager, + id: Transaction, + key: ContractKey, +) -> Result<(), OpError> { + let subscriber = op_manager.ring.connection_manager.own_location(); + if let Err(err) = op_manager.ring.add_subscriber(&key, subscriber.clone()) { + tracing::warn!( + %key, + tx = %id, + subscriber = %subscriber.peer, + error = ?err, + "Failed to register local subscriber" + ); + } else { + tracing::debug!( + %key, + tx = %id, + subscriber = %subscriber.peer, + "Registered local subscriber" + ); + } + + op_manager + .notify_node_event(crate::message::NodeEvent::LocalSubscribeComplete { + tx: id, + key, + subscribed: true, + }) + .await +} + pub(crate) struct SubscribeOp { pub id: Transaction, state: Option<SubscribeState>, @@ -240,21 +373,138 @@ impl Operation for SubscribeOp { let new_state; match input { - SubscribeMsg::RequestSub { id, key, target } => { - // fast tracked from the request_sub func - debug_assert!(matches!( + SubscribeMsg::RequestSub { + id, + key, + target: _, + subscriber, + } => { + tracing::debug!( + tx = %id, + %key, + subscriber = %subscriber.peer, + "subscribe: processing RequestSub" + ); + let own_loc = op_manager.ring.connection_manager.own_location(); + + if
!matches!( self.state, Some(SubscribeState::AwaitingResponse { .. }) - )); - let sender = op_manager.ring.connection_manager.own_location(); + | Some(SubscribeState::ReceivedRequest) + ) { + tracing::warn!( + tx = %id, + %key, + state = ?self.state, + "subscribe: RequestSub received in unexpected state" + ); + return Err(OpError::invalid_transition(self.id)); + } + + if super::has_contract(op_manager, *key).await? { + let before_direct = subscribers_snapshot(op_manager, key); + tracing::info!( + tx = %id, + %key, + subscriber = %subscriber.peer, + subscribers_before = ?before_direct, + "subscribe: handling RequestSub locally (contract available)" + ); + + if op_manager + .ring + .add_subscriber(key, subscriber.clone()) + .is_err() + { + tracing::warn!( + tx = %id, + %key, + subscriber = %subscriber.peer, + subscribers_before = ?before_direct, + "subscribe: direct registration failed (max subscribers reached)" + ); + return Ok(OperationResult { + return_msg: Some(NetMessage::from(SubscribeMsg::ReturnSub { + id: *id, + key: *key, + sender: own_loc.clone(), + target: subscriber.clone(), + subscribed: false, + })), + state: None, + }); + } + + let after_direct = subscribers_snapshot(op_manager, key); + tracing::info!( + tx = %id, + %key, + subscriber = %subscriber.peer, + subscribers_after = ?after_direct, + "subscribe: registered direct subscriber (RequestSub)" + ); + + if subscriber.peer == own_loc.peer { + tracing::debug!( + tx = %id, + %key, + "RequestSub originated locally; sending LocalSubscribeComplete" + ); + if let Err(err) = op_manager + .notify_node_event( + crate::message::NodeEvent::LocalSubscribeComplete { + tx: *id, + key: *key, + subscribed: true, + }, + ) + .await + { + tracing::error!( + tx = %id, + %key, + error = %err, + "Failed to send LocalSubscribeComplete event for RequestSub" + ); + return Err(err); + } + + return build_op_result(self.id, None, None); + } + + let return_msg = SubscribeMsg::ReturnSub { + id: *id, + key: *key, + sender: own_loc.clone(), + target: subscriber.clone(), + subscribed: true, + }; + + return build_op_result(self.id, None, Some(return_msg)); + } + + let mut skip = HashSet::new(); + skip.insert(subscriber.peer.clone()); + skip.insert(own_loc.peer.clone()); + + let forward_target = op_manager + .ring + .k_closest_potentially_caching(key, &skip, 3) + .into_iter() + .find(|candidate| candidate.peer != own_loc.peer) + .ok_or_else(|| RingError::NoCachingPeers(*key)) + .map_err(OpError::from)?; + + skip.insert(forward_target.peer.clone()); + new_state = self.state; return_msg = Some(SubscribeMsg::SeekNode { id: *id, key: *key, - target: target.clone(), - subscriber: sender.clone(), - skip_list: HashSet::from([sender.peer]), - htl: op_manager.ring.max_hops_to_live, + target: forward_target, + subscriber: subscriber.clone(), + skip_list: skip.clone(), + htl: op_manager.ring.max_hops_to_live.max(1), retries: 0, }); } @@ -267,6 +517,8 @@ impl Operation for SubscribeOp { htl, retries, } => { + let ring_max_htl = op_manager.ring.max_hops_to_live.max(1); + let htl = (*htl).min(ring_max_htl); let this_peer = op_manager.ring.connection_manager.own_location(); let return_not_subbed = || -> OperationResult { OperationResult { @@ -281,6 +533,16 @@ impl Operation for SubscribeOp { } }; + if htl == 0 { + tracing::warn!( + tx = %id, + %key, + subscriber = %subscriber.peer, + "Dropping Subscribe SeekNode with zero HTL" + ); + return Ok(return_not_subbed()); + } + if !super::has_contract(op_manager, *key).await? 
{ tracing::debug!(tx = %id, %key, "Contract not found, trying other peer"); @@ -288,53 +550,133 @@ impl Operation for SubscribeOp { let candidates = op_manager .ring .k_closest_potentially_caching(key, skip_list, 3); - let Some(new_target) = candidates.first() else { - tracing::warn!(tx = %id, %key, "No remote peer available for forwarding"); - return Ok(return_not_subbed()); - }; - let new_target = new_target.clone(); - let new_htl = htl - 1; + if candidates.is_empty() { + let connection_count = + op_manager.ring.connection_manager.num_connections(); + tracing::warn!( + tx = %id, + %key, + skip = ?skip_list, + connection_count, + "No remote peer available for forwarding" + ); + tracing::info!( + tx = %id, + %key, + "Attempting to fetch contract locally before aborting subscribe" + ); - if new_htl == 0 { - tracing::debug!(tx = %id, %key, "Max number of hops reached while trying to get contract"); - return Ok(return_not_subbed()); - } + let get_op = get::start_op(*key, true, false); + if let Err(fetch_err) = + get::request_get(op_manager, get_op, HashSet::new()).await + { + tracing::warn!( + tx = %id, + %key, + error = %fetch_err, + "Failed to fetch contract locally while handling subscribe" + ); + return Ok(return_not_subbed()); + } - let mut new_skip_list = skip_list.clone(); - new_skip_list.insert(target.peer.clone()); - - tracing::debug!(tx = %id, new_target = %new_target.peer, "Forward request to peer"); - // Retry seek node when the contract to subscribe has not been found in this node - return build_op_result( - *id, - Some(SubscribeState::AwaitingResponse { - skip_list: new_skip_list.clone(), - retries: *retries, - current_hop: new_htl, - upstream_subscriber: Some(subscriber.clone()), - }), - (SubscribeMsg::SeekNode { - id: *id, - key: *key, - subscriber: this_peer, - target: new_target, - skip_list: new_skip_list, - htl: new_htl, - retries: *retries, - }) - .into(), - ); + if wait_for_local_contract(op_manager, *key).await? { + tracing::info!( + tx = %id, + %key, + "Fetched contract locally while handling subscribe" + ); + } else { + tracing::warn!( + tx = %id, + %key, + "Contract still unavailable locally after fetch attempt" + ); + return Ok(return_not_subbed()); + } + } else { + let Some(new_target) = candidates.first() else { + return Ok(return_not_subbed()); + }; + let new_target = new_target.clone(); + let new_htl = htl.saturating_sub(1); + + if new_htl == 0 { + tracing::debug!(tx = %id, %key, "Max number of hops reached while trying to get contract"); + return Ok(return_not_subbed()); + } + + let mut new_skip_list = skip_list.clone(); + new_skip_list.insert(target.peer.clone()); + + tracing::info!( + tx = %id, + %key, + new_target = %new_target.peer, + upstream = %subscriber.peer, + "Forward request to peer" + ); + tracing::debug!( + tx = %id, + %key, + candidates = ?candidates, + skip = ?new_skip_list, + "Forwarding seek to next candidate" + ); + // Retry seek node when the contract to subscribe has not been found in this node + return build_op_result( + *id, + Some(SubscribeState::AwaitingResponse { + skip_list: new_skip_list.clone(), + retries: *retries, + current_hop: new_htl, + upstream_subscriber: Some(subscriber.clone()), + }), + (SubscribeMsg::SeekNode { + id: *id, + key: *key, + subscriber: this_peer, + target: new_target, + skip_list: new_skip_list, + htl: new_htl, + retries: *retries, + }) + .into(), + ); + } + // After fetch attempt we should now have the contract locally. 
} + let before_direct = subscribers_snapshot(op_manager, key); + tracing::info!( + tx = %id, + %key, + subscriber = %subscriber.peer, + subscribers_before = ?before_direct, + "subscribe: attempting to register direct subscriber" + ); if op_manager .ring .add_subscriber(key, subscriber.clone()) .is_err() { - tracing::debug!(tx = %id, %key, "Max number of subscribers reached for contract"); + tracing::warn!( + tx = %id, + %key, + subscriber = %subscriber.peer, + subscribers_before = ?before_direct, + "subscribe: direct registration failed (max subscribers reached)" + ); // max number of subscribers for this contract reached return Ok(return_not_subbed()); } + let after_direct = subscribers_snapshot(op_manager, key); + tracing::info!( + tx = %id, + %key, + subscriber = %subscriber.peer, + subscribers_after = ?after_direct, + "subscribe: registered direct subscriber" + ); match self.state { Some(SubscribeState::ReceivedRequest) => { @@ -426,6 +768,8 @@ impl Operation for SubscribeOp { upstream_subscriber, .. }) => { + fetch_contract_if_missing(op_manager, *key).await?; + tracing::info!( tx = %id, %key, @@ -433,6 +777,56 @@ impl Operation for SubscribeOp { provider = %sender.peer, "Subscribed to contract" ); + tracing::info!( + tx = %id, + %key, + upstream = upstream_subscriber + .as_ref() + .map(|loc| format!("{:.8}", loc.peer)) + .unwrap_or_else(|| "".into()), + "Handling ReturnSub (subscribed=true)" + ); + if let Some(upstream_subscriber) = upstream_subscriber.as_ref() { + let before_upstream = subscribers_snapshot(op_manager, key); + tracing::info!( + tx = %id, + %key, + upstream = %upstream_subscriber.peer, + subscribers_before = ?before_upstream, + "subscribe: attempting to register upstream link" + ); + if op_manager + .ring + .add_subscriber(key, upstream_subscriber.clone()) + .is_err() + { + tracing::warn!( + tx = %id, + %key, + upstream = %upstream_subscriber.peer, + subscribers_before = ?before_upstream, + "subscribe: upstream registration failed (max subscribers reached)" + ); + } else { + let after_upstream = subscribers_snapshot(op_manager, key); + tracing::info!( + tx = %id, + %key, + upstream = %upstream_subscriber.peer, + subscribers_after = ?after_upstream, + "subscribe: registered upstream link" + ); + } + } + + let before_provider = subscribers_snapshot(op_manager, key); + tracing::info!( + tx = %id, + %key, + provider = %sender.peer, + subscribers_before = ?before_provider, + "subscribe: registering provider/subscription source" + ); if op_manager.ring.add_subscriber(key, sender.clone()).is_err() { // concurrently it reached max number of subscribers for this contract tracing::debug!( @@ -442,6 +836,14 @@ impl Operation for SubscribeOp { ); return Err(OpError::UnexpectedOpState); } + let after_provider = subscribers_snapshot(op_manager, key); + tracing::info!( + tx = %id, + %key, + provider = %sender.peer, + subscribers_after = ?after_provider, + "subscribe: registered provider/subscription source" + ); new_state = Some(SubscribeState::Completed { key: *key }); if let Some(upstream_subscriber) = upstream_subscriber { @@ -518,6 +920,7 @@ mod messages { id: Transaction, key: ContractKey, target: PeerKeyLocation, + subscriber: PeerKeyLocation, }, SeekNode { id: Transaction, @@ -549,6 +952,7 @@ mod messages { fn target(&self) -> Option> { match self { + Self::RequestSub { target, .. } => Some(target), Self::SeekNode { target, .. } => Some(target), Self::ReturnSub { target, .. 
} => Some(target), _ => None, diff --git a/crates/core/src/router/isotonic_estimator.rs b/crates/core/src/router/isotonic_estimator.rs index 9a82ba228..a02cb8034 100644 --- a/crates/core/src/router/isotonic_estimator.rs +++ b/crates/core/src/router/isotonic_estimator.rs @@ -217,6 +217,7 @@ impl Adjustment { mod tests { use super::*; + use tracing::debug; // This test `test_peer_time_estimator` checks the accuracy of the `RoutingOutcomeEstimator` struct's // `estimate_retrieval_time()` method. It generates a list of 100 random events, where each event @@ -239,7 +240,7 @@ mod tests { for _ in 0..100 { let peer = PeerKeyLocation::random(); if peer.location.is_none() { - println!("Peer location is none for {peer:?}"); + debug!("Peer location is none for {peer:?}"); } let contract_location = Location::random(); events.push(simulate_positive_request(peer, contract_location)); @@ -265,7 +266,7 @@ // Check that the errors are small let average_error = errors.iter().sum::<f64>() / errors.len() as f64; - println!("Average error: {average_error}"); + debug!("Average error: {average_error}"); assert!(average_error < 0.01); } @@ -276,7 +277,7 @@ for _ in 0..100 { let peer = PeerKeyLocation::random(); if peer.location.is_none() { - println!("Peer location is none for {peer:?}"); + debug!("Peer location is none for {peer:?}"); } let contract_location = Location::random(); events.push(simulate_negative_request(peer, contract_location)); @@ -302,7 +303,7 @@ // Check that the errors are small let average_error = errors.iter().sum::<f64>() / errors.len() as f64; - println!("Average error: {average_error}"); + debug!("Average error: {average_error}"); assert!(average_error < 0.01); } diff --git a/crates/core/src/router/mod.rs b/crates/core/src/router/mod.rs index ba459df22..f5749154b 100644 --- a/crates/core/src/router/mod.rs +++ b/crates/core/src/router/mod.rs @@ -1,7 +1,7 @@ mod isotonic_estimator; mod util; -use crate::ring::{Location, PeerKeyLocation}; +use crate::ring::{Distance, Location, PeerKeyLocation}; use isotonic_estimator::{EstimatorType, IsotonicEstimator, IsotonicEvent}; use serde::{Deserialize, Serialize}; use std::time::Duration; @@ -162,9 +162,12 @@ impl Router { let mut peer_distances: Vec<_> = peers .into_iter() - .filter_map(|peer| { - peer.location - .map(|loc| (peer, target_location.distance(loc))) + .map(|peer| { + let distance = peer + .location + .map(|loc| target_location.distance(loc)) + .unwrap_or_else(|| Distance::new(0.5)); + (peer, distance) }) .collect(); @@ -203,8 +206,10 @@ impl Router { let mut peer_distances: Vec<_> = peers .into_iter() .filter_map(|peer| { - peer.location - .map(|loc| (peer, target_location.distance(loc))) + peer.location.map(|loc| { + let distance = target_location.distance(loc); + (peer, distance) + }) }) .collect(); diff --git a/crates/core/src/topology/request_density_tracker.rs b/crates/core/src/topology/request_density_tracker.rs index df56efa01..4820c694c 100644 --- a/crates/core/src/topology/request_density_tracker.rs +++ b/crates/core/src/topology/request_density_tracker.rs @@ -248,6 +248,7 @@ pub(crate) enum DensityMapError { mod tests { use super::*; use std::sync::RwLock; + use tracing::debug; #[test] fn test_create_density_map() { @@ -327,12 +328,12 @@ mod tests { let result = result.unwrap(); // Scan and dumb densities 0.0 to 1.0 at 0.01 intervals - println!("Location\tDensity"); + debug!("Location\tDensity"); for i in 0..100 { let location = Location::new(i as f64 / 100.0); let density = 
result.get_density_at(location).unwrap(); // Print and round density to 2 decimals - println!( + debug!( "{}\t{}", location.as_f64(), (density * 100.0).round() / 100.0 diff --git a/crates/core/src/tracing/mod.rs b/crates/core/src/tracing/mod.rs index bde43deda..d2c2b7133 100644 --- a/crates/core/src/tracing/mod.rs +++ b/crates/core/src/tracing/mod.rs @@ -241,6 +241,7 @@ impl<'a> NetEventLog<'a> { target, key, sender, + .. }) => EventKind::Put(PutEvent::PutSuccess { id: *id, requester: sender.clone(), diff --git a/scripts/deploy-local-gateway.sh b/scripts/deploy-local-gateway.sh index a731dcdd9..3da4c8b30 100755 --- a/scripts/deploy-local-gateway.sh +++ b/scripts/deploy-local-gateway.sh @@ -249,7 +249,8 @@ start_service() { case "$SERVICE_MANAGER" in systemd) - if systemctl list-unit-files | grep -q "^$service_arg.service" 2>/dev/null; then + # Check if unit file exists by querying systemctl directly + if systemctl list-unit-files "$service_arg.service" 2>/dev/null | grep -q "$service_arg.service"; then echo -n " Starting systemd service ($service_arg)... " if [[ "$DRY_RUN" == "true" ]]; then echo "[DRY RUN]" @@ -294,7 +295,8 @@ verify_service() { case "$SERVICE_MANAGER" in systemd) - if systemctl list-unit-files | grep -q "^$service_arg.service" 2>/dev/null; then + # Check if unit file exists by querying systemctl directly + if systemctl list-unit-files "$service_arg.service" 2>/dev/null | grep -q "$service_arg.service"; then echo -n " Verifying service status ($service_arg)... " sleep 2 # Give service time to start if systemctl is-active --quiet "$service_arg.service"; then