Skip to content

Commit ead6ba4

Browse files
committed
fix(connect): add shutdown handle for initial join task
1 parent ab7e887 commit ead6ba4

File tree

3 files changed

+34
-12
lines changed

3 files changed

+34
-12
lines changed

crates/core/src/node/p2p_impl.rs

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use std::{collections::HashSet, convert::Infallible, sync::Arc, time::Duration};
22

33
use futures::{future::BoxFuture, FutureExt};
4+
use tokio::task::JoinHandle;
45
use tracing::Instrument;
56

67
use super::{
@@ -44,6 +45,7 @@ pub(crate) struct NodeP2P {
4445
should_try_connect: bool,
4546
client_events_task: BoxFuture<'static, anyhow::Error>,
4647
contract_executor_task: BoxFuture<'static, anyhow::Error>,
48+
initial_join_task: Option<JoinHandle<()>>,
4749
}
4850

4951
impl NodeP2P {
@@ -181,10 +183,14 @@ impl NodeP2P {
181183

182184
Ok(())
183185
}
184-
pub(super) async fn run_node(self) -> anyhow::Result<Infallible> {
186+
pub(super) async fn run_node(mut self) -> anyhow::Result<Infallible> {
185187
if self.should_try_connect {
186-
connect::initial_join_procedure(self.op_manager.clone(), &self.conn_manager.gateways)
187-
.await?;
188+
let join_handle = connect::initial_join_procedure(
189+
self.op_manager.clone(),
190+
&self.conn_manager.gateways,
191+
)
192+
.await?;
193+
self.initial_join_task = Some(join_handle);
188194

189195
// After connecting to gateways, aggressively try to reach min_connections
190196
// This is important for fast startup and avoiding on-demand connection delays
@@ -199,7 +205,8 @@ impl NodeP2P {
199205
self.node_controller,
200206
);
201207

202-
tokio::select!(
208+
let join_task = self.initial_join_task.take();
209+
let result = tokio::select!(
203210
r = f => {
204211
let Err(e) = r;
205212
tracing::error!("Network event listener exited: {}", e);
@@ -213,7 +220,13 @@ impl NodeP2P {
213220
tracing::error!("Contract executor task exited: {:?}", e);
214221
Err(e)
215222
}
216-
)
223+
);
224+
225+
if let Some(handle) = join_task {
226+
handle.abort();
227+
}
228+
229+
result
217230
}
218231

219232
pub(crate) async fn build<CH, const CLIENTS: usize, ER>(
@@ -343,6 +356,7 @@ impl NodeP2P {
343356
location: config.location,
344357
client_events_task,
345358
contract_executor_task,
359+
initial_join_task: None,
346360
})
347361
}
348362
}

crates/core/src/node/testing_impl.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -780,7 +780,8 @@ where
780780
NB: NetworkBridge + NetworkBridgeExt,
781781
UsrEv: ClientEventsProxy + Send + 'static,
782782
{
783-
connect::initial_join_procedure(config.op_manager.clone(), &config.gateways).await?;
783+
let join_task =
784+
connect::initial_join_procedure(config.op_manager.clone(), &config.gateways).await?;
784785
let (client_responses, _cli_response_sender) = contract::client_responses_channel();
785786
let span = {
786787
config
@@ -811,9 +812,13 @@ where
811812
.parent_span
812813
.clone()
813814
.unwrap_or_else(|| tracing::info_span!("event_listener", peer = %config.peer_key));
814-
run_event_listener(node_controller_rx, config)
815+
let result = run_event_listener(node_controller_rx, config)
815816
.instrument(parent_span)
816-
.await
817+
.await;
818+
819+
join_task.abort();
820+
let _ = join_task.await;
821+
result
817822
}
818823

819824
/// Starts listening to incoming events. Will attempt to join the ring if any gateways have been provided.

crates/core/src/operations/connect.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use std::time::{Duration, Instant};
1212
use futures::{stream::FuturesUnordered, StreamExt};
1313
use serde::{Deserialize, Serialize};
1414
use tokio::sync::mpsc;
15-
use tokio::task;
15+
use tokio::task::{self, JoinHandle};
1616

1717
use crate::client_events::HostResult;
1818
use crate::dev_tool::Location;
@@ -297,6 +297,9 @@ impl RelayContext for RelayEnv<'_> {
297297
}
298298

299299
fn courtesy_hint(&self, _acceptor: &PeerKeyLocation, _joiner: &PeerKeyLocation) -> bool {
300+
// Courtesy slots still piggyback on regular connections. Flag the first acceptance so the
301+
// joiner can prioritise it, and keep the logic simple until dedicated courtesy tracking
302+
// is wired in (see courtesy-connection-budget branch).
300303
self.op_manager.ring.open_connections() == 0
301304
}
302305
}
@@ -836,15 +839,15 @@ pub(crate) async fn join_ring_request(
836839
pub(crate) async fn initial_join_procedure(
837840
op_manager: Arc<OpManager>,
838841
gateways: &[PeerKeyLocation],
839-
) -> Result<(), OpError> {
842+
) -> Result<JoinHandle<()>, OpError> {
840843
let number_of_parallel_connections = {
841844
let max_potential_conns_per_gw = op_manager.ring.max_hops_to_live;
842845
let needed_to_cover_max =
843846
op_manager.ring.connection_manager.max_connections / max_potential_conns_per_gw;
844847
gateways.iter().take(needed_to_cover_max).count().max(2)
845848
};
846849
let gateways = gateways.to_vec();
847-
task::spawn(async move {
850+
let handle = task::spawn(async move {
848851
if gateways.is_empty() {
849852
tracing::warn!("No gateways available, aborting join procedure");
850853
return;
@@ -940,7 +943,7 @@ pub(crate) async fn initial_join_procedure(
940943
tokio::time::sleep(Duration::from_secs(wait_time)).await;
941944
}
942945
});
943-
Ok(())
946+
Ok(handle)
944947
}
945948

946949
#[cfg(test)]

0 commit comments

Comments
 (0)