From 6527150e2b5cc959c7c37397bfdfe1ac564a7f6c Mon Sep 17 00:00:00 2001 From: Alexandre Rulleau Date: Fri, 24 Oct 2025 16:07:28 +0200 Subject: [PATCH 1/4] feat: support threaded connectivity Signed-off-by: Alexandre Rulleau --- datadog-sidecar-ffi/src/lib.rs | 46 +++++- datadog-sidecar/src/config.rs | 17 ++ datadog-sidecar/src/entry.rs | 4 +- datadog-sidecar/src/setup/mod.rs | 1 + datadog-sidecar/src/setup/unix.rs | 16 +- datadog-sidecar/src/setup/windows.rs | 7 +- datadog-sidecar/src/unix.rs | 38 +++++ datadog-sidecar/src/windows.rs | 229 +++++++++++++++++++++++++-- 8 files changed, 342 insertions(+), 16 deletions(-) diff --git a/datadog-sidecar-ffi/src/lib.rs b/datadog-sidecar-ffi/src/lib.rs index 73c5dbb530..81215ca180 100644 --- a/datadog-sidecar-ffi/src/lib.rs +++ b/datadog-sidecar-ffi/src/lib.rs @@ -40,6 +40,18 @@ use libdd_common_ffi::{self as ffi, MaybeError}; use libdd_crashtracker_ffi::Metadata; use libdd_dogstatsd_client::DogStatsDActionOwned; use libdd_telemetry::{ +#[cfg(unix)] +use datadog_sidecar::{connect_worker_unix, start_master_listener_unix}; +#[cfg(windows)] +use datadog_sidecar::{ + connect_worker_windows, start_master_listener_windows, transport_from_owned_handle, +}; +use datadog_trace_utils::msgpack_encoder; +use ddcommon::tag::Tag; +use ddcommon::Endpoint; +use ddcommon_ffi::slice::{AsBytes, CharSlice}; +use ddcommon_ffi::{self as ffi, MaybeError}; +use ddtelemetry::{ data::{self, Dependency, Integration}, worker::{LifecycleAction, LogIdentifier, TelemetryActions}, }; @@ -295,8 +307,6 @@ pub extern "C" fn ddog_remote_config_reader_drop(_: Box) {} #[no_mangle] pub extern "C" fn ddog_sidecar_transport_drop(_: Box) {} -/// # Safety -/// Caller must ensure the process is safe to fork, at the time when this method is called #[no_mangle] pub extern "C" fn ddog_sidecar_connect(connection: &mut *mut SidecarTransport) -> MaybeError { let cfg = datadog_sidecar::config::FromEnv::config(); @@ -307,6 +317,38 @@ pub extern "C" fn ddog_sidecar_connect(connection: &mut *mut SidecarTransport) - MaybeError::None } +#[no_mangle] +pub extern "C" fn ddog_sidecar_connect_master(master_pid: i32) -> MaybeError { + #[cfg(unix)] + { + try_c!(start_master_listener_unix(master_pid)); + } + #[cfg(windows)] + { + try_c!(start_master_listener_windows(master_pid)); + } + MaybeError::None +} + +#[no_mangle] +pub extern "C" fn ddog_sidecar_connect_worker( + master_pid: i32, + connection: &mut *mut SidecarTransport, +) -> MaybeError { + #[cfg(unix)] + { + let transport = Box::new(try_c!(connect_worker_unix(master_pid))); + *connection = Box::into_raw(transport); + } + #[cfg(windows)] + { + let handle = try_c!(connect_worker_windows(master_pid)); + let transport = Box::new(try_c!(transport_from_owned_handle(handle))); + *connection = Box::into_raw(transport); + } + MaybeError::None +} + #[no_mangle] pub extern "C" fn ddog_sidecar_ping(transport: &mut Box) -> MaybeError { try_c!(blocking::ping(transport)); diff --git a/datadog-sidecar/src/config.rs b/datadog-sidecar/src/config.rs index 4009e3a1ad..0c4ecb8e5e 100644 --- a/datadog-sidecar/src/config.rs +++ b/datadog-sidecar/src/config.rs @@ -36,6 +36,8 @@ const ENV_SIDECAR_APPSEC_LOCK_FILE_PATH: &str = "_DD_SIDECAR_APPSEC_LOCK_FILE_PA const ENV_SIDECAR_APPSEC_LOG_FILE_PATH: &str = "_DD_SIDECAR_APPSEC_LOG_FILE_PATH"; const ENV_SIDECAR_APPSEC_LOG_LEVEL: &str = "_DD_SIDECAR_APPSEC_LOG_LEVEL"; +const ENV_SIDECAR_CONNECT_TO_MASTER_PID: &str = "_DD_SIDECAR_CONNECT_TO_MASTER_PID"; + #[derive(Debug, Copy, Clone, Default)] pub enum IpcMode { #[default] @@ -84,6 +86,7 @@ pub struct Config { pub crashtracker_endpoint: Option, pub appsec_config: Option, pub max_memory: usize, + pub connect_to_master_pid: i32, } #[derive(Debug, Clone)] @@ -128,6 +131,12 @@ impl Config { format!("{}", self.max_memory).into(), ); } + if self.connect_to_master_pid != 0 { + res.insert( + ENV_SIDECAR_CONNECT_TO_MASTER_PID, + format!("{}", self.connect_to_master_pid).into(), + ); + } res } } @@ -241,9 +250,17 @@ impl FromEnv { crashtracker_endpoint: Self::crashtracker_endpoint(), appsec_config: Self::appsec_config(), max_memory: Self::max_memory(), + connect_to_master_pid: Self::connect_to_master_pid(), } } + fn connect_to_master_pid() -> i32 { + std::env::var(ENV_SIDECAR_CONNECT_TO_MASTER_PID) + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(0) + } + fn appsec_config() -> Option { let shared_lib_path = std::env::var_os(ENV_SIDECAR_APPSEC_SHARED_LIB_PATH)?; let socket_file_path = std::env::var_os(ENV_SIDECAR_APPSEC_SOCKET_FILE_PATH)?; diff --git a/datadog-sidecar/src/entry.rs b/datadog-sidecar/src/entry.rs index d0a0fad4fc..d3bcb502c5 100644 --- a/datadog-sidecar/src/entry.rs +++ b/datadog-sidecar/src/entry.rs @@ -218,8 +218,8 @@ pub fn daemonize(listener: IpcServer, mut cfg: Config) -> anyhow::Result<()> { pub fn start_or_connect_to_sidecar(cfg: Config) -> anyhow::Result { let liaison = match cfg.ipc_mode { - config::IpcMode::Shared => setup::DefaultLiason::ipc_shared(), - config::IpcMode::InstancePerProcess => setup::DefaultLiason::ipc_per_process(), + config::IpcMode::Shared => setup::DefaultLiaison::ipc_shared(), + config::IpcMode::InstancePerProcess => setup::DefaultLiaison::ipc_per_process(), }; let err = match liaison.attempt_listen() { diff --git a/datadog-sidecar/src/setup/mod.rs b/datadog-sidecar/src/setup/mod.rs index 07c837aab0..550bb18ac7 100644 --- a/datadog-sidecar/src/setup/mod.rs +++ b/datadog-sidecar/src/setup/mod.rs @@ -23,4 +23,5 @@ pub trait Liaison: Sized { fn attempt_listen(&self) -> io::Result>; fn ipc_shared() -> Self; fn ipc_per_process() -> Self; + fn for_master_pid(master_pid: u32) -> Self; } diff --git a/datadog-sidecar/src/setup/unix.rs b/datadog-sidecar/src/setup/unix.rs index 589602b9b4..7ad083545f 100644 --- a/datadog-sidecar/src/setup/unix.rs +++ b/datadog-sidecar/src/setup/unix.rs @@ -89,6 +89,10 @@ impl Liaison for SharedDirLiaison { let liason_path = env::temp_dir().join(format!("libdatadog.{}.{pid}", *PROCESS_RANDOM_ID)); Self::new(liason_path) } + + fn for_master_pid(master_pid: u32) -> Self { + Self::new(env::temp_dir().join(format!("libdatadog.{}", master_pid))) + } } impl SharedDirLiaison { @@ -141,7 +145,7 @@ mod linux { pub struct AbstractUnixSocketLiaison { path: PathBuf, } - pub type DefaultLiason = AbstractUnixSocketLiaison; + pub type DefaultLiaison = AbstractUnixSocketLiaison; impl Liaison for AbstractUnixSocketLiaison { fn connect_to_server(&self) -> io::Result { @@ -173,6 +177,14 @@ mod linux { )); Self { path } } + + fn for_master_pid(master_pid: u32) -> Self { + let path = PathBuf::from(format!( + concat!("libdatadog/", crate::sidecar_version!(), ".{}.sock"), + master_pid + )); + Self { path } + } } impl Default for AbstractUnixSocketLiaison { @@ -193,7 +205,7 @@ mod linux { pub use linux::*; #[cfg(target_os = "macos")] -pub type DefaultLiason = SharedDirLiaison; +pub type DefaultLiaison = SharedDirLiaison; #[cfg(test)] mod tests { diff --git a/datadog-sidecar/src/setup/windows.rs b/datadog-sidecar/src/setup/windows.rs index 49c39caaf1..afc8c1b59d 100644 --- a/datadog-sidecar/src/setup/windows.rs +++ b/datadog-sidecar/src/setup/windows.rs @@ -166,6 +166,11 @@ impl Liaison for NamedPipeLiaison { fn ipc_per_process() -> Self { Self::new(format!("libdatadog_{}_", unsafe { getpid() })) } + + fn for_master_pid(master_pid: u32) -> Self { + let path = env::temp_dir().join(format!("libdatadog.{}", master_pid)); + Self::new(path.to_string_lossy().as_ref()) + } } impl NamedPipeLiaison { @@ -197,7 +202,7 @@ impl Default for NamedPipeLiaison { } } -pub type DefaultLiason = NamedPipeLiaison; +pub type DefaultLiaison = NamedPipeLiaison; #[cfg(test)] mod tests { diff --git a/datadog-sidecar/src/unix.rs b/datadog-sidecar/src/unix.rs index 61ce695a7f..8eb2aecd87 100644 --- a/datadog-sidecar/src/unix.rs +++ b/datadog-sidecar/src/unix.rs @@ -3,6 +3,8 @@ use spawn_worker::{getpid, SpawnWorker, Stdio, TrampolineData}; +use crate::service::blocking::SidecarTransport; +use crate::setup::{DefaultLiaison, Liaison}; use std::ffi::CString; use std::os::unix::net::UnixListener as StdUnixListener; @@ -13,6 +15,7 @@ use nix::sys::socket::{shutdown, Shutdown}; use std::io; use std::os::fd::RawFd; use std::os::unix::prelude::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd}; +use std::thread; use std::time::Instant; use tokio::net::{UnixListener, UnixStream}; use tokio::select; @@ -32,6 +35,41 @@ use std::ffi::CStr; #[cfg(target_os = "linux")] use tracing::warn; +pub fn start_master_listener_unix(master_pid: i32) -> io::Result<()> { + let liaison = DefaultLiaison::for_master_pid(master_pid as u32); + + // Try to acquire the listening endpoint via the liaison + let std_listener = match liaison.attempt_listen()? { + Some(l) => l, + None => return Ok(()), + }; + + let _ = thread::Builder::new() + .name("dd-sidecar".into()) + .spawn(move || { + let acquire_listener = move || -> io::Result<_> { + std_listener.set_nonblocking(true)?; + let listener = UnixListener::from_std(std_listener.try_clone()?)?; + let cancel = { + let fd = listener.as_raw_fd(); + move || stop_listening(fd) + }; + Ok((move |handler| accept_socket_loop(listener, handler), cancel)) + }; + + let _ = enter_listener_loop(acquire_listener); + }) + .map_err(io::Error::other)?; + + Ok(()) +} + +pub fn connect_worker_unix(master_pid: i32) -> io::Result { + let liaison = DefaultLiaison::for_master_pid(master_pid as u32); + let channel = liaison.connect_to_server()?; + Ok(channel.into()) +} + #[no_mangle] #[allow(unused)] pub extern "C" fn ddog_daemon_entry_point(trampoline_data: &TrampolineData) { diff --git a/datadog-sidecar/src/windows.rs b/datadog-sidecar/src/windows.rs index 9080bcfedc..c6e0df6bec 100644 --- a/datadog-sidecar/src/windows.rs +++ b/datadog-sidecar/src/windows.rs @@ -2,9 +2,14 @@ // SPDX-License-Identifier: Apache-2.0 use crate::enter_listener_loop; +use crate::one_way_shared_memory::open_named_shm; +use crate::service::blocking::SidecarTransport; use crate::setup::pid_shm_path; +use arrayref::array_ref; +use datadog_ipc::platform::metadata::ProcessHandle; use datadog_ipc::platform::{ - named_pipe_name_from_raw_handle, FileBackedHandle, MappedMem, NamedShmHandle, + named_pipe_name_from_raw_handle, Channel, FileBackedHandle, MappedMem, NamedShmHandle, + PIPE_PATH, }; use futures::FutureExt; @@ -14,32 +19,238 @@ use libdd_common_ffi::CharSlice; use libdd_crashtracker_ffi::{ddog_crasht_init_windows, Metadata}; use manual_future::ManualFuture; use spawn_worker::{write_crashtracking_trampoline, SpawnWorker, Stdio, TrampolineData}; -use std::ffi::CStr; +use std::ffi::{CStr, CString}; use std::io::{self, Error}; -use std::os::windows::io::{AsRawHandle, IntoRawHandle, OwnedHandle}; +use std::mem; +use std::os::windows::io::{AsRawHandle, FromRawHandle, IntoRawHandle, OwnedHandle, RawHandle}; use std::ptr::null_mut; use std::sync::LazyLock; use std::sync::{Arc, Mutex}; -use std::time::Instant; +use std::thread; +use std::time::{Duration, Instant}; use tokio::net::windows::named_pipe::{NamedPipeServer, ServerOptions}; use tokio::select; -use tracing::{error, info}; +use tracing::{error, info, warn}; use winapi::{ shared::{ + minwindef::DWORD, sddl::ConvertSidToStringSidA, - winerror::{ERROR_INSUFFICIENT_BUFFER, ERROR_NO_TOKEN}, + winerror::{ + ERROR_ACCESS_DENIED, ERROR_INSUFFICIENT_BUFFER, ERROR_NO_TOKEN, ERROR_PIPE_BUSY, + }, }, um::{ - handleapi::CloseHandle, + fileapi::{CreateFileA, OPEN_EXISTING}, + handleapi::{CloseHandle, INVALID_HANDLE_VALUE}, + minwinbase::SECURITY_ATTRIBUTES, processthreadsapi::{ GetCurrentProcess, GetCurrentThread, OpenProcessToken, OpenThreadToken, }, securitybaseapi::GetTokenInformation, - winbase::LocalFree, - winnt::{TokenUser, HANDLE, TOKEN_QUERY, TOKEN_USER}, + winbase::{ + CreateNamedPipeA, LocalFree, FILE_FLAG_FIRST_PIPE_INSTANCE, FILE_FLAG_OVERLAPPED, + PIPE_ACCESS_INBOUND, PIPE_ACCESS_OUTBOUND, PIPE_READMODE_BYTE, PIPE_TYPE_BYTE, + PIPE_UNLIMITED_INSTANCES, + }, + winnt::{TokenUser, GENERIC_READ, GENERIC_WRITE, HANDLE, TOKEN_QUERY, TOKEN_USER}, }, }; +// Helper function to generate the named pipe endpoint name for a master process +fn endpoint_name_for_master(master_pid: i32) -> String { + format!( + "{}libdatadog_master_{}_{}", + PIPE_PATH, + master_pid, + crate::sidecar_version!() + ) +} + +// Create and bind a Windows named pipe server +fn bind_named_pipe_listener(name: &str) -> io::Result { + let c_name = CString::new(name).map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?; + + let mut sec_attributes = SECURITY_ATTRIBUTES { + nLength: mem::size_of::() as DWORD, + lpSecurityDescriptor: null_mut(), + bInheritHandle: 1, + }; + + unsafe { + let handle = CreateNamedPipeA( + c_name.as_ptr(), + FILE_FLAG_OVERLAPPED + | PIPE_ACCESS_OUTBOUND + | PIPE_ACCESS_INBOUND + | FILE_FLAG_FIRST_PIPE_INSTANCE, + PIPE_TYPE_BYTE | PIPE_READMODE_BYTE, + PIPE_UNLIMITED_INSTANCES, + 65536, + 65536, + 0, + &mut sec_attributes, + ); + + if handle == INVALID_HANDLE_VALUE { + let error = io::Error::last_os_error(); + if error.raw_os_error() == Some(ERROR_ACCESS_DENIED as i32) { + return Err(io::Error::new(io::ErrorKind::AddrInUse, error)); + } + return Err(error); + } + + Ok(OwnedHandle::from_raw_handle(handle as RawHandle)) + } +} + +// Connect to an existing Windows named pipe as a client +fn connect_named_pipe_client(name: &str) -> io::Result { + let c_name = CString::new(name).map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?; + + let timeout_end = Instant::now() + Duration::from_secs(2); + loop { + let handle = unsafe { + CreateFileA( + c_name.as_ptr(), + GENERIC_READ | GENERIC_WRITE, + 0, + null_mut(), + OPEN_EXISTING, + FILE_FLAG_OVERLAPPED, + null_mut(), + ) + }; + + if handle == INVALID_HANDLE_VALUE { + let error = io::Error::last_os_error(); + if error.raw_os_error() != Some(ERROR_PIPE_BUSY as i32) { + return Err(error); + } + } else { + return Ok(handle as RawHandle); + } + + if Instant::now() > timeout_end { + return Err(io::Error::from(io::ErrorKind::TimedOut)); + } + std::thread::yield_now(); + } +} + +// Accept loop for incoming named pipe connections +async fn accept_pipe_loop( + pipe_listener: Arc, + handler: Box, +) -> io::Result<()> { + let name = named_pipe_name_from_raw_handle(pipe_listener.as_raw_handle()) + .ok_or_else(|| io::Error::from(io::ErrorKind::InvalidInput))?; + + // We need to duplicate the handle to avoid consuming the Arc's inner handle + let raw_handle = pipe_listener.as_raw_handle(); + let mut pipe = unsafe { + // Create the first pipe server from the raw handle + NamedPipeServer::from_raw_handle(raw_handle) + }?; + + loop { + match pipe.connect().await { + Ok(_) => { + let connected_pipe = pipe; + // Create a new pipe instance for the next connection + pipe = ServerOptions::new().create(&name)?; + handler(connected_pipe); + } + Err(e) => { + error!("Error accepting pipe connection: {}", e); + break; + } + } + } + + Ok(()) +} + +// Stop listening on a named pipe handle +fn stop_listening_on_handle(raw: RawHandle) { + unsafe { + CloseHandle(raw as HANDLE); + } +} + +pub fn transport_from_owned_handle(handle: OwnedHandle) -> io::Result { + let raw: RawHandle = handle.as_raw_handle(); + + let name = named_pipe_name_from_raw_handle(raw) + .ok_or_else(|| io::Error::from(io::ErrorKind::InvalidInput))?; + + let getter = ProcessHandle::Getter(Box::new(move || { + let timeout_end = Instant::now() + Duration::from_secs(2); + let mut last_err: Option> = None; + let pid_path = pid_shm_path(&name); + loop { + match open_named_shm(&pid_path) { + Ok(shm) => { + let pid = u32::from_ne_bytes(*array_ref![shm.as_slice(), 0, 4]); + if pid != 0 { + return Ok(ProcessHandle::Pid(pid)); + } + } + Err(e) => last_err = Some(Box::new(e)), + } + if Instant::now() > timeout_end { + warn!( + "Reading sidecar pid from {} timed out (last error: {:?})", + pid_path.to_string_lossy(), + last_err + ); + return Err(io::Error::from(io::ErrorKind::TimedOut)); + } + std::thread::yield_now(); + } + })); + + let channel = Channel::from_client_handle_and_pid(handle, getter); + Ok(channel.into()) +} + +pub fn start_master_listener_windows(master_pid: i32) -> io::Result<()> { + let name = endpoint_name_for_master(master_pid); + + let pipe_listener = match bind_named_pipe_listener(&name) { + Ok(l) => l, + Err(e) if e.kind() == io::ErrorKind::AddrInUse => return Ok(()), + Err(e) => return Err(e), + }; + + let pipe_listener = Arc::new(pipe_listener); + + thread::Builder::new() + .name("dd-sidecar".into()) + .spawn(move || { + let pipe_listener_clone = pipe_listener.clone(); + let acquire_listener = move || -> io::Result<_> { + // Convert RawHandle to isize for thread safety (Send + Sync) + let raw = pipe_listener.as_raw_handle() as isize; + let cancel = move || stop_listening_on_handle(raw as RawHandle); + Ok(( + move |handler| accept_pipe_loop(pipe_listener_clone.clone(), handler), + cancel, + )) + }; + + let _ = enter_listener_loop(acquire_listener); + }) + .map_err(io::Error::other)?; + + Ok(()) +} + +pub fn connect_worker_windows(master_pid: i32) -> io::Result { + let name = endpoint_name_for_master(master_pid); + let raw = connect_named_pipe_client(&name)?; + Ok(unsafe { OwnedHandle::from_raw_handle(raw) }) +} + /// cbindgen:ignore #[no_mangle] pub extern "C" fn ddog_daemon_entry_point(_trampoline_data: &TrampolineData) { From 68a43264223f784590fd72ea2212cf81f01e2da3 Mon Sep 17 00:00:00 2001 From: Alexandre Rulleau Date: Fri, 7 Nov 2025 14:31:20 +0100 Subject: [PATCH 2/4] fix: threaded connection leaks Signed-off-by: Alexandre Rulleau --- datadog-sidecar-ffi/src/lib.rs | 34 ++++++++----- datadog-sidecar/src/entry.rs | 23 +++++---- datadog-sidecar/src/unix.rs | 93 +++++++++++++++++++++++----------- datadog-sidecar/src/windows.rs | 76 +++++++++++++++++++++++---- 4 files changed, 166 insertions(+), 60 deletions(-) diff --git a/datadog-sidecar-ffi/src/lib.rs b/datadog-sidecar-ffi/src/lib.rs index 81215ca180..654ccce193 100644 --- a/datadog-sidecar-ffi/src/lib.rs +++ b/datadog-sidecar-ffi/src/lib.rs @@ -31,6 +31,15 @@ use datadog_sidecar::service::{ }; use datadog_sidecar::service::{get_telemetry_action_sender, InternalTelemetryActions}; use datadog_sidecar::shm_remote_config::{path_for_remote_config, RemoteConfigReader}; +#[cfg(unix)] +use datadog_sidecar::{ + connect_worker_unix, shutdown_master_listener_unix, start_master_listener_unix, +}; +#[cfg(windows)] +use datadog_sidecar::{ + connect_worker_windows, shutdown_master_listener_windows, start_master_listener_windows, + transport_from_owned_handle, +}; use libc::c_char; use libdd_common::tag::Tag; use libdd_common::Endpoint; @@ -40,18 +49,6 @@ use libdd_common_ffi::{self as ffi, MaybeError}; use libdd_crashtracker_ffi::Metadata; use libdd_dogstatsd_client::DogStatsDActionOwned; use libdd_telemetry::{ -#[cfg(unix)] -use datadog_sidecar::{connect_worker_unix, start_master_listener_unix}; -#[cfg(windows)] -use datadog_sidecar::{ - connect_worker_windows, start_master_listener_windows, transport_from_owned_handle, -}; -use datadog_trace_utils::msgpack_encoder; -use ddcommon::tag::Tag; -use ddcommon::Endpoint; -use ddcommon_ffi::slice::{AsBytes, CharSlice}; -use ddcommon_ffi::{self as ffi, MaybeError}; -use ddtelemetry::{ data::{self, Dependency, Integration}, worker::{LifecycleAction, LogIdentifier, TelemetryActions}, }; @@ -349,6 +346,19 @@ pub extern "C" fn ddog_sidecar_connect_worker( MaybeError::None } +#[no_mangle] +pub extern "C" fn ddog_sidecar_shutdown_master_listener() -> MaybeError { + #[cfg(unix)] + { + try_c!(shutdown_master_listener_unix()); + } + #[cfg(windows)] + { + try_c!(shutdown_master_listener_windows()); + } + MaybeError::None +} + #[no_mangle] pub extern "C" fn ddog_sidecar_ping(transport: &mut Box) -> MaybeError { try_c!(blocking::ping(transport)); diff --git a/datadog-sidecar/src/entry.rs b/datadog-sidecar/src/entry.rs index d3bcb502c5..fbd3c6b969 100644 --- a/datadog-sidecar/src/entry.rs +++ b/datadog-sidecar/src/entry.rs @@ -64,13 +64,13 @@ where } }); - tokio::spawn(async move { - if let Err(err) = tokio::signal::ctrl_c().await { - tracing::error!("Error setting up signal handler {}", err); - } - tracing::info!("Received Ctrl-C Signal, shutting down"); - cancel(); - }); + // tokio::spawn(async move { + // if let Err(err) = tokio::signal::ctrl_c().await { + // tracing::error!("Error setting up signal handler {}", err); + // } + // tracing::info!("Received Ctrl-C Signal, shutting down"); + // cancel(); + // }); #[cfg(unix)] tokio::spawn(async move { @@ -153,9 +153,14 @@ where let (listener, cancel) = acquire_listener()?; - runtime + let result = runtime .block_on(main_loop(listener, Arc::new(cancel))) - .map_err(|e| e.into()) + .map_err(|e| e.into()); + + // Wait 1 second to shut down properly + runtime.shutdown_timeout(std::time::Duration::from_secs(1)); + + result } pub fn daemonize(listener: IpcServer, mut cfg: Config) -> anyhow::Result<()> { diff --git a/datadog-sidecar/src/unix.rs b/datadog-sidecar/src/unix.rs index 8eb2aecd87..efa904fa1c 100644 --- a/datadog-sidecar/src/unix.rs +++ b/datadog-sidecar/src/unix.rs @@ -15,11 +15,10 @@ use nix::sys::socket::{shutdown, Shutdown}; use std::io; use std::os::fd::RawFd; use std::os::unix::prelude::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd}; +use std::sync::Mutex; use std::thread; -use std::time::Instant; +use std::time::{Duration, Instant}; use tokio::net::{UnixListener, UnixStream}; -use tokio::select; -use tokio::signal::unix::{signal, SignalKind}; use tracing::{error, info}; #[cfg(target_os = "linux")] @@ -35,16 +34,21 @@ use std::ffi::CStr; #[cfg(target_os = "linux")] use tracing::warn; +static MASTER_LISTENER: Mutex, RawFd)>> = Mutex::new(None); + pub fn start_master_listener_unix(master_pid: i32) -> io::Result<()> { let liaison = DefaultLiaison::for_master_pid(master_pid as u32); - // Try to acquire the listening endpoint via the liaison let std_listener = match liaison.attempt_listen()? { Some(l) => l, - None => return Ok(()), + None => { + return Ok(()); + } }; - let _ = thread::Builder::new() + let listener_fd = std_listener.as_raw_fd(); + + let handle = thread::Builder::new() .name("dd-sidecar".into()) .spawn(move || { let acquire_listener = move || -> io::Result<_> { @@ -57,17 +61,61 @@ pub fn start_master_listener_unix(master_pid: i32) -> io::Result<()> { Ok((move |handler| accept_socket_loop(listener, handler), cancel)) }; - let _ = enter_listener_loop(acquire_listener); + let _ = enter_listener_loop(acquire_listener).map_err(|e| { + error!("enter_listener_loop failed: {}", e); + e + }); }) .map_err(io::Error::other)?; + match MASTER_LISTENER.lock() { + Ok(mut guard) => *guard = Some((handle, listener_fd)), + Err(e) => { + error!("Failed to acquire lock for storing master listener: {}", e); + return Err(io::Error::other("Mutex poisoned")); + } + } + Ok(()) } pub fn connect_worker_unix(master_pid: i32) -> io::Result { let liaison = DefaultLiaison::for_master_pid(master_pid as u32); - let channel = liaison.connect_to_server()?; - Ok(channel.into()) + + let mut last_error = None; + for _ in 0..10 { + match liaison.connect_to_server() { + Ok(channel) => { + return Ok(channel.into()); + } + Err(e) => { + last_error = Some(e); + std::thread::sleep(Duration::from_millis(10)); + } + } + } + + Err(last_error.unwrap_or_else(|| io::Error::other("Connection failed"))) +} + +pub fn shutdown_master_listener_unix() -> io::Result<()> { + let listener_data = match MASTER_LISTENER.lock() { + Ok(mut guard) => guard.take(), + Err(e) => { + error!( + "Failed to acquire lock for shutting down master listener: {}", + e + ); + return Err(io::Error::other("Mutex poisoned")); + } + }; + + if let Some((handle, fd)) = listener_data { + stop_listening(fd); + handle.join(); + } + + Ok(()) } #[no_mangle] @@ -127,32 +175,19 @@ pub extern "C" fn ddog_daemon_entry_point(trampoline_data: &TrampolineData) { fn stop_listening(listener_fd: RawFd) { // We need to drop O_NONBLOCK, as accept() on a shutdown socket will just give // EAGAIN instead of EINVAL - #[allow(clippy::unwrap_used)] - let flags = OFlag::from_bits_truncate(fcntl(listener_fd, F_GETFL).ok().unwrap()); - _ = fcntl(listener_fd, F_SETFL(flags & !OFlag::O_NONBLOCK)); - _ = shutdown(listener_fd, Shutdown::Both); + if let Ok(flags_raw) = fcntl(listener_fd, F_GETFL) { + let flags = OFlag::from_bits_truncate(flags_raw); + _ = fcntl(listener_fd, F_SETFL(flags & !OFlag::O_NONBLOCK)); + _ = shutdown(listener_fd, Shutdown::Both); + } } async fn accept_socket_loop( listener: UnixListener, handler: Box, ) -> io::Result<()> { - #[allow(clippy::unwrap_used)] - let mut termsig = signal(SignalKind::terminate()).unwrap(); - loop { - select! { - _ = termsig.recv() => { - stop_listening(listener.as_raw_fd()); - break; - } - accept = listener.accept() => { - if let Ok((socket, _)) = accept { - handler(socket); - } else { - break; - } - } - } + while let Ok((socket, _)) = listener.accept().await { + handler(socket); } Ok(()) } diff --git a/datadog-sidecar/src/windows.rs b/datadog-sidecar/src/windows.rs index c6e0df6bec..2b3f72457e 100644 --- a/datadog-sidecar/src/windows.rs +++ b/datadog-sidecar/src/windows.rs @@ -28,6 +28,9 @@ use std::sync::LazyLock; use std::sync::{Arc, Mutex}; use std::thread; use std::time::{Duration, Instant}; + +static MASTER_LISTENER: Mutex, Arc)>> = + Mutex::new(None); use tokio::net::windows::named_pipe::{NamedPipeServer, ServerOptions}; use tokio::select; use tracing::{error, info, warn}; @@ -103,7 +106,6 @@ fn bind_named_pipe_listener(name: &str) -> io::Result { } } -// Connect to an existing Windows named pipe as a client fn connect_named_pipe_client(name: &str) -> io::Result { let c_name = CString::new(name).map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?; @@ -137,7 +139,6 @@ fn connect_named_pipe_client(name: &str) -> io::Result { } } -// Accept loop for incoming named pipe connections async fn accept_pipe_loop( pipe_listener: Arc, handler: Box, @@ -145,10 +146,8 @@ async fn accept_pipe_loop( let name = named_pipe_name_from_raw_handle(pipe_listener.as_raw_handle()) .ok_or_else(|| io::Error::from(io::ErrorKind::InvalidInput))?; - // We need to duplicate the handle to avoid consuming the Arc's inner handle let raw_handle = pipe_listener.as_raw_handle(); let mut pipe = unsafe { - // Create the first pipe server from the raw handle NamedPipeServer::from_raw_handle(raw_handle) }?; @@ -156,7 +155,6 @@ async fn accept_pipe_loop( match pipe.connect().await { Ok(_) => { let connected_pipe = pipe; - // Create a new pipe instance for the next connection pipe = ServerOptions::new().create(&name)?; handler(connected_pipe); } @@ -170,7 +168,6 @@ async fn accept_pipe_loop( Ok(()) } -// Stop listening on a named pipe handle fn stop_listening_on_handle(raw: RawHandle) { unsafe { CloseHandle(raw as HANDLE); @@ -223,13 +220,13 @@ pub fn start_master_listener_windows(master_pid: i32) -> io::Result<()> { }; let pipe_listener = Arc::new(pipe_listener); + let pipe_listener_for_shutdown = pipe_listener.clone(); - thread::Builder::new() + let handle = thread::Builder::new() .name("dd-sidecar".into()) .spawn(move || { let pipe_listener_clone = pipe_listener.clone(); let acquire_listener = move || -> io::Result<_> { - // Convert RawHandle to isize for thread safety (Send + Sync) let raw = pipe_listener.as_raw_handle() as isize; let cancel = move || stop_listening_on_handle(raw as RawHandle); Ok(( @@ -242,13 +239,72 @@ pub fn start_master_listener_windows(master_pid: i32) -> io::Result<()> { }) .map_err(io::Error::other)?; + match MASTER_LISTENER.lock() { + Ok(mut guard) => *guard = Some((handle, pipe_listener_for_shutdown)), + Err(e) => { + error!("Failed to acquire lock for storing master listener: {}", e); + return Err(io::Error::other("Mutex poisoned")); + } + } + Ok(()) } pub fn connect_worker_windows(master_pid: i32) -> io::Result { let name = endpoint_name_for_master(master_pid); - let raw = connect_named_pipe_client(&name)?; - Ok(unsafe { OwnedHandle::from_raw_handle(raw) }) + + let mut last_error = None; + for _ in 0..10 { + match connect_named_pipe_client(&name) { + Ok(raw) => { + return Ok(unsafe { OwnedHandle::from_raw_handle(raw) }); + } + Err(e) => { + last_error = Some(e); + std::thread::sleep(Duration::from_millis(10)); + } + } + } + + error!("Failed to connect to master listener"); + Err(last_error.unwrap_or_else(|| io::Error::new(io::ErrorKind::Other, "Connection failed"))) +} + +pub fn shutdown_master_listener_windows() -> io::Result<()> { + let listener_data = match MASTER_LISTENER.lock() { + Ok(mut guard) => guard.take(), + Err(e) => { + error!( + "Failed to acquire lock for shutting down master listener: {}", + e + ); + return Err(io::Error::other("Mutex poisoned")); + } + }; + + if let Some((handle, pipe_listener)) = listener_data { + let raw = pipe_listener.as_raw_handle(); + stop_listening_on_handle(raw); + + let (tx, rx) = std::sync::mpsc::channel(); + std::thread::spawn(move || { + let result = handle.join(); + let _ = tx.send(result); + }); + + // Wait up to 500ms for proper shutdown + match rx.recv_timeout(Duration::from_millis(500)) { + Ok(Ok(())) => { } + Ok(Err(_)) => { + error!("Listener thread panicked during shutdown"); + } + Err(err) => { + error!("Timeout waiting for listener thread to shut down: {}", err); + } + } + } + + Ok(()) } /// cbindgen:ignore From c2690e731665922fafd7e13575e56b7dd72d7be9 Mon Sep 17 00:00:00 2001 From: Alexandre Rulleau Date: Fri, 7 Nov 2025 14:31:20 +0100 Subject: [PATCH 3/4] fix: threaded connection leaks Signed-off-by: Alexandre Rulleau --- datadog-sidecar/src/unix.rs | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/datadog-sidecar/src/unix.rs b/datadog-sidecar/src/unix.rs index efa904fa1c..9db5513156 100644 --- a/datadog-sidecar/src/unix.rs +++ b/datadog-sidecar/src/unix.rs @@ -112,7 +112,28 @@ pub fn shutdown_master_listener_unix() -> io::Result<()> { if let Some((handle, fd)) = listener_data { stop_listening(fd); - handle.join(); + + // Try to join with a timeout to avoid hanging the shutdown + // We spawn a helper thread to do the join so we can implement a timeout + let (tx, rx) = std::sync::mpsc::channel(); + std::thread::spawn(move || { + let result = handle.join(); + let _ = tx.send(result); + }); + + // Wait up to 2 seconds for clean shutdown (including time for tokio runtime shutdown) + match rx.recv_timeout(Duration::from_millis(2000)) { + Ok(Ok(())) => { + // Clean shutdown + } + Ok(Err(_)) => { + error!("Listener thread panicked during shutdown"); + } + Err(_) => { + // Timeout - thread didn't exit in time + // This is acceptable as the OS will clean up when the process exits + } + } } Ok(()) From a0ffd63bbcbee90b7afaf545ecc16ed895427a55 Mon Sep 17 00:00:00 2001 From: Alexandre Rulleau Date: Fri, 14 Nov 2025 16:18:26 +0100 Subject: [PATCH 4/4] fix: thread shutdown leaks Signed-off-by: Alexandre Rulleau --- datadog-ipc/tarpc/src/trace.rs | 8 +- datadog-sidecar-ffi/src/lib.rs | 16 +- datadog-sidecar/src/entry.rs | 17 +-- datadog-sidecar/src/service/queue_id.rs | 12 +- datadog-sidecar/src/setup/unix.rs | 10 +- datadog-sidecar/src/unix.rs | 193 ++++++++++++++++++++---- datadog-sidecar/src/windows.rs | 13 +- 7 files changed, 214 insertions(+), 55 deletions(-) diff --git a/datadog-ipc/tarpc/src/trace.rs b/datadog-ipc/tarpc/src/trace.rs index b3bc326ea4..a367756f76 100644 --- a/datadog-ipc/tarpc/src/trace.rs +++ b/datadog-ipc/tarpc/src/trace.rs @@ -22,10 +22,14 @@ use rand::Rng; use std::{ fmt::{self, Formatter}, num::{NonZeroU128, NonZeroU64}, + sync::atomic::{AtomicU64, Ordering}, }; #[cfg(feature = "opentelemetry")] use tracing_opentelemetry::OpenTelemetrySpanExt; +/// Global atomic counter for generating unique span IDs +static SPAN_ID_COUNTER: AtomicU64 = AtomicU64::new(1); + /// A context for tracing the execution of processes, distributed or otherwise. /// /// Consists of a span identifying an event, an optional parent span identifying a causal event @@ -80,9 +84,11 @@ pub enum SamplingDecision { impl Context { /// Constructs a new context with the trace ID and sampling decision inherited from the parent. pub(crate) fn new_child(&self) -> Self { + // Use atomic counter instead of rand to avoid TLS allocation + let span_id_value = SPAN_ID_COUNTER.fetch_add(1, Ordering::Relaxed); Self { trace_id: self.trace_id, - span_id: SpanId::random(&mut rand::thread_rng()), + span_id: SpanId(span_id_value), sampling_decision: self.sampling_decision, } } diff --git a/datadog-sidecar-ffi/src/lib.rs b/datadog-sidecar-ffi/src/lib.rs index 654ccce193..c507b97eaf 100644 --- a/datadog-sidecar-ffi/src/lib.rs +++ b/datadog-sidecar-ffi/src/lib.rs @@ -33,7 +33,8 @@ use datadog_sidecar::service::{get_telemetry_action_sender, InternalTelemetryAct use datadog_sidecar::shm_remote_config::{path_for_remote_config, RemoteConfigReader}; #[cfg(unix)] use datadog_sidecar::{ - connect_worker_unix, shutdown_master_listener_unix, start_master_listener_unix, + clear_inherited_listener_unix, connect_worker_unix, shutdown_master_listener_unix, + start_master_listener_unix, }; #[cfg(windows)] use datadog_sidecar::{ @@ -359,6 +360,19 @@ pub extern "C" fn ddog_sidecar_shutdown_master_listener() -> MaybeError { MaybeError::None } +#[no_mangle] +pub extern "C" fn ddog_sidecar_clear_inherited_listener() -> MaybeError { + #[cfg(unix)] + { + try_c!(clear_inherited_listener_unix()); + } + #[cfg(windows)] + { + // Windows doesn't use fork, so no inherited state to clear + } + MaybeError::None +} + #[no_mangle] pub extern "C" fn ddog_sidecar_ping(transport: &mut Box) -> MaybeError { try_c!(blocking::ping(transport)); diff --git a/datadog-sidecar/src/entry.rs b/datadog-sidecar/src/entry.rs index fbd3c6b969..c8f3db74e4 100644 --- a/datadog-sidecar/src/entry.rs +++ b/datadog-sidecar/src/entry.rs @@ -129,10 +129,12 @@ where // Shutdown final sender so the receiver can complete drop(shutdown_complete_tx); - // Await everything else to completion - _ = telemetry_handle.await; + // Await everything else to completion with timeouts to ensure we don't hang + let shutdown_timeout = Duration::from_millis(500); + + _ = tokio::time::timeout(shutdown_timeout, telemetry_handle).await; server.shutdown(); - _ = server.trace_flusher.join().await; + _ = tokio::time::timeout(shutdown_timeout, server.trace_flusher.join()).await; Ok(()) } @@ -153,14 +155,9 @@ where let (listener, cancel) = acquire_listener()?; - let result = runtime + runtime .block_on(main_loop(listener, Arc::new(cancel))) - .map_err(|e| e.into()); - - // Wait 1 second to shut down properly - runtime.shutdown_timeout(std::time::Duration::from_secs(1)); - - result + .map_err(|e| e.into()) } pub fn daemonize(listener: IpcServer, mut cfg: Config) -> anyhow::Result<()> { diff --git a/datadog-sidecar/src/service/queue_id.rs b/datadog-sidecar/src/service/queue_id.rs index 33f815acce..2a8c8589aa 100644 --- a/datadog-sidecar/src/service/queue_id.rs +++ b/datadog-sidecar/src/service/queue_id.rs @@ -1,8 +1,8 @@ // Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -use rand::Rng; use serde::{Deserialize, Serialize}; +use std::sync::atomic::{AtomicU64, Ordering}; /// `QueueId` is a struct that represents a unique identifier for a queue. /// It contains a single field, `inner`, which is a 64-bit unsigned integer. @@ -12,11 +12,15 @@ pub struct QueueId { pub(crate) inner: u64, } +/// Global atomic counter for generating unique queue IDs +static QUEUE_ID_COUNTER: AtomicU64 = AtomicU64::new(1); + impl QueueId { /// Generates a new unique `QueueId`. /// - /// This method generates a random 64-bit unsigned integer between 1 (inclusive) and `u64::MAX` - /// (exclusive) and uses it as the `inner` value of the new `QueueId`. + /// This method uses an atomic counter to generate monotonically increasing + /// unique IDs. The counter starts at 1 and increments with each call. + /// This approach avoids TLS allocations from random number generators. /// /// # Examples /// @@ -27,7 +31,7 @@ impl QueueId { /// ``` pub fn new_unique() -> Self { Self { - inner: rand::thread_rng().gen_range(1u64..u64::MAX), + inner: QUEUE_ID_COUNTER.fetch_add(1, Ordering::Relaxed), } } } diff --git a/datadog-sidecar/src/setup/unix.rs b/datadog-sidecar/src/setup/unix.rs index 7ad083545f..b189f2c734 100644 --- a/datadog-sidecar/src/setup/unix.rs +++ b/datadog-sidecar/src/setup/unix.rs @@ -1,7 +1,10 @@ // Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -use std::sync::LazyLock; +use std::sync::{ + atomic::{AtomicU16, Ordering}, + LazyLock, +}; use std::{ env, fs, io, os::unix::{ @@ -83,7 +86,10 @@ impl Liaison for SharedDirLiaison { } fn ipc_per_process() -> Self { - static PROCESS_RANDOM_ID: LazyLock = LazyLock::new(rand::random); + // Use atomic counter instead of rand::random to avoid TLS allocation + static PROCESS_ID_COUNTER: AtomicU16 = AtomicU16::new(1); + static PROCESS_RANDOM_ID: LazyLock = + LazyLock::new(|| PROCESS_ID_COUNTER.fetch_add(1, Ordering::Relaxed)); let pid = std::process::id(); let liason_path = env::temp_dir().join(format!("libdatadog.{}.{pid}", *PROCESS_RANDOM_ID)); diff --git a/datadog-sidecar/src/unix.rs b/datadog-sidecar/src/unix.rs index 9db5513156..daebb68edb 100644 --- a/datadog-sidecar/src/unix.rs +++ b/datadog-sidecar/src/unix.rs @@ -15,7 +15,8 @@ use nix::sys::socket::{shutdown, Shutdown}; use std::io; use std::os::fd::RawFd; use std::os::unix::prelude::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd}; -use std::sync::Mutex; +use std::sync::{Arc, Mutex}; +use std::sync::atomic::{AtomicBool, Ordering}; use std::thread; use std::time::{Duration, Instant}; use tokio::net::{UnixListener, UnixStream}; @@ -51,20 +52,140 @@ pub fn start_master_listener_unix(master_pid: i32) -> io::Result<()> { let handle = thread::Builder::new() .name("dd-sidecar".into()) .spawn(move || { - let acquire_listener = move || -> io::Result<_> { - std_listener.set_nonblocking(true)?; - let listener = UnixListener::from_std(std_listener.try_clone()?)?; - let cancel = { - let fd = listener.as_raw_fd(); - move || stop_listening(fd) - }; - Ok((move |handler| accept_socket_loop(listener, handler), cancel)) + // Use blocking I/O - no shared tokio Runtime needed + // This makes the code fork-safe + use crate::service::sidecar_server::SidecarServer; + let runtime = match tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + { + Ok(rt) => rt, + Err(e) => { + error!("Failed to create runtime for server initialization: {}", e); + return; + } }; - let _ = enter_listener_loop(acquire_listener).map_err(|e| { - error!("enter_listener_loop failed: {}", e); - e - }); + let server = runtime.block_on(async { SidecarServer::default() }); + + // Shutdown flag to signal connection threads to stop + let shutdown_flag = Arc::new(AtomicBool::new(false)); + + // Track connection threads and stream fds for forceful shutdown + let mut handler_threads: Vec> = Vec::new(); + let active_fds: Arc>> = Arc::new(Mutex::new(Vec::new())); + + loop { + // Clean up finished threads to avoid accumulating handles + handler_threads.retain(|h| !h.is_finished()); + + match std_listener.accept() { + Ok((stream, _addr)) => { + // Store the raw fd so we can shutdown the connection later + let stream_fd = stream.as_raw_fd(); + if let Ok(mut fds) = active_fds.lock() { + fds.push(stream_fd); + } + + let server = server.clone(); + let shutdown = shutdown_flag.clone(); + let fds_cleanup = active_fds.clone(); + + // Spawn a thread for each connection + match thread::Builder::new().name("dd-conn-handler".into()).spawn( + move || { + // Create a minimal single-threaded runtime for this connection only + // This runtime will be dropped when the connection closes + let runtime = match tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + { + Ok(rt) => rt, + Err(e) => { + error!("Failed to create runtime for connection: {}", e); + return; + } + }; + + runtime.block_on(async move { + // Check shutdown flag + if shutdown.load(Ordering::Relaxed) { + return; + } + + // Convert std UnixStream to tokio UnixStream + if let Err(e) = stream.set_nonblocking(true) { + error!("Failed to set nonblocking: {}", e); + return; + } + + let tokio_stream = match UnixStream::from_std(stream) { + Ok(s) => s, + Err(e) => { + error!("Failed to convert stream: {}", e); + return; + } + }; + + // Handle the connection using existing async infrastructure + use datadog_ipc::platform::AsyncChannel; + + // Use the cloned shared server + server + .accept_connection(AsyncChannel::from(tokio_stream)) + .await; + + // Remove this fd from active list when done + if let Ok(mut fds) = fds_cleanup.lock() { + fds.retain(|&fd| fd != stream_fd); + } + }); + }, + ) { + Ok(handle) => handler_threads.push(handle), + Err(e) => error!("Failed to spawn handler thread: {}", e), + } + } + Err(e) => { + match e.kind() { + io::ErrorKind::Interrupted => continue, + io::ErrorKind::InvalidInput => break, // Socket shut down + _ => { + error!("Accept error: {}", e); + thread::sleep(Duration::from_millis(100)); + } + } + } + } + } + + info!("Master listener stopped accepting connections"); + + // Signal all connection threads to stop + shutdown_flag.store(true, Ordering::Relaxed); + + // Forcefully shutdown all active connection streams + // This will cause accept_connection().await to complete immediately + if let Ok(fds) = active_fds.lock() { + info!("Forcefully closing {} active connections", fds.len()); + for &fd in fds.iter() { + // Shutdown both directions to force connection close + let _ = shutdown(fd, Shutdown::Both); + } + } + + // Shutdown the server + server.shutdown(); + + // Now join all connection threads - they should exit immediately + // because all connections were forcefully closed + info!("Waiting for {} connection threads to finish", handler_threads.len()); + for (i, handle) in handler_threads.into_iter().enumerate() { + if let Err(e) = handle.join() { + error!("Connection thread {} panicked: {:?}", i, e); + } + } + info!("All connection threads finished"); }) .map_err(io::Error::other)?; @@ -95,6 +216,7 @@ pub fn connect_worker_unix(master_pid: i32) -> io::Result { } } + error!("Worker failed to connect after 10 attempts"); Err(last_error.unwrap_or_else(|| io::Error::other("Connection failed"))) } @@ -112,28 +234,35 @@ pub fn shutdown_master_listener_unix() -> io::Result<()> { if let Some((handle, fd)) = listener_data { stop_listening(fd); + let _ = handle.join(); + } - // Try to join with a timeout to avoid hanging the shutdown - // We spawn a helper thread to do the join so we can implement a timeout - let (tx, rx) = std::sync::mpsc::channel(); - std::thread::spawn(move || { - let result = handle.join(); - let _ = tx.send(result); - }); - - // Wait up to 2 seconds for clean shutdown (including time for tokio runtime shutdown) - match rx.recv_timeout(Duration::from_millis(2000)) { - Ok(Ok(())) => { - // Clean shutdown - } - Ok(Err(_)) => { - error!("Listener thread panicked during shutdown"); - } - Err(_) => { - // Timeout - thread didn't exit in time - // This is acceptable as the OS will clean up when the process exits + Ok(()) +} + +/// Clears inherited resources in child processes after fork(). +/// With the new blocking I/O approach, we only need to forget the listener thread handle. +/// Each connection creates its own short-lived runtime, so there's no global runtime to inherit. +pub fn clear_inherited_listener_unix() -> io::Result<()> { + info!("Child process clearing inherited listener state"); + match MASTER_LISTENER.lock() { + Ok(mut guard) => { + if let Some((handle, _fd)) = guard.take() { + info!("Child forgetting inherited listener thread handle"); + // Forget the handle without joining - parent owns the thread + std::mem::forget(handle); + info!("Child successfully forgot listener handle"); + } else { + info!("Child found no listener to clear"); } } + Err(e) => { + error!( + "Failed to acquire lock for clearing inherited listener: {}", + e + ); + return Err(io::Error::other("Mutex poisoned")); + } } Ok(()) diff --git a/datadog-sidecar/src/windows.rs b/datadog-sidecar/src/windows.rs index 2b3f72457e..6fd8144e5c 100644 --- a/datadog-sidecar/src/windows.rs +++ b/datadog-sidecar/src/windows.rs @@ -147,9 +147,7 @@ async fn accept_pipe_loop( .ok_or_else(|| io::Error::from(io::ErrorKind::InvalidInput))?; let raw_handle = pipe_listener.as_raw_handle(); - let mut pipe = unsafe { - NamedPipeServer::from_raw_handle(raw_handle) - }?; + let mut pipe = unsafe { NamedPipeServer::from_raw_handle(raw_handle) }?; loop { match pipe.connect().await { @@ -287,14 +285,14 @@ pub fn shutdown_master_listener_windows() -> io::Result<()> { stop_listening_on_handle(raw); let (tx, rx) = std::sync::mpsc::channel(); - std::thread::spawn(move || { + let helper_handle = std::thread::spawn(move || { let result = handle.join(); let _ = tx.send(result); }); // Wait up to 500ms for proper shutdown match rx.recv_timeout(Duration::from_millis(500)) { - Ok(Ok(())) => { } + Ok(Ok(())) => {} Ok(Err(_)) => { error!("Listener thread panicked during shutdown"); } @@ -302,6 +300,11 @@ pub fn shutdown_master_listener_windows() -> io::Result<()> { error!("Timeout waiting for listener thread to shut down: {}", err); } } + + // Join the helper thread to clean up its TLS + if let Err(_) = helper_handle.join() { + error!("Helper thread panicked"); + } } Ok(())