Skip to content

Commit 61947f6

Browse files
authored
Merge branch 'stack/subscription-routing' into stack/connect-transport-rewrite
2 parents 613e7c0 + de0aaa9 commit 61947f6

File tree

2 files changed

+49
-3
lines changed

2 files changed

+49
-3
lines changed

crates/core/src/operations/put.rs

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -581,6 +581,11 @@ impl Operation for PutOp {
581581
return_msg = None;
582582
}
583583
PutMsg::SuccessfulPut { id, .. } => {
584+
tracing::debug!(
585+
tx = %id,
586+
current_state = ?self.state,
587+
"PutOp::process_message: handling SuccessfulPut"
588+
);
584589
match self.state {
585590
Some(PutState::AwaitingResponse {
586591
key,
@@ -665,13 +670,24 @@ impl Operation for PutOp {
665670

666671
// Forward success message upstream if needed
667672
if let Some(upstream) = upstream {
673+
tracing::trace!(
674+
tx = %id,
675+
%key,
676+
upstream = %upstream.peer,
677+
"PutOp::process_message: Forwarding SuccessfulPut upstream"
678+
);
668679
return_msg = Some(PutMsg::SuccessfulPut {
669680
id: *id,
670681
target: upstream,
671682
key,
672683
sender: op_manager.ring.connection_manager.own_location(),
673684
});
674685
} else {
686+
tracing::trace!(
687+
tx = %id,
688+
%key,
689+
"PutOp::process_message: SuccessfulPut originated locally; no upstream"
690+
);
675691
return_msg = None;
676692
}
677693
}
@@ -894,6 +910,14 @@ async fn try_to_broadcast(
894910
_ => false,
895911
};
896912

913+
let preserved_upstream = match &state {
914+
Some(PutState::AwaitingResponse {
915+
upstream: Some(existing),
916+
..
917+
}) => Some(existing.clone()),
918+
_ => None,
919+
};
920+
897921
match state {
898922
// Handle initiating node that's also the target (single node or targeting self)
899923
Some(PutState::AwaitingResponse {
@@ -932,9 +956,12 @@ async fn try_to_broadcast(
932956
key
933957
);
934958
// means the whole tx finished so can return early
959+
let upstream_for_completion = preserved_upstream
960+
.clone()
961+
.or_else(|| Some(upstream.clone()));
935962
new_state = Some(PutState::AwaitingResponse {
936963
key,
937-
upstream: Some(upstream),
964+
upstream: upstream_for_completion,
938965
contract: contract.clone(), // No longer optional
939966
state: new_value.clone(),
940967
subscribe,

crates/core/src/operations/update.rs

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -786,10 +786,29 @@ impl OpManager {
786786
.subscribers_of(key)
787787
.map(|subs| {
788788
let self_peer = self.ring.connection_manager.get_peer_key();
789+
let allow_self = self_peer.as_ref().map(|me| me == sender).unwrap_or(false);
789790
subs.value()
790791
.iter()
791-
.filter(|pk| &pk.peer != sender)
792-
.filter(|pk| self_peer.as_ref().map(|me| &pk.peer != me).unwrap_or(true))
792+
.filter(|pk| {
793+
// Allow the sender to remain in the broadcast list when we're the sender,
794+
// so local auto-subscribe via GET/PUT still receives notifications.
795+
if &pk.peer == sender {
796+
allow_self
797+
} else {
798+
true
799+
}
800+
})
801+
.filter(|pk| {
802+
if let Some(self_peer) = &self_peer {
803+
if &pk.peer == self_peer {
804+
allow_self
805+
} else {
806+
true
807+
}
808+
} else {
809+
true
810+
}
811+
})
793812
.cloned()
794813
.collect::<Vec<_>>()
795814
})

0 commit comments

Comments
 (0)