Skip to content

Commit 4a69387

Browse files
sanityiduartgomez
authored andcommitted
fix(update): remove GET fallback propagation
1 parent da01c02 commit 4a69387

File tree

1 file changed

+42
-175
lines changed

1 file changed

+42
-175
lines changed

crates/core/src/operations/update.rs

Lines changed: 42 additions & 175 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use freenet_stdlib::client_api::{ErrorKind, HostResponse};
44
use freenet_stdlib::prelude::*;
55

66
pub(crate) use self::messages::UpdateMsg;
7-
use super::{get, OpEnum, OpError, OpInitialization, OpOutcome, Operation, OperationResult};
7+
use super::{OpEnum, OpError, OpInitialization, OpOutcome, Operation, OperationResult};
88
use crate::contract::{ContractHandlerEvent, StoreResponse};
99
use crate::message::{InnerMessage, NetMessage, NodeEvent, Transaction};
1010
use crate::node::IsOperationCompleted;
@@ -13,7 +13,6 @@ use crate::{
1313
client_events::HostResult,
1414
node::{NetworkBridge, OpManager, PeerId},
1515
};
16-
use std::collections::HashSet;
1716

1817
pub(crate) struct UpdateOp {
1918
pub id: Transaction,
@@ -249,14 +248,9 @@ impl Operation for UpdateOp {
249248
return_msg = None;
250249
} else {
251250
// Get broadcast targets for propagating UPDATE to subscribers
252-
let mut broadcast_to = op_manager
251+
let broadcast_to = op_manager
253252
.get_broadcast_targets_update(key, &request_sender.peer);
254253

255-
if broadcast_to.is_empty() {
256-
broadcast_to = op_manager
257-
.compute_update_fallback_targets(key, &request_sender.peer);
258-
}
259-
260254
if broadcast_to.is_empty() {
261255
tracing::debug!(
262256
tx = %id,
@@ -298,21 +292,10 @@ impl Operation for UpdateOp {
298292
}
299293
} else {
300294
// Contract not found locally - forward to another peer
301-
let skip_peers = [&self_location.peer, &request_sender.peer];
302-
let next_target = op_manager
303-
.ring
304-
.closest_potentially_caching(key, skip_peers.as_slice())
305-
.or_else(|| {
306-
op_manager
307-
.ring
308-
.k_closest_potentially_caching(
309-
key,
310-
skip_peers.as_slice(),
311-
5,
312-
)
313-
.into_iter()
314-
.next()
315-
});
295+
let next_target = op_manager.ring.closest_potentially_caching(
296+
key,
297+
[&self_location.peer, &request_sender.peer].as_slice(),
298+
);
316299

317300
if let Some(forward_target) = next_target {
318301
tracing::debug!(
@@ -333,6 +316,7 @@ impl Operation for UpdateOp {
333316
});
334317
new_state = None;
335318
} else {
319+
// No peers available and we don't have the contract - capture context
336320
let skip_list = [&self_location.peer, &request_sender.peer];
337321
let subscribers = op_manager
338322
.ring
@@ -352,7 +336,6 @@ impl Operation for UpdateOp {
352336
.collect::<Vec<_>>();
353337
let connection_count =
354338
op_manager.ring.connection_manager.num_connections();
355-
// No peers available and we don't have the contract - error
356339
tracing::error!(
357340
tx = %id,
358341
%key,
@@ -426,14 +409,10 @@ impl Operation for UpdateOp {
426409
return_msg = None;
427410
} else {
428411
// Get broadcast targets
429-
let mut broadcast_to =
412+
let broadcast_to =
430413
op_manager.get_broadcast_targets_update(key, &sender.peer);
431414

432-
if broadcast_to.is_empty() {
433-
broadcast_to =
434-
op_manager.compute_update_fallback_targets(key, &sender.peer);
435-
}
436-
415+
// If no peers to broadcast to, nothing else to do
437416
if broadcast_to.is_empty() {
438417
tracing::debug!(
439418
tx = %id,
@@ -491,100 +470,36 @@ impl Operation for UpdateOp {
491470
});
492471
new_state = None;
493472
} else {
494-
tracing::warn!(
473+
// No more peers to try - capture context for diagnostics
474+
let skip_list = [&sender.peer, &self_location.peer];
475+
let subscribers = op_manager
476+
.ring
477+
.subscribers_of(key)
478+
.map(|subs| {
479+
subs.value()
480+
.iter()
481+
.map(|loc| format!("{:.8}", loc.peer))
482+
.collect::<Vec<_>>()
483+
})
484+
.unwrap_or_default();
485+
let candidates = op_manager
486+
.ring
487+
.k_closest_potentially_caching(key, skip_list.as_slice(), 5)
488+
.into_iter()
489+
.map(|loc| format!("{:.8}", loc.peer))
490+
.collect::<Vec<_>>();
491+
let connection_count =
492+
op_manager.ring.connection_manager.num_connections();
493+
tracing::error!(
495494
tx = %id,
496495
%key,
497-
"No forwarding targets for UPDATE SeekNode - attempting local fetch"
496+
subscribers = ?subscribers,
497+
candidates = ?candidates,
498+
connection_count,
499+
sender = %sender.peer,
500+
"Cannot handle UPDATE SeekNode: contract not found and no peers to forward to"
498501
);
499-
500-
let mut fetch_skip = HashSet::new();
501-
fetch_skip.insert(sender.peer.clone());
502-
503-
let get_op = get::start_op(*key, true, false);
504-
if let Err(fetch_err) =
505-
get::request_get(op_manager, get_op, fetch_skip).await
506-
{
507-
tracing::warn!(
508-
tx = %id,
509-
%key,
510-
error = %fetch_err,
511-
"Failed to fetch contract while handling UPDATE SeekNode"
512-
);
513-
return Err(OpError::RingError(RingError::NoCachingPeers(*key)));
514-
}
515-
516-
if super::has_contract(op_manager, *key).await? {
517-
tracing::info!(
518-
tx = %id,
519-
%key,
520-
"Successfully fetched contract locally, applying UPDATE"
521-
);
522-
let UpdateExecution {
523-
value: updated_value,
524-
summary: _summary,
525-
changed,
526-
} = update_contract(
527-
op_manager,
528-
*key,
529-
value.clone(),
530-
related_contracts.clone(),
531-
)
532-
.await?;
533-
534-
if !changed {
535-
tracing::debug!(
536-
tx = %id,
537-
%key,
538-
"Fetched contract apply produced no change during SeekNode fallback"
539-
);
540-
new_state = None;
541-
return_msg = None;
542-
} else {
543-
let mut broadcast_to =
544-
op_manager.get_broadcast_targets_update(key, &sender.peer);
545-
546-
if broadcast_to.is_empty() {
547-
broadcast_to = op_manager
548-
.compute_update_fallback_targets(key, &sender.peer);
549-
}
550-
551-
if broadcast_to.is_empty() {
552-
tracing::debug!(
553-
tx = %id,
554-
%key,
555-
"No broadcast targets after SeekNode fallback apply; finishing locally"
556-
);
557-
new_state = None;
558-
return_msg = None;
559-
} else {
560-
match try_to_broadcast(
561-
*id,
562-
true,
563-
op_manager,
564-
self.state,
565-
(broadcast_to, sender.clone()),
566-
*key,
567-
updated_value.clone(),
568-
false,
569-
)
570-
.await
571-
{
572-
Ok((state, msg)) => {
573-
new_state = state;
574-
return_msg = msg;
575-
}
576-
Err(err) => return Err(err),
577-
}
578-
}
579-
}
580-
} else {
581-
tracing::error!(
582-
tx = %id,
583-
%key,
584-
"Contract still unavailable after fetch attempt during UPDATE SeekNode"
585-
);
586-
return Err(OpError::RingError(RingError::NoCachingPeers(*key)));
587-
}
502+
return Err(OpError::RingError(RingError::NoCachingPeers(*key)));
588503
}
589504
}
590505
}
@@ -618,14 +533,9 @@ impl Operation for UpdateOp {
618533
new_state = None;
619534
return_msg = None;
620535
} else {
621-
let mut broadcast_to =
536+
let broadcast_to =
622537
op_manager.get_broadcast_targets_update(key, &sender.peer);
623538

624-
if broadcast_to.is_empty() {
625-
broadcast_to =
626-
op_manager.compute_update_fallback_targets(key, &sender.peer);
627-
}
628-
629539
tracing::debug!(
630540
"Successfully updated a value for contract {} @ {:?} - BroadcastTo - update",
631541
key,
@@ -790,25 +700,16 @@ impl OpManager {
790700
subs.value()
791701
.iter()
792702
.filter(|pk| {
793-
// Allow the sender to remain in the broadcast list when we're the sender,
794-
// so local auto-subscribe via GET/PUT still receives notifications.
795-
if &pk.peer == sender {
703+
// Allow the sender (or ourselves) to stay in the broadcast list when we're
704+
// originating the UPDATE so local auto-subscribes still receive events.
705+
let is_sender = &pk.peer == sender;
706+
let is_self = self_peer.as_ref().map_or(false, |me| &pk.peer == me);
707+
if is_sender || is_self {
796708
allow_self
797709
} else {
798710
true
799711
}
800712
})
801-
.filter(|pk| {
802-
if let Some(self_peer) = &self_peer {
803-
if &pk.peer == self_peer {
804-
allow_self
805-
} else {
806-
true
807-
}
808-
} else {
809-
true
810-
}
811-
})
812713
.cloned()
813714
.collect::<Vec<_>>()
814715
})
@@ -848,40 +749,6 @@ impl OpManager {
848749

849750
subscribers
850751
}
851-
852-
fn compute_update_fallback_targets(
853-
&self,
854-
key: &ContractKey,
855-
sender: &PeerId,
856-
) -> Vec<PeerKeyLocation> {
857-
let mut skip: HashSet<PeerId> = HashSet::new();
858-
skip.insert(sender.clone());
859-
if let Some(self_peer) = self.ring.connection_manager.get_peer_key() {
860-
skip.insert(self_peer);
861-
}
862-
863-
let candidates = self
864-
.ring
865-
.k_closest_potentially_caching(key, &skip, 3)
866-
.into_iter()
867-
.filter(|candidate| &candidate.peer != sender)
868-
.collect::<Vec<_>>();
869-
870-
if !candidates.is_empty() {
871-
tracing::info!(
872-
"UPDATE_PROPAGATION: contract={:.8} from={} using fallback targets={}",
873-
key,
874-
sender,
875-
candidates
876-
.iter()
877-
.map(|c| format!("{:.8}", c.peer))
878-
.collect::<Vec<_>>()
879-
.join(",")
880-
);
881-
}
882-
883-
candidates
884-
}
885752
}
886753

887754
fn build_op_result(

0 commit comments

Comments
 (0)