Skip to content

Commit 51a8e0f

Browse files
committed
feat(connection): cap gateway courtesy links
1 parent 3c7abc0 commit 51a8e0f

File tree

3 files changed

+112
-11
lines changed

3 files changed

+112
-11
lines changed

crates/core/src/node/network_bridge/p2p_protoc.rs

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1598,11 +1598,34 @@ impl P2pConnManager {
15981598
.connection_manager
15991599
.prune_in_transit_connection(&peer_id);
16001600
let loc = pending_loc.unwrap_or_else(|| Location::from_address(&peer_id.addr));
1601-
self.bridge
1601+
let eviction_candidate = self
1602+
.bridge
16021603
.op_manager
16031604
.ring
1604-
.add_connection(loc, peer_id.clone(), false)
1605+
.add_connection(loc, peer_id.clone(), false, courtesy)
16051606
.await;
1607+
if let Some(victim) = eviction_candidate {
1608+
if victim == peer_id {
1609+
tracing::debug!(
1610+
%peer_id,
1611+
"Courtesy eviction candidate matched current connection; skipping drop"
1612+
);
1613+
} else {
1614+
tracing::info!(
1615+
%victim,
1616+
%peer_id,
1617+
courtesy_limit = true,
1618+
"Courtesy connection budget exceeded; dropping oldest courtesy peer"
1619+
);
1620+
if let Err(error) = self.bridge.drop_connection(&victim).await {
1621+
tracing::warn!(
1622+
%victim,
1623+
?error,
1624+
"Failed to drop courtesy connection after hitting budget"
1625+
);
1626+
}
1627+
}
1628+
}
16061629
}
16071630
Ok(())
16081631
}

crates/core/src/ring/connection_manager.rs

Lines changed: 68 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use parking_lot::Mutex;
22
use rand::prelude::IndexedRandom;
3-
use std::collections::{btree_map::Entry, BTreeMap};
3+
use std::collections::{btree_map::Entry, BTreeMap, VecDeque};
4+
use std::time::Instant;
45

56
use crate::topology::{Limits, TopologyManager};
67

@@ -22,8 +23,17 @@ pub(crate) struct ConnectionManager {
2223
pub max_connections: usize,
2324
pub rnd_if_htl_above: usize,
2425
pub pub_key: Arc<TransportPublicKey>,
26+
courtesy_links: Arc<Mutex<VecDeque<CourtesyLink>>>,
27+
max_courtesy_links: usize,
2528
}
2629

30+
#[derive(Clone)]
31+
struct CourtesyLink {
32+
peer: PeerId,
33+
}
34+
35+
const MAX_COURTESY_LINKS: usize = 10;
36+
2737
impl ConnectionManager {
2838
pub fn new(config: &NodeConfig) -> Self {
2939
let min_connections = if let Some(v) = config.min_number_conn {
@@ -111,9 +121,43 @@ impl ConnectionManager {
111121
max_connections,
112122
rnd_if_htl_above,
113123
pub_key: Arc::new(pub_key),
124+
courtesy_links: Arc::new(Mutex::new(VecDeque::new())),
125+
max_courtesy_links: if is_gateway { MAX_COURTESY_LINKS } else { 0 },
114126
}
115127
}
116128

129+
fn register_courtesy_connection(&self, peer: &PeerId) -> Option<PeerId> {
130+
if !self.is_gateway || self.max_courtesy_links == 0 {
131+
return None;
132+
}
133+
let mut links = self.courtesy_links.lock();
134+
if links.len() == self.max_courtesy_links && links.iter().all(|entry| entry.peer != *peer) {
135+
tracing::debug!(
136+
%peer,
137+
max = self.max_courtesy_links,
138+
"register_courtesy_connection: budget full before inserting"
139+
);
140+
}
141+
links.retain(|entry| entry.peer != *peer);
142+
links.push_back(CourtesyLink { peer: peer.clone() });
143+
if links.len() > self.max_courtesy_links {
144+
links.pop_front().map(|entry| entry.peer)
145+
} else {
146+
None
147+
}
148+
}
149+
150+
fn unregister_courtesy_connection(&self, peer: &PeerId) {
151+
if !self.is_gateway {
152+
return;
153+
}
154+
let mut links = self.courtesy_links.lock();
155+
if links.is_empty() {
156+
return;
157+
}
158+
links.retain(|entry| entry.peer != *peer);
159+
}
160+
117161
/// Whether a node should accept a new node connection or not based
118162
/// on the relative location and other conditions.
119163
///
@@ -200,7 +244,7 @@ impl ConnectionManager {
200244
return true;
201245
}
202246

203-
const GATEWAY_DIRECT_ACCEPT_LIMIT: usize = 2;
247+
const GATEWAY_DIRECT_ACCEPT_LIMIT: usize = 10;
204248
if self.is_gateway {
205249
let direct_total = open + reserved_before;
206250
if direct_total >= GATEWAY_DIRECT_ACCEPT_LIMIT {
@@ -350,8 +394,20 @@ impl ConnectionManager {
350394
self.prune_connection(peer, false)
351395
}
352396

353-
pub fn add_connection(&self, loc: Location, peer: PeerId, was_reserved: bool) {
354-
tracing::info!(%peer, %loc, %was_reserved, "Adding connection to topology");
397+
pub fn add_connection(
398+
&self,
399+
loc: Location,
400+
peer: PeerId,
401+
was_reserved: bool,
402+
courtesy: bool,
403+
) -> Option<PeerId> {
404+
tracing::info!(
405+
%peer,
406+
%loc,
407+
%was_reserved,
408+
courtesy,
409+
"Adding connection to topology"
410+
);
355411
debug_assert!(self.get_peer_key().expect("should be set") != peer);
356412
if was_reserved {
357413
let old = self
@@ -381,6 +437,13 @@ impl ConnectionManager {
381437
self.open_connections
382438
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
383439
std::mem::drop(lop);
440+
441+
if courtesy {
442+
self.register_courtesy_connection(&peer)
443+
} else {
444+
self.unregister_courtesy_connection(&peer);
445+
None
446+
}
384447
}
385448

386449
pub fn update_peer_identity(&self, old_peer: &PeerId, new_peer: PeerId) -> bool {
@@ -452,6 +515,7 @@ impl ConnectionManager {
452515
}
453516

454517
if is_alive {
518+
self.unregister_courtesy_connection(peer);
455519
self.open_connections
456520
.fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
457521
} else {

crates/core/src/ring/mod.rs

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -228,14 +228,28 @@ impl Ring {
228228
.record_request(recipient, target, request_type);
229229
}
230230

231-
pub async fn add_connection(&self, loc: Location, peer: PeerId, was_reserved: bool) {
232-
tracing::info!(%peer, this = ?self.connection_manager.get_peer_key(), %was_reserved, "Adding connection to peer");
233-
self.connection_manager
234-
.add_connection(loc, peer.clone(), was_reserved);
231+
pub async fn add_connection(
232+
&self,
233+
loc: Location,
234+
peer: PeerId,
235+
was_reserved: bool,
236+
courtesy: bool,
237+
) -> Option<PeerId> {
238+
tracing::info!(
239+
%peer,
240+
this = ?self.connection_manager.get_peer_key(),
241+
%was_reserved,
242+
courtesy,
243+
"Adding connection to peer"
244+
);
245+
let eviction_candidate =
246+
self.connection_manager
247+
.add_connection(loc, peer.clone(), was_reserved, courtesy);
235248
self.event_register
236249
.register_events(Either::Left(NetEventLog::connected(self, peer, loc)))
237250
.await;
238-
self.refresh_density_request_cache()
251+
self.refresh_density_request_cache();
252+
eviction_candidate
239253
}
240254

241255
pub fn update_connection_identity(&self, old_peer: &PeerId, new_peer: PeerId) {

0 commit comments

Comments
 (0)