From f62c2bf7b67a0e9fc5aac933ceef42e422b0c204 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Antonsson?= Date: Wed, 5 Nov 2025 10:33:34 +0100 Subject: [PATCH 1/2] feat: allow OpenTelemetry context access from SpanRef --- examples/otel_context.rs | 300 +++++++++++++++++++++++++++++++++++++++ src/layer.rs | 38 ++++- src/layer/filtered.rs | 1 + src/lib.rs | 3 + src/otel_context.rs | 72 ++++++++++ tests/otel_context.rs | 203 ++++++++++++++++++++++++++ 6 files changed, 615 insertions(+), 2 deletions(-) create mode 100644 examples/otel_context.rs create mode 100644 src/otel_context.rs create mode 100644 tests/otel_context.rs diff --git a/examples/otel_context.rs b/examples/otel_context.rs new file mode 100644 index 0000000..291b0ee --- /dev/null +++ b/examples/otel_context.rs @@ -0,0 +1,300 @@ +//! This example demonstrates how to use `OpenTelemetryContext` from a separate layer +//! to access OpenTelemetry context data from tracing spans. + +use opentelemetry::trace::{TraceContextExt, TracerProvider as _}; +use opentelemetry_sdk::trace::SdkTracerProvider; +use opentelemetry_stdout as stdout; +use std::sync::{Arc, Mutex, RwLock}; +use tracing::{debug, info, span, warn, Subscriber}; +use tracing::{dispatcher::WeakDispatch, level_filters::LevelFilter, Dispatch}; +use tracing_opentelemetry::{layer, OpenTelemetryContext}; +use tracing_subscriber::layer::Context; +use tracing_subscriber::prelude::*; +use tracing_subscriber::registry::LookupSpan; +use tracing_subscriber::Layer; + +/// A custom layer that demonstrates how to use OpenTelemetryContext +/// to extract OpenTelemetry contexts from span extensions. +#[derive(Clone)] +struct SpanAnalysisLayer { + /// Store span analysis results for demonstration + analysis_results: Arc>>, + /// Weak reference to the dispatcher for context extraction + dispatch: Arc>>, +} + +#[derive(Debug, Clone)] +struct SpanAnalysis { + span_name: String, + trace_id: String, + span_id: String, + is_sampled: bool, +} + +impl SpanAnalysisLayer { + fn new() -> Self { + Self { + analysis_results: Arc::new(Mutex::new(Vec::new())), + dispatch: Arc::new(RwLock::new(None)), + } + } + + fn get_analysis_results(&self) -> Vec { + self.analysis_results.lock().unwrap().clone() + } + + fn analyze_span_context(&self, span_name: &str, otel_context: &opentelemetry::Context) { + let span = otel_context.span(); + let span_context = span.span_context(); + + if span_context.is_valid() { + let analysis = SpanAnalysis { + span_name: span_name.to_string(), + trace_id: format!("{:032x}", span_context.trace_id()), + span_id: format!("{:016x}", span_context.span_id()), + is_sampled: span_context.is_sampled(), + }; + + println!( + "šŸ” Analyzing span '{}': trace_id={}, span_id={}, sampled={}", + analysis.span_name, + analysis.trace_id, + analysis.span_id, + span_context.trace_flags().is_sampled() + ); + + if let Ok(mut results) = self.analysis_results.lock() { + results.push(analysis); + } + } + } + + fn get_weak_dispatch(&self, get_default: bool) -> Option { + let read_guard = self.dispatch.read().unwrap(); + match read_guard.as_ref() { + Some(weak_dispatch) => Some(weak_dispatch.clone()), + // Note: This workaround is needed until https://github.com/tokio-rs/tracing/pull/3379 + // is merged and released. It should really be handled in on_register_dispatch + None => { + if !get_default { + None + } else { + drop(read_guard); + let mut dispatch = self.dispatch.write().unwrap(); + let weak_dispatch = Dispatch::default().downgrade(); + *dispatch = Some(weak_dispatch.clone()); + Some(weak_dispatch) + } + } + } + } +} + +impl Layer for SpanAnalysisLayer +where + S: Subscriber + for<'span> LookupSpan<'span>, +{ + fn on_new_span( + &self, + attrs: &tracing::span::Attributes<'_>, + id: &tracing::span::Id, + ctx: Context<'_, S>, + ) { + // Get the weak dispatch reference. + // + // Note: We can't use the Dispatch::default() workaround described above here since this + // method is called from inside a dispatcher::get_default block, and such calls can't be + // nested so we would get the global dispatcher instead, which can't downcast to the right + // types when extracting the OpenTelemetry context. This also means that we will miss + // analyzing the first span that is created šŸ¤·šŸ¼ā€ā™‚ļø + let Some(weak_dispatch) = self.get_weak_dispatch(false) else { + return; + }; + + // Get the span reference and extract OpenTelemetry context + if let Some(span_ref) = ctx.span(id) { + // This is the key functionality: using OpenTelemetryContext + // to extract the OpenTelemetry context from span extensions + let mut extensions = span_ref.extensions_mut(); + if let Some(otel_context) = + OpenTelemetryContext::context(&mut extensions, &weak_dispatch.upgrade()) + { + self.analyze_span_context(attrs.metadata().name(), &otel_context); + } else { + println!( + "āš ļø Could not extract OpenTelemetry context for span '{}'", + attrs.metadata().name() + ); + } + } + } + + fn on_enter(&self, id: &tracing::span::Id, ctx: Context<'_, S>) { + if let Some(weak_dispatch) = self.get_weak_dispatch(true) { + if let Some(span_ref) = ctx.span(id) { + let mut extensions = span_ref.extensions_mut(); + if let Some(otel_context) = + OpenTelemetryContext::context(&mut extensions, &weak_dispatch.upgrade()) + { + let span = otel_context.span(); + let span_context = span.span_context(); + if span_context.is_valid() { + println!( + "šŸ“ Entering span with trace_id: {:032x}, span_id: {:016x}", + span_context.trace_id(), + span_context.span_id() + ); + } + } + } + } + } + + fn on_register_dispatch(&self, subscriber: &tracing::Dispatch) { + // Note: This does not work for Layer until https://github.com/tokio-rs/tracing/pull/3379 + // is merged and released, since `on_register_dispatch` is never called. + let mut dispatch = self.dispatch.write().unwrap(); + *dispatch = Some(subscriber.clone().downgrade()); + } +} + +fn setup_tracing() -> (impl Subscriber, SdkTracerProvider, SpanAnalysisLayer) { + // Create OpenTelemetry tracer that outputs to stdout + let provider = SdkTracerProvider::builder() + .with_simple_exporter(stdout::SpanExporter::default()) + .build(); + let tracer = provider.tracer("span_ref_ext_example"); + + // Create our custom analysis layer + let analysis_layer = SpanAnalysisLayer::new(); + + // Build the subscriber with multiple layers: + // 1. OpenTelemetry layer for trace export + // 2. Our custom analysis layer that uses OpenTelemetryContext + // 3. Formatting layer for console output + let subscriber = tracing_subscriber::registry() + .with(layer().with_tracer(tracer).with_filter(LevelFilter::DEBUG)) + .with(analysis_layer.clone()) + .with( + tracing_subscriber::fmt::layer() + .with_target(false) + .with_filter(LevelFilter::INFO), + ); + + (subscriber, provider, analysis_layer) +} + +fn simulate_application_work() { + // Create a root span for the main application work + let root_span = span!(tracing::Level::INFO, "application_main", version = "1.0.0"); + let _root_guard = root_span.enter(); + + info!("Starting application"); + + // Simulate some business logic with nested spans + { + let auth_span = span!(tracing::Level::DEBUG, "authenticate_user", user_id = 12345); + let _auth_guard = auth_span.enter(); + + debug!("Validating user credentials"); + + // Simulate authentication work + std::thread::sleep(std::time::Duration::from_millis(10)); + + info!("User authenticated successfully"); + } + + // Simulate database operations + { + let db_span = span!( + tracing::Level::DEBUG, + "database_query", + query = "SELECT * FROM users", + table = "users" + ); + let _db_guard = db_span.enter(); + + debug!("Executing database query"); + + // Nested span for connection management + { + let conn_span = span!(tracing::Level::DEBUG, "acquire_connection", pool_size = 10); + let _conn_guard = conn_span.enter(); + + debug!("Acquiring database connection from pool"); + std::thread::sleep(std::time::Duration::from_millis(5)); + } + + std::thread::sleep(std::time::Duration::from_millis(20)); + info!("Database query completed"); + } + + // Simulate some processing work + { + let process_span = span!( + tracing::Level::DEBUG, + "process_data", + records_count = 150, + batch_size = 50 + ); + let _process_guard = process_span.enter(); + + debug!("Processing user data"); + + for batch in 1..=3 { + let batch_span = span!( + tracing::Level::DEBUG, + "process_batch", + batch_number = batch, + batch_size = 50 + ); + let _batch_guard = batch_span.enter(); + + debug!("Processing batch {}", batch); + std::thread::sleep(std::time::Duration::from_millis(8)); + } + + info!("Data processing completed"); + } + + warn!("Application work completed"); +} + +fn main() { + println!( + "šŸš€ OpenTelemetryContext Example: Extracting OpenTelemetry Contexts from Separate Layer" + ); + println!("{}", "=".repeat(80)); + + // Setup tracing with our custom layer + let (subscriber, provider, analysis_layer) = setup_tracing(); + + tracing::subscriber::with_default(subscriber, || { + // Simulate application work that generates spans + simulate_application_work(); + }); + + // Ensure all spans are flushed + drop(provider); + + // Display the analysis results + println!("\nšŸ“Š Span Analysis Results:"); + println!("{}", "-".repeat(80)); + + let results = analysis_layer.get_analysis_results(); + for (i, analysis) in results.iter().enumerate() { + println!( + "{}. Span: '{}'\n Trace ID: {}\n Span ID: {}\n Sampled: {}\n", + i + 1, + analysis.span_name, + analysis.trace_id, + analysis.span_id, + analysis.is_sampled + ); + } + + println!( + "āœ… Example completed! Total spans analyzed: {}", + results.len() + ); +} diff --git a/src/layer.rs b/src/layer.rs index 1171ac8..38d72de 100644 --- a/src/layer.rs +++ b/src/layer.rs @@ -19,7 +19,7 @@ use tracing_core::{field, Event, Subscriber}; use tracing_log::NormalizeEvent; use tracing_subscriber::layer::Context; use tracing_subscriber::layer::Filter; -use tracing_subscriber::registry::LookupSpan; +use tracing_subscriber::registry::{ExtensionsMut, LookupSpan}; use tracing_subscriber::Layer; #[cfg(all(target_arch = "wasm32", not(target_os = "wasi")))] use web_time::Instant; @@ -109,6 +109,14 @@ pub(crate) struct WithContext { #[allow(clippy::type_complexity)] pub(crate) with_activated_context: fn(&tracing::Dispatch, &span::Id, f: &mut dyn FnMut(&mut OtelData)), + + /// + /// Ensures the given SpanId has been activated - that is, created in the OTel side of things, + /// and had its SpanBuilder consumed - and then provides access to the OtelData associated with it. + /// + #[allow(clippy::type_complexity)] + pub(crate) with_activated_context_extensions: + fn(&tracing::Dispatch, &mut ExtensionsMut<'_>, f: &mut dyn FnMut(&mut OtelData)), } impl WithContext { @@ -138,6 +146,20 @@ impl WithContext { ) { (self.with_activated_context)(dispatch, id, &mut f) } + + /// + /// Ensures the given SpanId has been activated - that is, created in the OTel side of things, + /// and had its SpanBuilder consumed - and then provides access to the OtelData associated with it. + /// + #[allow(clippy::type_complexity)] + pub(crate) fn with_activated_context_extensions( + &self, + dispatch: &tracing::Dispatch, + extensions: &mut ExtensionsMut<'_>, + mut f: impl FnMut(&mut OtelData), + ) { + (self.with_activated_context_extensions)(dispatch, extensions, &mut f) + } } fn str_to_span_kind(s: &str) -> Option { @@ -630,6 +652,7 @@ where with_context: WithContext { with_context: Self::get_context, with_activated_context: Self::get_activated_context, + with_activated_context_extensions: Self::get_activated_context_extensions, }, _registry: marker::PhantomData, } @@ -685,6 +708,8 @@ where with_context: WithContext { with_context: OpenTelemetryLayer::::get_context, with_activated_context: OpenTelemetryLayer::::get_activated_context, + with_activated_context_extensions: + OpenTelemetryLayer::::get_activated_context_extensions, }, _registry: self._registry, // cannot use ``..self` here due to different generics @@ -954,11 +979,20 @@ where .span(id) .expect("registry should have a span for the current ID"); + let mut extensions = span.extensions_mut(); + + Self::get_activated_context_extensions(dispatch, &mut extensions, f) + } + + fn get_activated_context_extensions( + dispatch: &tracing::Dispatch, + extensions: &mut ExtensionsMut<'_>, + f: &mut dyn FnMut(&mut OtelData), + ) { let layer = dispatch .downcast_ref::>() .expect("layer should downcast to expected type; this is a bug!"); - let mut extensions = span.extensions_mut(); if let Some(otel_data) = extensions.get_mut::() { // Activate the context layer.start_cx(otel_data); diff --git a/src/layer/filtered.rs b/src/layer/filtered.rs index 131cfb4..36048e7 100644 --- a/src/layer/filtered.rs +++ b/src/layer/filtered.rs @@ -144,6 +144,7 @@ where OtelDataState::Builder { builder, parent_cx: _, + status: _, } => { builder.attributes.get_or_insert(Vec::new()).push(key_value); } diff --git a/src/lib.rs b/src/lib.rs index 1b25c83..6bda424 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -115,6 +115,8 @@ mod metrics; /// Implementation of the trace::Layer as a source of OpenTelemetry data. mod layer; +/// OpenTelemetryContext which enables OpenTelemetry context extraction from span extensions. +mod otel_context; /// Span extension which enables OpenTelemetry context management. mod span_ext; @@ -127,6 +129,7 @@ pub use layer::{layer, FilteredOpenTelemetryLayer, OpenTelemetryLayer}; #[cfg(feature = "metrics")] pub use metrics::MetricsLayer; use opentelemetry::trace::TraceContextExt as _; +pub use otel_context::OpenTelemetryContext; pub use span_ext::{OpenTelemetrySpanExt, SetParentError}; /// Per-span OpenTelemetry data tracked by this crate. diff --git a/src/otel_context.rs b/src/otel_context.rs new file mode 100644 index 0000000..7fa9670 --- /dev/null +++ b/src/otel_context.rs @@ -0,0 +1,72 @@ +use crate::{layer::WithContext, OtelData, OtelDataState}; +use tracing::Dispatch; +use tracing_subscriber::registry::ExtensionsMut; + +/// Utility functions to allow tracing [`ExtensionsMut`]s to return +/// [OpenTelemetry] [`Context`]s. +/// +/// [`ExtensionsMut`]: tracing_subscriber::registry::ExtensionsMut +/// [OpenTelemetry]: https://opentelemetry.io +/// [`Context`]: opentelemetry::Context +pub struct OpenTelemetryContext {} +impl OpenTelemetryContext { + /// Extracts the OpenTelemetry [`Context`] associated with this span extensions. + /// + /// This method retrieves the OpenTelemetry context data that has been stored + /// for the span by the OpenTelemetry layer. The context includes the span's + /// OpenTelemetry span context, which contains trace ID, span ID, and other + /// trace-related metadata. + /// + /// [`Context`]: opentelemetry::Context + /// + /// # Examples + /// + /// ```rust + /// use tracing_opentelemetry::OpenTelemetryContext; + /// use tracing::dispatcher::WeakDispatch; + /// use tracing_subscriber::registry::LookupSpan; + /// use opentelemetry::trace::TraceContextExt; + /// + /// fn do_things_with_otel_context<'a, D>( + /// span_ref: &tracing_subscriber::registry::SpanRef<'a, D>, + /// weak_dispatch: &WeakDispatch + /// ) where + /// D: LookupSpan<'a>, + /// { + /// if let Some(otel_context) = OpenTelemetryContext::context(&mut span_ref.extensions_mut(), &weak_dispatch.upgrade()) { + /// // Process the extracted context + /// let span = otel_context.span(); + /// let span_context = span.span_context(); + /// if span_context.is_valid() { + /// // Handle the valid context... + /// } + /// } + /// } + /// ``` + /// + /// # Use Cases + /// + /// - When working with multiple subscriber configurations + /// - When implementing advanced tracing middleware that manages multiple dispatches + pub fn context( + extensions: &mut ExtensionsMut<'_>, + dispatch: &Option, + ) -> Option { + dispatch.as_ref().and_then(|dispatch| { + let mut cx = None; + if let Some(get_context) = dispatch.downcast_ref::() { + // If our span hasn't been built, we should build it and get the context in one call + get_context.with_activated_context_extensions( + dispatch, + extensions, + |data: &mut OtelData| { + if let OtelDataState::Context { current_cx } = &data.state { + cx = Some(current_cx.clone()); + } + }, + ); + } + cx + }) + } +} diff --git a/tests/otel_context.rs b/tests/otel_context.rs new file mode 100644 index 0000000..156f5de --- /dev/null +++ b/tests/otel_context.rs @@ -0,0 +1,203 @@ +use opentelemetry::trace::{TraceContextExt, TracerProvider as _}; +use opentelemetry_sdk::{ + error::OTelSdkResult, + trace::{SdkTracerProvider, SpanData, SpanExporter, Tracer}, +}; +use std::sync::{Arc, Mutex, RwLock}; +use tracing::Subscriber; +use tracing::{dispatcher::WeakDispatch, level_filters::LevelFilter, Dispatch}; +use tracing_opentelemetry::{layer, OpenTelemetryContext}; +use tracing_subscriber::layer::Context; +use tracing_subscriber::prelude::*; +use tracing_subscriber::registry::LookupSpan; +use tracing_subscriber::Layer; + +#[derive(Clone, Default, Debug)] +struct TestExporter(Arc>>); + +impl SpanExporter for TestExporter { + async fn export(&self, mut batch: Vec) -> OTelSdkResult { + let spans = self.0.clone(); + if let Ok(mut inner) = spans.lock() { + inner.append(&mut batch); + } + Ok(()) + } +} + +/// A custom tracing layer that uses OpenTelemetryContext to access OpenTelemetry contexts +/// from span extensions. This simulates a separate layer that needs to interact with +/// OpenTelemetry data managed by the OpenTelemetryLayer. +#[derive(Clone)] +struct CustomLayer { + /// Store extracted contexts for verification + extracted_contexts: Arc>>, + dispatch: Arc>>, +} + +impl CustomLayer { + fn new() -> Self { + Self { + extracted_contexts: Arc::new(Mutex::new(Vec::new())), + dispatch: Arc::new(RwLock::new(None)), + } + } + + fn get_extracted_contexts(&self) -> Vec { + self.extracted_contexts.lock().unwrap().clone() + } +} + +impl Layer for CustomLayer +where + S: Subscriber + for<'span> LookupSpan<'span>, +{ + fn on_enter(&self, id: &tracing::span::Id, ctx: Context<'_, S>) { + let weak_dispatch = { + let read_guard = self.dispatch.read().unwrap(); + match read_guard.as_ref() { + Some(weak_dispatch) => weak_dispatch.clone(), + None => { + drop(read_guard); + let mut dispatch = self.dispatch.write().unwrap(); + let weak_dispatch = Dispatch::default().downgrade(); + *dispatch = Some(weak_dispatch.clone()); + weak_dispatch + } + } + }; + + // Get the span reference from the registry when the span is entered + if let Some(span_ref) = ctx.span(id) { + // Use OpenTelemetryContext to extract the OpenTelemetry context + let mut extensions = span_ref.extensions_mut(); + let otel_context = + OpenTelemetryContext::context(&mut extensions, &weak_dispatch.upgrade()); + if let Some(otel_context) = otel_context { + // Store the extracted context for verification + if let Ok(mut contexts) = self.extracted_contexts.lock() { + contexts.push(otel_context); + } + } + } + } + + fn on_register_dispatch(&self, subscriber: &tracing::Dispatch) { + // Note: This does not work for Layer until https://github.com/tokio-rs/tracing/pull/3379 + // is merged and released, since `on_register_dispatch` is never called. + let mut dispatch = self.dispatch.write().unwrap(); + *dispatch = Some(subscriber.clone().downgrade()); + } +} + +fn test_tracer_with_custom_layer() -> ( + Tracer, + SdkTracerProvider, + TestExporter, + CustomLayer, + impl Subscriber, +) { + let exporter = TestExporter::default(); + let provider = SdkTracerProvider::builder() + .with_simple_exporter(exporter.clone()) + .build(); + let tracer = provider.tracer("test"); + + let custom_layer = CustomLayer::new(); + + let subscriber = tracing_subscriber::registry() + .with( + layer() + .with_tracer(tracer.clone()) + .with_filter(LevelFilter::DEBUG), + ) + .with(custom_layer.clone()) + .with(tracing_subscriber::fmt::layer().with_filter(LevelFilter::TRACE)); + + (tracer, provider, exporter, custom_layer, subscriber) +} + +#[test] +fn test_span_ref_ext_from_separate_layer() { + let (_tracer, provider, exporter, custom_layer, subscriber) = test_tracer_with_custom_layer(); + + tracing::subscriber::with_default(subscriber, || { + // Create a span that will be processed by both the OpenTelemetry layer + // and our custom layer + let _span = tracing::debug_span!("test_span", test_field = "test_value").entered(); + + // Create a child span to test hierarchical context extraction + let _child_span = tracing::debug_span!("child_span", child_field = "child_value").entered(); + }); + + drop(provider); // flush all spans + + // Verify that spans were exported by the OpenTelemetry layer + let spans = exporter.0.lock().unwrap(); + assert_eq!(spans.len(), 2, "Expected 2 spans to be exported"); + + // Verify that our custom layer extracted OpenTelemetry contexts + let extracted_contexts = custom_layer.get_extracted_contexts(); + assert_eq!( + extracted_contexts.len(), + 2, + "Expected 2 contexts to be extracted by custom layer" + ); + + // Verify that the extracted contexts contain valid span contexts + for (i, context) in extracted_contexts.iter().enumerate() { + let span = context.span(); + let span_context = span.span_context(); + assert!( + span_context.is_valid(), + "Context {} should have a valid span context", + i + ); + assert_ne!( + span_context.trace_id(), + opentelemetry::trace::TraceId::INVALID, + "Context {} should have a valid trace ID", + i + ); + assert_ne!( + span_context.span_id(), + opentelemetry::trace::SpanId::INVALID, + "Context {} should have a valid span ID", + i + ); + } + + // Verify that the contexts correspond to the exported spans + let parent_span = spans.iter().find(|s| s.name == "test_span").unwrap(); + let child_span = spans.iter().find(|s| s.name == "child_span").unwrap(); + + // The first extracted context should correspond to the parent span + let parent_context = &extracted_contexts[0]; + assert_eq!( + parent_context.span().span_context().span_id(), + parent_span.span_context.span_id(), + "Parent context should match parent span" + ); + + // The second extracted context should correspond to the child span + let child_context = &extracted_contexts[1]; + assert_eq!( + child_context.span().span_context().span_id(), + child_span.span_context.span_id(), + "Child context should match child span" + ); + + // Verify that both spans share the same trace ID (hierarchical relationship) + assert_eq!( + parent_span.span_context.trace_id(), + child_span.span_context.trace_id(), + "Parent and child spans should share the same trace ID" + ); + + // Verify that the child span has the parent span as its parent + assert_eq!( + child_span.parent_span_id, + parent_span.span_context.span_id(), + "Child span should have parent span as its parent" + ); +} From c18a70e4aa1fd30a6fb92f368278cf7736deb707 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Antonsson?= Date: Tue, 2 Dec 2025 11:49:25 +0100 Subject: [PATCH 2/2] chore: update SpanRef context access based on feedback --- Cargo.toml | 2 +- examples/otel_context.rs | 99 ++++++++++---------------------- src/layer.rs | 58 ++++++++++++++++--- src/lib.rs | 4 +- src/otel_context.rs | 118 +++++++++++++++++++-------------------- tests/otel_context.rs | 66 ++++++++-------------- 6 files changed, 162 insertions(+), 185 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 297b0c2..222fcb7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,7 +29,7 @@ opentelemetry_sdk = { version = "0.31.0", default-features = false, features = [ ] } tracing = { version = "0.1.35", default-features = false, features = ["std"] } tracing-core = "0.1.28" -tracing-subscriber = { version = "0.3.0", default-features = false, features = [ +tracing-subscriber = { version = "0.3.22", default-features = false, features = [ "registry", "std", ] } diff --git a/examples/otel_context.rs b/examples/otel_context.rs index 291b0ee..0c59419 100644 --- a/examples/otel_context.rs +++ b/examples/otel_context.rs @@ -4,10 +4,10 @@ use opentelemetry::trace::{TraceContextExt, TracerProvider as _}; use opentelemetry_sdk::trace::SdkTracerProvider; use opentelemetry_stdout as stdout; -use std::sync::{Arc, Mutex, RwLock}; +use std::sync::{Arc, Mutex, OnceLock}; use tracing::{debug, info, span, warn, Subscriber}; use tracing::{dispatcher::WeakDispatch, level_filters::LevelFilter, Dispatch}; -use tracing_opentelemetry::{layer, OpenTelemetryContext}; +use tracing_opentelemetry::{get_otel_context, layer}; use tracing_subscriber::layer::Context; use tracing_subscriber::prelude::*; use tracing_subscriber::registry::LookupSpan; @@ -15,12 +15,12 @@ use tracing_subscriber::Layer; /// A custom layer that demonstrates how to use OpenTelemetryContext /// to extract OpenTelemetry contexts from span extensions. -#[derive(Clone)] +#[derive(Clone, Default)] struct SpanAnalysisLayer { /// Store span analysis results for demonstration analysis_results: Arc>>, /// Weak reference to the dispatcher for context extraction - dispatch: Arc>>, + dispatch: Arc>, } #[derive(Debug, Clone)] @@ -32,13 +32,6 @@ struct SpanAnalysis { } impl SpanAnalysisLayer { - fn new() -> Self { - Self { - analysis_results: Arc::new(Mutex::new(Vec::new())), - dispatch: Arc::new(RwLock::new(None)), - } - } - fn get_analysis_results(&self) -> Vec { self.analysis_results.lock().unwrap().clone() } @@ -68,46 +61,23 @@ impl SpanAnalysisLayer { } } } - - fn get_weak_dispatch(&self, get_default: bool) -> Option { - let read_guard = self.dispatch.read().unwrap(); - match read_guard.as_ref() { - Some(weak_dispatch) => Some(weak_dispatch.clone()), - // Note: This workaround is needed until https://github.com/tokio-rs/tracing/pull/3379 - // is merged and released. It should really be handled in on_register_dispatch - None => { - if !get_default { - None - } else { - drop(read_guard); - let mut dispatch = self.dispatch.write().unwrap(); - let weak_dispatch = Dispatch::default().downgrade(); - *dispatch = Some(weak_dispatch.clone()); - Some(weak_dispatch) - } - } - } - } } impl Layer for SpanAnalysisLayer where S: Subscriber + for<'span> LookupSpan<'span>, { + fn on_register_dispatch(&self, subscriber: &Dispatch) { + let _ = self.dispatch.set(subscriber.downgrade()); + } + fn on_new_span( &self, attrs: &tracing::span::Attributes<'_>, id: &tracing::span::Id, ctx: Context<'_, S>, ) { - // Get the weak dispatch reference. - // - // Note: We can't use the Dispatch::default() workaround described above here since this - // method is called from inside a dispatcher::get_default block, and such calls can't be - // nested so we would get the global dispatcher instead, which can't downcast to the right - // types when extracting the OpenTelemetry context. This also means that we will miss - // analyzing the first span that is created šŸ¤·šŸ¼ā€ā™‚ļø - let Some(weak_dispatch) = self.get_weak_dispatch(false) else { + let Some(weak_dispatch) = self.dispatch.get() else { return; }; @@ -116,46 +86,39 @@ where // This is the key functionality: using OpenTelemetryContext // to extract the OpenTelemetry context from span extensions let mut extensions = span_ref.extensions_mut(); - if let Some(otel_context) = - OpenTelemetryContext::context(&mut extensions, &weak_dispatch.upgrade()) - { - self.analyze_span_context(attrs.metadata().name(), &otel_context); - } else { - println!( - "āš ļø Could not extract OpenTelemetry context for span '{}'", - attrs.metadata().name() - ); + if let Some(dispatch) = weak_dispatch.upgrade() { + if let Some(otel_context) = get_otel_context(&mut extensions, &dispatch) { + self.analyze_span_context(attrs.metadata().name(), &otel_context); + } else { + println!( + "āš ļø Could not extract OpenTelemetry context for span '{}'", + attrs.metadata().name() + ); + } } } } fn on_enter(&self, id: &tracing::span::Id, ctx: Context<'_, S>) { - if let Some(weak_dispatch) = self.get_weak_dispatch(true) { + if let Some(weak_dispatch) = self.dispatch.get() { if let Some(span_ref) = ctx.span(id) { let mut extensions = span_ref.extensions_mut(); - if let Some(otel_context) = - OpenTelemetryContext::context(&mut extensions, &weak_dispatch.upgrade()) - { - let span = otel_context.span(); - let span_context = span.span_context(); - if span_context.is_valid() { - println!( - "šŸ“ Entering span with trace_id: {:032x}, span_id: {:016x}", - span_context.trace_id(), - span_context.span_id() - ); + if let Some(dispatch) = weak_dispatch.upgrade() { + if let Some(otel_context) = get_otel_context(&mut extensions, &dispatch) { + let span = otel_context.span(); + let span_context = span.span_context(); + if span_context.is_valid() { + println!( + "šŸ“ Entering span with trace_id: {:032x}, span_id: {:016x}", + span_context.trace_id(), + span_context.span_id() + ); + } } } } } } - - fn on_register_dispatch(&self, subscriber: &tracing::Dispatch) { - // Note: This does not work for Layer until https://github.com/tokio-rs/tracing/pull/3379 - // is merged and released, since `on_register_dispatch` is never called. - let mut dispatch = self.dispatch.write().unwrap(); - *dispatch = Some(subscriber.clone().downgrade()); - } } fn setup_tracing() -> (impl Subscriber, SdkTracerProvider, SpanAnalysisLayer) { @@ -166,7 +129,7 @@ fn setup_tracing() -> (impl Subscriber, SdkTracerProvider, SpanAnalysisLayer) { let tracer = provider.tracer("span_ref_ext_example"); // Create our custom analysis layer - let analysis_layer = SpanAnalysisLayer::new(); + let analysis_layer = SpanAnalysisLayer::default(); // Build the subscriber with multiple layers: // 1. OpenTelemetry layer for trace export diff --git a/src/layer.rs b/src/layer.rs index 38d72de..6461878 100644 --- a/src/layer.rs +++ b/src/layer.rs @@ -112,11 +112,11 @@ pub(crate) struct WithContext { /// /// Ensures the given SpanId has been activated - that is, created in the OTel side of things, - /// and had its SpanBuilder consumed - and then provides access to the OtelData associated with it. + /// and had its SpanBuilder consumed - and then provides access to the OTel Context associated with it. /// #[allow(clippy::type_complexity)] - pub(crate) with_activated_context_extensions: - fn(&tracing::Dispatch, &mut ExtensionsMut<'_>, f: &mut dyn FnMut(&mut OtelData)), + pub(crate) with_activated_otel_context: + fn(&tracing::Dispatch, &mut ExtensionsMut<'_>, f: &mut dyn FnMut(&OtelContext)), } impl WithContext { @@ -152,13 +152,13 @@ impl WithContext { /// and had its SpanBuilder consumed - and then provides access to the OtelData associated with it. /// #[allow(clippy::type_complexity)] - pub(crate) fn with_activated_context_extensions( + pub(crate) fn with_activated_otel_context( &self, dispatch: &tracing::Dispatch, extensions: &mut ExtensionsMut<'_>, - mut f: impl FnMut(&mut OtelData), + mut f: impl FnMut(&OtelContext), ) { - (self.with_activated_context_extensions)(dispatch, extensions, &mut f) + (self.with_activated_otel_context)(dispatch, extensions, &mut f) } } @@ -652,7 +652,7 @@ where with_context: WithContext { with_context: Self::get_context, with_activated_context: Self::get_activated_context, - with_activated_context_extensions: Self::get_activated_context_extensions, + with_activated_otel_context: Self::get_activated_otel_context, }, _registry: marker::PhantomData, } @@ -708,8 +708,8 @@ where with_context: WithContext { with_context: OpenTelemetryLayer::::get_context, with_activated_context: OpenTelemetryLayer::::get_activated_context, - with_activated_context_extensions: - OpenTelemetryLayer::::get_activated_context_extensions, + with_activated_otel_context: + OpenTelemetryLayer::::get_activated_otel_context, }, _registry: self._registry, // cannot use ``..self` here due to different generics @@ -967,6 +967,18 @@ where } } + /// Retrieves the OpenTelemetry data for a span and activates its context before calling + /// the provided function. + /// + /// This function retrieves the span from the subscriber's registry using the provided + /// span ID, activates the OTel `Context` in the span's `OtelData` if present, and then + /// applies the callback function `f` to the `OtelData`. + /// + /// # Parameters + /// + /// * `dispatch` - The tracing dispatch to downcast and retrieve the span from + /// * `id` - The span ID to look up in the registry + /// * `f` - The closure to invoke with mutable access to the span's `OtelData` fn get_activated_context( dispatch: &tracing::Dispatch, id: &span::Id, @@ -984,6 +996,34 @@ where Self::get_activated_context_extensions(dispatch, &mut extensions, f) } + /// Retrieves the activated OpenTelemetry context from a span's extensions and passes it + /// to the provided function. + /// + /// This method activates the context and extracts the current OTel `Context` from the + /// `OtelData` state if present, and then applies the callback function `f` to a reference + /// to the OTel `Context`. + /// + /// # Parameters + /// + /// * `dispatch` - The tracing dispatch to downcast to the `OpenTelemetryLayer` + /// * `extensions` - Mutable reference to the span's extensions containing `OtelData` + /// * `f` - The closure to invoke with a reference to the OTel `Context` + fn get_activated_otel_context( + dispatch: &tracing::Dispatch, + extensions: &mut ExtensionsMut<'_>, + f: &mut dyn FnMut(&OtelContext), + ) { + Self::get_activated_context_extensions( + dispatch, + extensions, + &mut |otel_data: &mut OtelData| { + if let OtelDataState::Context { current_cx } = &otel_data.state { + f(current_cx) + } + }, + ); + } + fn get_activated_context_extensions( dispatch: &tracing::Dispatch, extensions: &mut ExtensionsMut<'_>, diff --git a/src/lib.rs b/src/lib.rs index 6bda424..5146901 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -115,7 +115,7 @@ mod metrics; /// Implementation of the trace::Layer as a source of OpenTelemetry data. mod layer; -/// OpenTelemetryContext which enables OpenTelemetry context extraction from span extensions. +/// Function which enables OpenTelemetry context extraction from span extensions. mod otel_context; /// Span extension which enables OpenTelemetry context management. mod span_ext; @@ -129,7 +129,7 @@ pub use layer::{layer, FilteredOpenTelemetryLayer, OpenTelemetryLayer}; #[cfg(feature = "metrics")] pub use metrics::MetricsLayer; use opentelemetry::trace::TraceContextExt as _; -pub use otel_context::OpenTelemetryContext; +pub use otel_context::get_otel_context; pub use span_ext::{OpenTelemetrySpanExt, SetParentError}; /// Per-span OpenTelemetry data tracked by this crate. diff --git a/src/otel_context.rs b/src/otel_context.rs index 7fa9670..c3c8d08 100644 --- a/src/otel_context.rs +++ b/src/otel_context.rs @@ -1,4 +1,4 @@ -use crate::{layer::WithContext, OtelData, OtelDataState}; +use crate::layer::WithContext; use tracing::Dispatch; use tracing_subscriber::registry::ExtensionsMut; @@ -8,65 +8,61 @@ use tracing_subscriber::registry::ExtensionsMut; /// [`ExtensionsMut`]: tracing_subscriber::registry::ExtensionsMut /// [OpenTelemetry]: https://opentelemetry.io /// [`Context`]: opentelemetry::Context -pub struct OpenTelemetryContext {} -impl OpenTelemetryContext { - /// Extracts the OpenTelemetry [`Context`] associated with this span extensions. - /// - /// This method retrieves the OpenTelemetry context data that has been stored - /// for the span by the OpenTelemetry layer. The context includes the span's - /// OpenTelemetry span context, which contains trace ID, span ID, and other - /// trace-related metadata. - /// - /// [`Context`]: opentelemetry::Context - /// - /// # Examples - /// - /// ```rust - /// use tracing_opentelemetry::OpenTelemetryContext; - /// use tracing::dispatcher::WeakDispatch; - /// use tracing_subscriber::registry::LookupSpan; - /// use opentelemetry::trace::TraceContextExt; - /// - /// fn do_things_with_otel_context<'a, D>( - /// span_ref: &tracing_subscriber::registry::SpanRef<'a, D>, - /// weak_dispatch: &WeakDispatch - /// ) where - /// D: LookupSpan<'a>, - /// { - /// if let Some(otel_context) = OpenTelemetryContext::context(&mut span_ref.extensions_mut(), &weak_dispatch.upgrade()) { - /// // Process the extracted context - /// let span = otel_context.span(); - /// let span_context = span.span_context(); - /// if span_context.is_valid() { - /// // Handle the valid context... - /// } - /// } - /// } - /// ``` - /// - /// # Use Cases - /// - /// - When working with multiple subscriber configurations - /// - When implementing advanced tracing middleware that manages multiple dispatches - pub fn context( - extensions: &mut ExtensionsMut<'_>, - dispatch: &Option, - ) -> Option { - dispatch.as_ref().and_then(|dispatch| { - let mut cx = None; - if let Some(get_context) = dispatch.downcast_ref::() { - // If our span hasn't been built, we should build it and get the context in one call - get_context.with_activated_context_extensions( - dispatch, - extensions, - |data: &mut OtelData| { - if let OtelDataState::Context { current_cx } = &data.state { - cx = Some(current_cx.clone()); - } - }, - ); - } - cx - }) +/// +/// Extracts the OpenTelemetry [`Context`] associated with this span extensions. +/// +/// This method retrieves the OpenTelemetry context data that has been stored +/// for the span by the OpenTelemetry layer. The context includes the span's +/// OpenTelemetry span context, which contains trace ID, span ID, and other +/// trace-related metadata. +/// +/// [`Context`]: opentelemetry::Context +/// +/// # Examples +/// +/// ```rust +/// use tracing_opentelemetry::get_otel_context; +/// use tracing::dispatcher::WeakDispatch; +/// use tracing_subscriber::registry::LookupSpan; +/// use opentelemetry::trace::TraceContextExt; +/// +/// fn do_things_with_otel_context<'a, D>( +/// span_ref: &tracing_subscriber::registry::SpanRef<'a, D>, +/// weak_dispatch: &WeakDispatch +/// ) where +/// D: LookupSpan<'a>, +/// { +/// if let Some(dispatch) = weak_dispatch.upgrade() { +/// if let Some(otel_context) = get_otel_context(&mut span_ref.extensions_mut(), &dispatch) { +/// // Process the extracted context +/// let span = otel_context.span(); +/// let span_context = span.span_context(); +/// if span_context.is_valid() { +/// // Handle the valid context... +/// } +/// } +/// } +/// } +/// ``` +/// +/// # Use Cases +/// +/// - When working with multiple subscriber configurations +/// - When implementing advanced tracing middleware that manages multiple dispatches +pub fn get_otel_context( + extensions: &mut ExtensionsMut<'_>, + dispatch: &Dispatch, +) -> Option { + let mut cx = None; + if let Some(get_context) = dispatch.downcast_ref::() { + // If our span hasn't been built, we should build it and get the context in one call + get_context.with_activated_otel_context( + dispatch, + extensions, + |current_cx: &opentelemetry::Context| { + cx = Some(current_cx.clone()); + }, + ); } + cx } diff --git a/tests/otel_context.rs b/tests/otel_context.rs index 156f5de..63495df 100644 --- a/tests/otel_context.rs +++ b/tests/otel_context.rs @@ -3,10 +3,10 @@ use opentelemetry_sdk::{ error::OTelSdkResult, trace::{SdkTracerProvider, SpanData, SpanExporter, Tracer}, }; -use std::sync::{Arc, Mutex, RwLock}; +use std::sync::{Arc, Mutex, OnceLock}; use tracing::Subscriber; use tracing::{dispatcher::WeakDispatch, level_filters::LevelFilter, Dispatch}; -use tracing_opentelemetry::{layer, OpenTelemetryContext}; +use tracing_opentelemetry::{get_otel_context, layer}; use tracing_subscriber::layer::Context; use tracing_subscriber::prelude::*; use tracing_subscriber::registry::LookupSpan; @@ -25,24 +25,17 @@ impl SpanExporter for TestExporter { } } -/// A custom tracing layer that uses OpenTelemetryContext to access OpenTelemetry contexts +/// A custom tracing layer that uses get_otel_context to access OpenTelemetry contexts /// from span extensions. This simulates a separate layer that needs to interact with /// OpenTelemetry data managed by the OpenTelemetryLayer. -#[derive(Clone)] +#[derive(Clone, Default)] struct CustomLayer { /// Store extracted contexts for verification extracted_contexts: Arc>>, - dispatch: Arc>>, + dispatch: Arc>, } impl CustomLayer { - fn new() -> Self { - Self { - extracted_contexts: Arc::new(Mutex::new(Vec::new())), - dispatch: Arc::new(RwLock::new(None)), - } - } - fn get_extracted_contexts(&self) -> Vec { self.extracted_contexts.lock().unwrap().clone() } @@ -52,42 +45,27 @@ impl Layer for CustomLayer where S: Subscriber + for<'span> LookupSpan<'span>, { + fn on_register_dispatch(&self, subscriber: &Dispatch) { + let _ = self.dispatch.set(subscriber.downgrade()); + } + fn on_enter(&self, id: &tracing::span::Id, ctx: Context<'_, S>) { - let weak_dispatch = { - let read_guard = self.dispatch.read().unwrap(); - match read_guard.as_ref() { - Some(weak_dispatch) => weak_dispatch.clone(), - None => { - drop(read_guard); - let mut dispatch = self.dispatch.write().unwrap(); - let weak_dispatch = Dispatch::default().downgrade(); - *dispatch = Some(weak_dispatch.clone()); - weak_dispatch - } - } - }; - - // Get the span reference from the registry when the span is entered - if let Some(span_ref) = ctx.span(id) { - // Use OpenTelemetryContext to extract the OpenTelemetry context - let mut extensions = span_ref.extensions_mut(); - let otel_context = - OpenTelemetryContext::context(&mut extensions, &weak_dispatch.upgrade()); - if let Some(otel_context) = otel_context { - // Store the extracted context for verification - if let Ok(mut contexts) = self.extracted_contexts.lock() { - contexts.push(otel_context); + if let Some(weak_dispatch) = self.dispatch.get() { + // Get the span reference from the registry when the span is entered + if let Some(span_ref) = ctx.span(id) { + // Use OpenTelemetryContext to extract the OpenTelemetry context + let mut extensions = span_ref.extensions_mut(); + if let Some(dispatch) = weak_dispatch.upgrade() { + if let Some(otel_context) = get_otel_context(&mut extensions, &dispatch) { + // Store the extracted context for verification + if let Ok(mut contexts) = self.extracted_contexts.lock() { + contexts.push(otel_context); + } + } } } } } - - fn on_register_dispatch(&self, subscriber: &tracing::Dispatch) { - // Note: This does not work for Layer until https://github.com/tokio-rs/tracing/pull/3379 - // is merged and released, since `on_register_dispatch` is never called. - let mut dispatch = self.dispatch.write().unwrap(); - *dispatch = Some(subscriber.clone().downgrade()); - } } fn test_tracer_with_custom_layer() -> ( @@ -103,7 +81,7 @@ fn test_tracer_with_custom_layer() -> ( .build(); let tracer = provider.tracer("test"); - let custom_layer = CustomLayer::new(); + let custom_layer = CustomLayer::default(); let subscriber = tracing_subscriber::registry() .with(