-
Notifications
You must be signed in to change notification settings - Fork 123
feat: allow OpenTelemetry context access from SpanRef #234
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
bantonsson
wants to merge
2
commits into
tokio-rs:v0.1.x
Choose a base branch
from
bantonsson:ban/spanrefext-for-otel-context
base: v0.1.x
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from 1 commit
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,300 @@ | ||
| //! This example demonstrates how to use `OpenTelemetryContext` from a separate layer | ||
| //! to access OpenTelemetry context data from tracing spans. | ||
|
|
||
| use opentelemetry::trace::{TraceContextExt, TracerProvider as _}; | ||
| use opentelemetry_sdk::trace::SdkTracerProvider; | ||
| use opentelemetry_stdout as stdout; | ||
| use std::sync::{Arc, Mutex, RwLock}; | ||
| use tracing::{debug, info, span, warn, Subscriber}; | ||
| use tracing::{dispatcher::WeakDispatch, level_filters::LevelFilter, Dispatch}; | ||
| use tracing_opentelemetry::{layer, OpenTelemetryContext}; | ||
| use tracing_subscriber::layer::Context; | ||
| use tracing_subscriber::prelude::*; | ||
| use tracing_subscriber::registry::LookupSpan; | ||
| use tracing_subscriber::Layer; | ||
|
|
||
| /// A custom layer that demonstrates how to use OpenTelemetryContext | ||
| /// to extract OpenTelemetry contexts from span extensions. | ||
| #[derive(Clone)] | ||
| struct SpanAnalysisLayer { | ||
| /// Store span analysis results for demonstration | ||
| analysis_results: Arc<Mutex<Vec<SpanAnalysis>>>, | ||
| /// Weak reference to the dispatcher for context extraction | ||
| dispatch: Arc<RwLock<Option<WeakDispatch>>>, | ||
| } | ||
|
|
||
| #[derive(Debug, Clone)] | ||
| struct SpanAnalysis { | ||
| span_name: String, | ||
| trace_id: String, | ||
| span_id: String, | ||
| is_sampled: bool, | ||
| } | ||
|
|
||
| impl SpanAnalysisLayer { | ||
| fn new() -> Self { | ||
| Self { | ||
| analysis_results: Arc::new(Mutex::new(Vec::new())), | ||
| dispatch: Arc::new(RwLock::new(None)), | ||
| } | ||
| } | ||
|
|
||
| fn get_analysis_results(&self) -> Vec<SpanAnalysis> { | ||
| self.analysis_results.lock().unwrap().clone() | ||
| } | ||
|
|
||
| fn analyze_span_context(&self, span_name: &str, otel_context: &opentelemetry::Context) { | ||
| let span = otel_context.span(); | ||
| let span_context = span.span_context(); | ||
|
|
||
| if span_context.is_valid() { | ||
| let analysis = SpanAnalysis { | ||
| span_name: span_name.to_string(), | ||
| trace_id: format!("{:032x}", span_context.trace_id()), | ||
| span_id: format!("{:016x}", span_context.span_id()), | ||
| is_sampled: span_context.is_sampled(), | ||
| }; | ||
|
|
||
| println!( | ||
| "🔍 Analyzing span '{}': trace_id={}, span_id={}, sampled={}", | ||
| analysis.span_name, | ||
| analysis.trace_id, | ||
| analysis.span_id, | ||
| span_context.trace_flags().is_sampled() | ||
| ); | ||
|
|
||
| if let Ok(mut results) = self.analysis_results.lock() { | ||
| results.push(analysis); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| fn get_weak_dispatch(&self, get_default: bool) -> Option<WeakDispatch> { | ||
| let read_guard = self.dispatch.read().unwrap(); | ||
| match read_guard.as_ref() { | ||
| Some(weak_dispatch) => Some(weak_dispatch.clone()), | ||
| // Note: This workaround is needed until https://github.com/tokio-rs/tracing/pull/3379 | ||
| // is merged and released. It should really be handled in on_register_dispatch | ||
| None => { | ||
| if !get_default { | ||
| None | ||
| } else { | ||
| drop(read_guard); | ||
| let mut dispatch = self.dispatch.write().unwrap(); | ||
| let weak_dispatch = Dispatch::default().downgrade(); | ||
| *dispatch = Some(weak_dispatch.clone()); | ||
| Some(weak_dispatch) | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| impl<S> Layer<S> for SpanAnalysisLayer | ||
| where | ||
| S: Subscriber + for<'span> LookupSpan<'span>, | ||
| { | ||
| fn on_new_span( | ||
| &self, | ||
| attrs: &tracing::span::Attributes<'_>, | ||
| id: &tracing::span::Id, | ||
| ctx: Context<'_, S>, | ||
| ) { | ||
| // Get the weak dispatch reference. | ||
| // | ||
| // Note: We can't use the Dispatch::default() workaround described above here since this | ||
| // method is called from inside a dispatcher::get_default block, and such calls can't be | ||
| // nested so we would get the global dispatcher instead, which can't downcast to the right | ||
| // types when extracting the OpenTelemetry context. This also means that we will miss | ||
| // analyzing the first span that is created 🤷🏼♂️ | ||
| let Some(weak_dispatch) = self.get_weak_dispatch(false) else { | ||
| return; | ||
| }; | ||
|
|
||
| // Get the span reference and extract OpenTelemetry context | ||
| if let Some(span_ref) = ctx.span(id) { | ||
| // This is the key functionality: using OpenTelemetryContext | ||
| // to extract the OpenTelemetry context from span extensions | ||
| let mut extensions = span_ref.extensions_mut(); | ||
| if let Some(otel_context) = | ||
| OpenTelemetryContext::context(&mut extensions, &weak_dispatch.upgrade()) | ||
| { | ||
| self.analyze_span_context(attrs.metadata().name(), &otel_context); | ||
| } else { | ||
| println!( | ||
| "⚠️ Could not extract OpenTelemetry context for span '{}'", | ||
| attrs.metadata().name() | ||
| ); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| fn on_enter(&self, id: &tracing::span::Id, ctx: Context<'_, S>) { | ||
| if let Some(weak_dispatch) = self.get_weak_dispatch(true) { | ||
| if let Some(span_ref) = ctx.span(id) { | ||
| let mut extensions = span_ref.extensions_mut(); | ||
| if let Some(otel_context) = | ||
| OpenTelemetryContext::context(&mut extensions, &weak_dispatch.upgrade()) | ||
| { | ||
| let span = otel_context.span(); | ||
| let span_context = span.span_context(); | ||
| if span_context.is_valid() { | ||
| println!( | ||
| "📍 Entering span with trace_id: {:032x}, span_id: {:016x}", | ||
| span_context.trace_id(), | ||
| span_context.span_id() | ||
| ); | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| fn on_register_dispatch(&self, subscriber: &tracing::Dispatch) { | ||
| // Note: This does not work for Layer until https://github.com/tokio-rs/tracing/pull/3379 | ||
| // is merged and released, since `on_register_dispatch` is never called. | ||
| let mut dispatch = self.dispatch.write().unwrap(); | ||
| *dispatch = Some(subscriber.clone().downgrade()); | ||
| } | ||
| } | ||
|
|
||
| fn setup_tracing() -> (impl Subscriber, SdkTracerProvider, SpanAnalysisLayer) { | ||
| // Create OpenTelemetry tracer that outputs to stdout | ||
| let provider = SdkTracerProvider::builder() | ||
| .with_simple_exporter(stdout::SpanExporter::default()) | ||
| .build(); | ||
| let tracer = provider.tracer("span_ref_ext_example"); | ||
|
|
||
| // Create our custom analysis layer | ||
| let analysis_layer = SpanAnalysisLayer::new(); | ||
|
|
||
| // Build the subscriber with multiple layers: | ||
| // 1. OpenTelemetry layer for trace export | ||
| // 2. Our custom analysis layer that uses OpenTelemetryContext | ||
| // 3. Formatting layer for console output | ||
| let subscriber = tracing_subscriber::registry() | ||
| .with(layer().with_tracer(tracer).with_filter(LevelFilter::DEBUG)) | ||
| .with(analysis_layer.clone()) | ||
| .with( | ||
| tracing_subscriber::fmt::layer() | ||
| .with_target(false) | ||
| .with_filter(LevelFilter::INFO), | ||
| ); | ||
|
|
||
| (subscriber, provider, analysis_layer) | ||
| } | ||
|
|
||
| fn simulate_application_work() { | ||
| // Create a root span for the main application work | ||
| let root_span = span!(tracing::Level::INFO, "application_main", version = "1.0.0"); | ||
| let _root_guard = root_span.enter(); | ||
|
|
||
| info!("Starting application"); | ||
|
|
||
| // Simulate some business logic with nested spans | ||
| { | ||
| let auth_span = span!(tracing::Level::DEBUG, "authenticate_user", user_id = 12345); | ||
| let _auth_guard = auth_span.enter(); | ||
|
|
||
| debug!("Validating user credentials"); | ||
|
|
||
| // Simulate authentication work | ||
| std::thread::sleep(std::time::Duration::from_millis(10)); | ||
|
|
||
| info!("User authenticated successfully"); | ||
| } | ||
|
|
||
| // Simulate database operations | ||
| { | ||
| let db_span = span!( | ||
| tracing::Level::DEBUG, | ||
| "database_query", | ||
| query = "SELECT * FROM users", | ||
| table = "users" | ||
| ); | ||
| let _db_guard = db_span.enter(); | ||
|
|
||
| debug!("Executing database query"); | ||
|
|
||
| // Nested span for connection management | ||
| { | ||
| let conn_span = span!(tracing::Level::DEBUG, "acquire_connection", pool_size = 10); | ||
| let _conn_guard = conn_span.enter(); | ||
|
|
||
| debug!("Acquiring database connection from pool"); | ||
| std::thread::sleep(std::time::Duration::from_millis(5)); | ||
| } | ||
|
|
||
| std::thread::sleep(std::time::Duration::from_millis(20)); | ||
| info!("Database query completed"); | ||
| } | ||
|
|
||
| // Simulate some processing work | ||
| { | ||
| let process_span = span!( | ||
| tracing::Level::DEBUG, | ||
| "process_data", | ||
| records_count = 150, | ||
| batch_size = 50 | ||
| ); | ||
| let _process_guard = process_span.enter(); | ||
|
|
||
| debug!("Processing user data"); | ||
|
|
||
| for batch in 1..=3 { | ||
| let batch_span = span!( | ||
| tracing::Level::DEBUG, | ||
| "process_batch", | ||
| batch_number = batch, | ||
| batch_size = 50 | ||
| ); | ||
| let _batch_guard = batch_span.enter(); | ||
|
|
||
| debug!("Processing batch {}", batch); | ||
| std::thread::sleep(std::time::Duration::from_millis(8)); | ||
| } | ||
|
|
||
| info!("Data processing completed"); | ||
| } | ||
|
|
||
| warn!("Application work completed"); | ||
| } | ||
|
|
||
| fn main() { | ||
| println!( | ||
| "🚀 OpenTelemetryContext Example: Extracting OpenTelemetry Contexts from Separate Layer" | ||
| ); | ||
| println!("{}", "=".repeat(80)); | ||
|
|
||
| // Setup tracing with our custom layer | ||
| let (subscriber, provider, analysis_layer) = setup_tracing(); | ||
|
|
||
| tracing::subscriber::with_default(subscriber, || { | ||
| // Simulate application work that generates spans | ||
| simulate_application_work(); | ||
| }); | ||
|
|
||
| // Ensure all spans are flushed | ||
| drop(provider); | ||
|
|
||
| // Display the analysis results | ||
| println!("\n📊 Span Analysis Results:"); | ||
| println!("{}", "-".repeat(80)); | ||
|
|
||
| let results = analysis_layer.get_analysis_results(); | ||
| for (i, analysis) in results.iter().enumerate() { | ||
| println!( | ||
| "{}. Span: '{}'\n Trace ID: {}\n Span ID: {}\n Sampled: {}\n", | ||
| i + 1, | ||
| analysis.span_name, | ||
| analysis.trace_id, | ||
| analysis.span_id, | ||
| analysis.is_sampled | ||
| ); | ||
| } | ||
|
|
||
| println!( | ||
| "✅ Example completed! Total spans analyzed: {}", | ||
| results.len() | ||
| ); | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can
fdirectly takeopentelemetry::Context?Right now, we give a wrapper, that contains an end time (if someone wants to change that, they can directly get the otel data, no need to do anything around context activation) and a state which we pretty much guarantee to contain a case with
opentelemetry::Contextinside.This then leads to stuff like
which is the
OpenTelemetryContext::contextimplementation and is a bit weird considering we just made sure the context was started.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to do that since we'll probably want to hide
OtelDataagain in the next breaking release. We made it public again just so people got access to trace/span IDs inside tracing layers and with this PR getting just theContextis sufficient.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All these things are
pub(crate)soOtelDatadoesn't leak, but I can change the signature.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please do, it still doesn't make sense that the provided callback needs to match over
data.statewhen the whole point is guaranteeing that there is acontext.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It’s already changed. That method is now only used internally.