Skip to content

Commit 9491843

Browse files
committed
refactor(transport): replace handshake pipeline
1 parent 2af32cb commit 9491843

37 files changed

+3231
-4619
lines changed

Cargo.lock

Lines changed: 214 additions & 42 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/core/Cargo.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ pav_regression = "0.6.1"
4444
parking_lot = "0.12"
4545
pin-project = "1"
4646
rand = { features = ["small_rng"], workspace = true }
47+
once_cell = "1"
4748
redb = { optional = true, version = "3" }
4849
serde = { features = ["derive", "rc"], workspace = true }
4950
serde_json = { workspace = true }
@@ -76,12 +77,12 @@ opentelemetry_sdk = { optional = true, version = "0.31", features = ["rt-tokio"]
7677

7778
# internal deps
7879
freenet-stdlib = { features = ["net"], workspace = true }
79-
console-subscriber = { version = "0.5.0", optional = true }
80+
console-subscriber = { version = "0.4.1", optional = true }
8081
tokio-stream = "0.1.17"
8182

8283
[target.'cfg(windows)'.dependencies]
8384
winapi = { version = "0.3", features = ["sysinfoapi"] }
84-
wmi = "0.18.0"
85+
wmi = "0.17.3"
8586
serde = { version = "1.0", features = ["derive"] }
8687

8788
[dev-dependencies]

crates/core/src/message.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,7 @@ where
193193

194194
mod sealed_msg_type {
195195
use super::*;
196+
use crate::operations::connect::ConnectMsg;
196197

197198
pub trait SealedTxType {
198199
fn tx_type_id() -> TransactionTypeId;
@@ -301,7 +302,7 @@ impl Versioned for NetMessage {
301302
impl Versioned for NetMessageV1 {
302303
fn version(&self) -> semver::Version {
303304
match self {
304-
NetMessageV1::Connect(_) => semver::Version::new(1, 0, 0),
305+
NetMessageV1::Connect(_) => semver::Version::new(1, 1, 0),
305306
NetMessageV1::Put(_) => semver::Version::new(1, 0, 0),
306307
NetMessageV1::Get(_) => semver::Version::new(1, 0, 0),
307308
NetMessageV1::Subscribe(_) => semver::Version::new(1, 0, 0),
@@ -363,10 +364,9 @@ pub(crate) enum NodeEvent {
363364
key: ContractKey,
364365
subscribed: bool,
365366
},
366-
/// Send a message to a peer over the network
367-
SendMessage {
368-
target: PeerId,
369-
msg: Box<NetMessage>,
367+
/// Register expectation for an inbound connection from the given peer.
368+
ExpectPeerConnection {
369+
peer: PeerId,
370370
},
371371
}
372372

@@ -444,8 +444,8 @@ impl Display for NodeEvent {
444444
"Local subscribe complete (tx: {tx}, key: {key}, subscribed: {subscribed})"
445445
)
446446
}
447-
NodeEvent::SendMessage { target, msg } => {
448-
write!(f, "SendMessage (to {target}, tx: {})", msg.id())
447+
NodeEvent::ExpectPeerConnection { peer } => {
448+
write!(f, "ExpectPeerConnection (from {peer})")
449449
}
450450
}
451451
}
@@ -486,7 +486,7 @@ impl MessageStats for NetMessageV1 {
486486

487487
fn target(&self) -> Option<PeerKeyLocation> {
488488
match self {
489-
NetMessageV1::Connect(op) => op.target().as_ref().map(|b| b.borrow().clone()),
489+
NetMessageV1::Connect(op) => op.target().cloned(),
490490
NetMessageV1::Put(op) => op.target().as_ref().map(|b| b.borrow().clone()),
491491
NetMessageV1::Get(op) => op.target().as_ref().map(|b| b.borrow().clone()),
492492
NetMessageV1::Subscribe(op) => op.target().as_ref().map(|b| b.borrow().clone()),

crates/core/src/node/mod.rs

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -701,7 +701,7 @@ async fn process_message_v1<CB>(
701701
tx_type = %msg.id().transaction_type()
702702
);
703703
let op_result =
704-
handle_op_request::<connect::ConnectOp, _>(&op_manager, &mut conn_manager, op)
704+
handle_op_request::<ConnectOp, _>(&op_manager, &mut conn_manager, op)
705705
.instrument(span)
706706
.await;
707707

@@ -861,7 +861,7 @@ where
861861
tx_type = %msg.id().transaction_type()
862862
);
863863
let op_result =
864-
handle_op_request::<connect::ConnectOp, _>(&op_manager, &mut conn_manager, op)
864+
handle_op_request::<ConnectOp, _>(&op_manager, &mut conn_manager, op)
865865
.instrument(span)
866866
.await;
867867

@@ -879,7 +879,6 @@ where
879879
}
880880
}
881881

882-
// Pure network result processing - no client handling
883882
return handle_pure_network_result(
884883
tx,
885884
op_result,
@@ -1153,29 +1152,28 @@ async fn handle_aborted_op(
11531152
// is useless without connecting to the network, we will retry with exponential backoff
11541153
// if necessary
11551154
match op_manager.pop(&tx) {
1156-
// only keep attempting to connect if the node hasn't got enough connections yet
11571155
Ok(Some(OpEnum::Connect(op)))
11581156
if op.has_backoff()
11591157
&& op_manager.ring.open_connections()
11601158
< op_manager.ring.connection_manager.min_connections =>
11611159
{
1162-
let ConnectOp {
1163-
gateway, backoff, ..
1164-
} = *op;
1160+
let gateway = op.gateway().cloned();
11651161
if let Some(gateway) = gateway {
11661162
tracing::warn!("Retry connecting to gateway {}", gateway.peer);
1167-
connect::join_ring_request(backoff, &gateway, op_manager).await?;
1163+
connect::join_ring_request(None, &gateway, op_manager).await?;
11681164
}
11691165
}
11701166
Ok(Some(OpEnum::Connect(_))) => {
1171-
// if no connections were achieved just fail
11721167
if op_manager.ring.open_connections() == 0 && op_manager.ring.is_gateway() {
11731168
tracing::warn!("Retrying joining the ring with an other gateway");
11741169
if let Some(gateway) = gateways.iter().shuffle().next() {
11751170
connect::join_ring_request(None, gateway, op_manager).await?
11761171
}
11771172
}
11781173
}
1174+
Ok(Some(other)) => {
1175+
op_manager.push(tx, other).await?;
1176+
}
11791177
_ => {}
11801178
}
11811179
}

0 commit comments

Comments
 (0)