
Commit dcf5422

fix: thread shutdown leaks

Signed-off-by: Alexandre Rulleau <alexandre.rulleau@datadoghq.com>

1 parent: c2690e7

7 files changed: +191 -55 lines

datadog-ipc/tarpc/src/trace.rs

Lines changed: 7 additions & 1 deletion
@@ -22,10 +22,14 @@ use rand::Rng;
 use std::{
     fmt::{self, Formatter},
     num::{NonZeroU128, NonZeroU64},
+    sync::atomic::{AtomicU64, Ordering},
 };
 #[cfg(feature = "opentelemetry")]
 use tracing_opentelemetry::OpenTelemetrySpanExt;
 
+/// Global atomic counter for generating unique span IDs
+static SPAN_ID_COUNTER: AtomicU64 = AtomicU64::new(1);
+
 /// A context for tracing the execution of processes, distributed or otherwise.
 ///
 /// Consists of a span identifying an event, an optional parent span identifying a causal event
@@ -80,9 +84,11 @@ pub enum SamplingDecision {
 impl Context {
     /// Constructs a new context with the trace ID and sampling decision inherited from the parent.
     pub(crate) fn new_child(&self) -> Self {
+        // Use atomic counter instead of rand to avoid TLS allocation
+        let span_id_value = SPAN_ID_COUNTER.fetch_add(1, Ordering::Relaxed);
         Self {
             trace_id: self.trace_id,
-            span_id: SpanId::random(&mut rand::thread_rng()),
+            span_id: SpanId(span_id_value),
             sampling_decision: self.sampling_decision,
         }
     }
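
For reference, the pattern this hunk adopts: rand::thread_rng() lazily allocates thread-local RNG state in every thread that touches it, whereas a process-global AtomicU64 needs no per-thread storage. A minimal standalone sketch (names hypothetical, std only) showing why Relaxed ordering is enough when the only requirement is uniqueness:

    use std::sync::atomic::{AtomicU64, Ordering};
    use std::thread;

    // Process-wide counter; starting at 1 keeps 0 free as a "no span" sentinel.
    static NEXT_ID: AtomicU64 = AtomicU64::new(1);

    // fetch_add is a single atomic read-modify-write, so concurrent callers
    // always receive distinct values; Relaxed suffices because no other memory
    // accesses need to be ordered relative to the ID allocation.
    fn next_span_id() -> u64 {
        NEXT_ID.fetch_add(1, Ordering::Relaxed)
    }

    fn main() {
        let handles: Vec<_> = (0..4)
            .map(|_| thread::spawn(|| (0..1000).map(|_| next_span_id()).collect::<Vec<_>>()))
            .collect();
        let mut ids: Vec<u64> = handles.into_iter().flat_map(|h| h.join().unwrap()).collect();
        ids.sort_unstable();
        ids.dedup();
        assert_eq!(ids.len(), 4000); // no duplicates across threads
    }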

datadog-sidecar-ffi/src/lib.rs

Lines changed: 15 additions & 1 deletion
@@ -33,7 +33,8 @@ use datadog_sidecar::service::{get_telemetry_action_sender, InternalTelemetryAct
 use datadog_sidecar::shm_remote_config::{path_for_remote_config, RemoteConfigReader};
 #[cfg(unix)]
 use datadog_sidecar::{
-    connect_worker_unix, shutdown_master_listener_unix, start_master_listener_unix,
+    clear_inherited_listener_unix, connect_worker_unix, shutdown_master_listener_unix,
+    start_master_listener_unix,
 };
 #[cfg(windows)]
 use datadog_sidecar::{
@@ -359,6 +360,19 @@ pub extern "C" fn ddog_sidecar_shutdown_master_listener() -> MaybeError {
     MaybeError::None
 }
 
+#[no_mangle]
+pub extern "C" fn ddog_sidecar_clear_inherited_listener() -> MaybeError {
+    #[cfg(unix)]
+    {
+        try_c!(clear_inherited_listener_unix());
+    }
+    #[cfg(windows)]
+    {
+        // Windows doesn't use fork, so no inherited state to clear
+    }
+    MaybeError::None
+}
+
 #[no_mangle]
 pub extern "C" fn ddog_sidecar_ping(transport: &mut Box<SidecarTransport>) -> MaybeError {
     try_c!(blocking::ping(transport));
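
A hypothetical embedding of the new entry point: a forking host calls it in the child before any other sidecar use. This sketch assumes the cdylib above is linked and the libc crate is available; the MaybeError declaration is a stand-in, not the crate's real definition:

    #[repr(C)]
    pub struct MaybeError {
        _private: [u8; 0], // opaque stand-in for the ffi crate's result type
    }

    extern "C" {
        fn ddog_sidecar_clear_inherited_listener() -> MaybeError;
    }

    fn spawn_worker() {
        // SAFETY: fork() is illustrative; real embedders fork via their runtime.
        let pid = unsafe { libc::fork() };
        if pid == 0 {
            // Child: the parent's listener thread does not survive fork(), but the
            // JoinHandle stored in the global mutex does. Clearing it first keeps a
            // later shutdown in the child from joining a thread that never ran here.
            let _ = unsafe { ddog_sidecar_clear_inherited_listener() };
        }
    }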

datadog-sidecar/src/entry.rs

Lines changed: 7 additions & 10 deletions
@@ -129,10 +129,12 @@ where
     // Shutdown final sender so the receiver can complete
     drop(shutdown_complete_tx);
 
-    // Await everything else to completion
-    _ = telemetry_handle.await;
+    // Await everything else to completion with timeouts to ensure we don't hang
+    let shutdown_timeout = Duration::from_millis(500);
+
+    _ = tokio::time::timeout(shutdown_timeout, telemetry_handle).await;
     server.shutdown();
-    _ = server.trace_flusher.join().await;
+    _ = tokio::time::timeout(shutdown_timeout, server.trace_flusher.join()).await;
 
     Ok(())
 }
@@ -153,14 +155,9 @@ where
 
     let (listener, cancel) = acquire_listener()?;
 
-    let result = runtime
+    runtime
         .block_on(main_loop(listener, Arc::new(cancel)))
-        .map_err(|e| e.into());
-
-    // Wait 1 second to shut down properly
-    runtime.shutdown_timeout(std::time::Duration::from_secs(1));
-
-    result
+        .map_err(|e| e.into())
 }
 
 pub fn daemonize(listener: IpcServer, mut cfg: Config) -> anyhow::Result<()> {
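
The shutdown path now bounds each final await with tokio::time::timeout instead of waiting indefinitely on tasks that may never resolve. A minimal sketch of the pattern, assuming tokio, with a deliberately stuck task standing in for telemetry_handle:

    use std::time::Duration;

    #[tokio::main]
    async fn main() {
        // A task that will not finish within the shutdown window.
        let stuck = tokio::spawn(async {
            tokio::time::sleep(Duration::from_secs(3600)).await;
        });

        let shutdown_timeout = Duration::from_millis(500);

        // timeout() resolves to Err(Elapsed) if the inner future is still
        // pending at the deadline, so shutdown proceeds instead of hanging.
        match tokio::time::timeout(shutdown_timeout, stuck).await {
            Ok(joined) => println!("task finished cleanly: {}", joined.is_ok()),
            Err(_elapsed) => println!("still running after 500ms; continuing shutdown"),
        }
    }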

datadog-sidecar/src/service/queue_id.rs

Lines changed: 8 additions & 4 deletions
@@ -1,8 +1,8 @@
 // Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/
 // SPDX-License-Identifier: Apache-2.0
 
-use rand::Rng;
 use serde::{Deserialize, Serialize};
+use std::sync::atomic::{AtomicU64, Ordering};
 
 /// `QueueId` is a struct that represents a unique identifier for a queue.
 /// It contains a single field, `inner`, which is a 64-bit unsigned integer.
@@ -12,11 +12,15 @@ pub struct QueueId {
     pub(crate) inner: u64,
 }
 
+/// Global atomic counter for generating unique queue IDs
+static QUEUE_ID_COUNTER: AtomicU64 = AtomicU64::new(1);
+
 impl QueueId {
     /// Generates a new unique `QueueId`.
     ///
-    /// This method generates a random 64-bit unsigned integer between 1 (inclusive) and `u64::MAX`
-    /// (exclusive) and uses it as the `inner` value of the new `QueueId`.
+    /// This method uses an atomic counter to generate monotonically increasing
+    /// unique IDs. The counter starts at 1 and increments with each call.
+    /// This approach avoids TLS allocations from random number generators.
     ///
     /// # Examples
     ///
@@ -27,7 +31,7 @@ impl QueueId {
     /// ```
     pub fn new_unique() -> Self {
         Self {
-            inner: rand::thread_rng().gen_range(1u64..u64::MAX),
+            inner: QUEUE_ID_COUNTER.fetch_add(1, Ordering::Relaxed),
         }
     }
 }
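
One behavioral nuance: the old code drew uniformly from 1..u64::MAX, while the counter emits 1, 2, 3, … and would only repeat after 2^64 − 1 increments, unreachable in practice. A small sketch of the boundary behavior, using nothing beyond std:

    use std::sync::atomic::{AtomicU64, Ordering};

    static COUNTER: AtomicU64 = AtomicU64::new(1);

    fn main() {
        // First call returns the initial value, matching the documented start at 1.
        assert_eq!(COUNTER.fetch_add(1, Ordering::Relaxed), 1);
        assert_eq!(COUNTER.fetch_add(1, Ordering::Relaxed), 2);

        // fetch_add wraps on overflow rather than panicking, so even the
        // (theoretical) u64::MAX case degrades to reuse, not a crash.
        let at_max = AtomicU64::new(u64::MAX);
        assert_eq!(at_max.fetch_add(1, Ordering::Relaxed), u64::MAX);
        assert_eq!(at_max.load(Ordering::Relaxed), 0); // wrapped around
    }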

datadog-sidecar/src/setup/unix.rs

Lines changed: 8 additions & 2 deletions
@@ -1,7 +1,10 @@
 // Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/
 // SPDX-License-Identifier: Apache-2.0
 
-use std::sync::LazyLock;
+use std::sync::{
+    atomic::{AtomicU16, Ordering},
+    LazyLock,
+};
 use std::{
     env, fs, io,
     os::unix::{
@@ -83,7 +86,10 @@ impl Liaison for SharedDirLiaison {
     }
 
     fn ipc_per_process() -> Self {
-        static PROCESS_RANDOM_ID: LazyLock<u16> = LazyLock::new(rand::random);
+        // Use atomic counter instead of rand::random to avoid TLS allocation
+        static PROCESS_ID_COUNTER: AtomicU16 = AtomicU16::new(1);
+        static PROCESS_RANDOM_ID: LazyLock<u16> =
+            LazyLock::new(|| PROCESS_ID_COUNTER.fetch_add(1, Ordering::Relaxed));
 
         let pid = std::process::id();
         let liason_path = env::temp_dir().join(format!("libdatadog.{}.{pid}", *PROCESS_RANDOM_ID));
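
Worth noting how this hunk composes LazyLock with the counter: the closure runs once per process, so PROCESS_RANDOM_ID is stable for the process lifetime, and since each process's counter starts at 1, distinct processes rely on the {pid} path component for uniqueness. A minimal sketch of the once-only initialization (names hypothetical):

    use std::sync::{
        atomic::{AtomicU16, Ordering},
        LazyLock,
    };

    static COUNTER: AtomicU16 = AtomicU16::new(1);
    static PROCESS_ID: LazyLock<u16> = LazyLock::new(|| COUNTER.fetch_add(1, Ordering::Relaxed));

    fn main() {
        // LazyLock memoizes: the closure runs exactly once, so repeated reads
        // return the same value and the counter is bumped only once.
        assert_eq!(*PROCESS_ID, 1);
        assert_eq!(*PROCESS_ID, 1);
        assert_eq!(COUNTER.load(Ordering::Relaxed), 2);
    }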

datadog-sidecar/src/unix.rs

Lines changed: 138 additions & 32 deletions
@@ -15,7 +15,8 @@ use nix::sys::socket::{shutdown, Shutdown};
 use std::io;
 use std::os::fd::RawFd;
 use std::os::unix::prelude::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd};
-use std::sync::Mutex;
+use std::sync::{Arc, Mutex};
+use std::sync::atomic::{AtomicBool, Ordering};
 use std::thread;
 use std::time::{Duration, Instant};
 use tokio::net::{UnixListener, UnixStream};
@@ -51,20 +52,117 @@ pub fn start_master_listener_unix(master_pid: i32) -> io::Result<()> {
     let handle = thread::Builder::new()
         .name("dd-sidecar".into())
         .spawn(move || {
-            let acquire_listener = move || -> io::Result<_> {
-                std_listener.set_nonblocking(true)?;
-                let listener = UnixListener::from_std(std_listener.try_clone()?)?;
-                let cancel = {
-                    let fd = listener.as_raw_fd();
-                    move || stop_listening(fd)
-                };
-                Ok((move |handler| accept_socket_loop(listener, handler), cancel))
+            // Use blocking I/O - no shared tokio Runtime needed
+            // This makes the code fork-safe
+            use crate::service::sidecar_server::SidecarServer;
+            let runtime = match tokio::runtime::Builder::new_current_thread()
+                .enable_all()
+                .build()
+            {
+                Ok(rt) => rt,
+                Err(e) => {
+                    error!("Failed to create runtime for server initialization: {}", e);
+                    return;
+                }
             };
 
-            let _ = enter_listener_loop(acquire_listener).map_err(|e| {
-                error!("enter_listener_loop failed: {}", e);
-                e
-            });
+            let server = runtime.block_on(async { SidecarServer::default() });
+
+            // Shutdown flag to signal connection threads to stop
+            let shutdown_flag = Arc::new(AtomicBool::new(false));
+
+            // Track connection threads for cleanup during shutdown
+            let mut handler_threads: Vec<thread::JoinHandle<()>> = Vec::new();
+
+            loop {
+                // Clean up finished threads to avoid accumulating handles
+                handler_threads.retain(|h| !h.is_finished());
+
+                match std_listener.accept() {
+                    Ok((stream, _addr)) => {
+                        let server = server.clone();
+                        let shutdown = shutdown_flag.clone();
+
+                        // Spawn a thread for each connection
+                        match thread::Builder::new().name("dd-conn-handler".into()).spawn(
+                            move || {
+                                // Create a minimal single-threaded runtime for this connection only
+                                // This runtime will be dropped when the connection closes
+                                let runtime = match tokio::runtime::Builder::new_current_thread()
+                                    .enable_all()
+                                    .build()
+                                {
+                                    Ok(rt) => rt,
+                                    Err(e) => {
+                                        error!("Failed to create runtime for connection: {}", e);
+                                        return;
+                                    }
+                                };
+
+                                runtime.block_on(async move {
+                                    // Check shutdown flag
+                                    if shutdown.load(Ordering::Relaxed) {
+                                        return;
+                                    }
+
+                                    // Convert std UnixStream to tokio UnixStream
+                                    if let Err(e) = stream.set_nonblocking(true) {
+                                        error!("Failed to set nonblocking: {}", e);
+                                        return;
+                                    }
+
+                                    let tokio_stream = match UnixStream::from_std(stream) {
+                                        Ok(s) => s,
+                                        Err(e) => {
+                                            error!("Failed to convert stream: {}", e);
+                                            return;
+                                        }
+                                    };
+
+                                    // Handle the connection using existing async infrastructure
+                                    use datadog_ipc::platform::AsyncChannel;
+
+                                    // Use the cloned shared server
+                                    server
+                                        .accept_connection(AsyncChannel::from(tokio_stream))
+                                        .await;
+                                });
+                            },
+                        ) {
+                            Ok(handle) => handler_threads.push(handle),
+                            Err(e) => error!("Failed to spawn handler thread: {}", e),
+                        }
+                    }
+                    Err(e) => {
+                        match e.kind() {
+                            io::ErrorKind::Interrupted => continue,
+                            io::ErrorKind::InvalidInput => break, // Socket shut down
+                            _ => {
+                                error!("Accept error: {}", e);
+                                thread::sleep(Duration::from_millis(100));
+                            }
+                        }
+                    }
+                }
+            }
+
+            info!("Master listener stopped accepting connections");
+
+            // Signal all connection threads to stop
+            shutdown_flag.store(true, Ordering::Relaxed);
+
+            // Shutdown the server - this should close active connections
+            server.shutdown();
+
+            // Now join all connection threads - they should exit quickly
+            // since connections are closed and shutdown flag is set
+            info!("Waiting for {} connection threads to finish", handler_threads.len());
+            for (i, handle) in handler_threads.into_iter().enumerate() {
+                if let Err(e) = handle.join() {
+                    error!("Connection thread {} panicked: {:?}", i, e);
+                }
+            }
+            info!("All connection threads finished");
         })
         .map_err(io::Error::other)?;
 
@@ -95,6 +193,7 @@ pub fn connect_worker_unix(master_pid: i32) -> io::Result<SidecarTransport> {
         }
     }
 
+    error!("Worker failed to connect after 10 attempts");
     Err(last_error.unwrap_or_else(|| io::Error::other("Connection failed")))
 }
 
@@ -112,28 +211,35 @@ pub fn shutdown_master_listener_unix() -> io::Result<()> {
 
     if let Some((handle, fd)) = listener_data {
         stop_listening(fd);
+        let _ = handle.join();
+    }
 
-        // Try to join with a timeout to avoid hanging the shutdown
-        // We spawn a helper thread to do the join so we can implement a timeout
-        let (tx, rx) = std::sync::mpsc::channel();
-        std::thread::spawn(move || {
-            let result = handle.join();
-            let _ = tx.send(result);
-        });
-
-        // Wait up to 2 seconds for clean shutdown (including time for tokio runtime shutdown)
-        match rx.recv_timeout(Duration::from_millis(2000)) {
-            Ok(Ok(())) => {
-                // Clean shutdown
-            }
-            Ok(Err(_)) => {
-                error!("Listener thread panicked during shutdown");
-            }
-            Err(_) => {
-                // Timeout - thread didn't exit in time
-                // This is acceptable as the OS will clean up when the process exits
+    Ok(())
+}
+
+/// Clears inherited resources in child processes after fork().
+/// With the new blocking I/O approach, we only need to forget the listener thread handle.
+/// Each connection creates its own short-lived runtime, so there's no global runtime to inherit.
+pub fn clear_inherited_listener_unix() -> io::Result<()> {
+    info!("Child process clearing inherited listener state");
+    match MASTER_LISTENER.lock() {
+        Ok(mut guard) => {
+            if let Some((handle, _fd)) = guard.take() {
+                info!("Child forgetting inherited listener thread handle");
+                // Forget the handle without joining - parent owns the thread
+                std::mem::forget(handle);
+                info!("Child successfully forgot listener handle");
+            } else {
+                info!("Child found no listener to clear");
             }
         }
+        Err(e) => {
+            error!(
+                "Failed to acquire lock for clearing inherited listener: {}",
+                e
+            );
+            return Err(io::Error::other("Mutex poisoned"));
+        }
     }
 
     Ok(())
datadog-sidecar/src/windows.rs

Lines changed: 8 additions & 5 deletions
@@ -147,9 +147,7 @@ async fn accept_pipe_loop(
         .ok_or_else(|| io::Error::from(io::ErrorKind::InvalidInput))?;
 
     let raw_handle = pipe_listener.as_raw_handle();
-    let mut pipe = unsafe {
-        NamedPipeServer::from_raw_handle(raw_handle)
-    }?;
+    let mut pipe = unsafe { NamedPipeServer::from_raw_handle(raw_handle) }?;
 
     loop {
         match pipe.connect().await {
@@ -287,21 +285,26 @@ pub fn shutdown_master_listener_windows() -> io::Result<()> {
         stop_listening_on_handle(raw);
 
         let (tx, rx) = std::sync::mpsc::channel();
-        std::thread::spawn(move || {
+        let helper_handle = std::thread::spawn(move || {
             let result = handle.join();
             let _ = tx.send(result);
         });
 
         // Wait up to 500ms for proper shutdown
         match rx.recv_timeout(Duration::from_millis(500)) {
-            Ok(Ok(())) => { }
+            Ok(Ok(())) => {}
             Ok(Err(_)) => {
                 error!("Listener thread panicked during shutdown");
            }
             Err(err) => {
                 error!("Timeout waiting for listener thread to shut down: {}", err);
             }
         }
+
+        // Join the helper thread to clean up its TLS
+        if let Err(_) = helper_handle.join() {
+            error!("Helper thread panicked");
+        }
     }
 
     Ok(())
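
The Windows path keeps the helper-thread trick for joining with a timeout; the leak fix is simply to also join the helper so its own thread-local storage is reclaimed. A self-contained sketch of the trick, using only std:

    use std::sync::mpsc;
    use std::thread;
    use std::time::Duration;

    fn main() {
        let worker = thread::spawn(|| thread::sleep(Duration::from_millis(100)));

        // std has no JoinHandle::join_timeout, so delegate the blocking join to
        // a helper thread and bound the wait with recv_timeout on a channel.
        let (tx, rx) = mpsc::channel();
        let helper = thread::spawn(move || {
            let _ = tx.send(worker.join());
        });

        match rx.recv_timeout(Duration::from_millis(500)) {
            Ok(Ok(())) => println!("worker exited cleanly"),
            Ok(Err(_)) => eprintln!("worker panicked"),
            Err(_) => eprintln!("timed out waiting for worker"),
        }

        // The leak fixed in this commit: without this join, the helper's TLS
        // and stack bookkeeping linger until process exit.
        let _ = helper.join();
    }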
