Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions examples/tracing-http-propagator/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ impl LogProcessor for EnrichWithBaggageLogProcessor {
});
}

fn force_flush(&self) -> OTelSdkResult {
fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
Ok(())
}

Expand All @@ -132,7 +132,7 @@ impl LogProcessor for EnrichWithBaggageLogProcessor {
#[derive(Debug)]
struct EnrichWithBaggageSpanProcessor;
impl SpanProcessor for EnrichWithBaggageSpanProcessor {
fn force_flush(&self) -> OTelSdkResult {
fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
Ok(())
}

Expand Down
3 changes: 2 additions & 1 deletion opentelemetry-appender-tracing/src/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ const fn severity_of_level(level: &Level) -> Severity {

#[cfg(test)]
mod tests {
use std::time::Duration;
use crate::layer;
use opentelemetry::logs::Severity;
use opentelemetry::trace::TracerProvider;
Expand Down Expand Up @@ -931,7 +932,7 @@ mod tests {
true
}

fn force_flush(&self) -> OTelSdkResult {
fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
Ok(())
}

Expand Down
3 changes: 2 additions & 1 deletion opentelemetry-proto/src/transform/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ pub mod tonic {

#[cfg(test)]
mod tests {
use std::time::Duration;
use crate::transform::common::tonic::ResourceAttributesWithSchema;
use opentelemetry::logs::LogRecord as _;
use opentelemetry::logs::Logger;
Expand All @@ -238,7 +239,7 @@ mod tests {
impl LogProcessor for MockProcessor {
fn emit(&self, _record: &mut SdkLogRecord, _instrumentation: &InstrumentationScope) {}

fn force_flush(&self) -> OTelSdkResult {
fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
Ok(())
}

Expand Down
20 changes: 8 additions & 12 deletions opentelemetry-sdk/src/logs/batch_log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ pub struct BatchLogProcessor {
logs_sender: SyncSender<LogsData>, // Data channel to store log records and instrumentation scopes
message_sender: SyncSender<BatchMessage>, // Control channel to store control messages for the worker thread
handle: Mutex<Option<thread::JoinHandle<()>>>,
forceflush_timeout: Duration,
export_log_message_sent: Arc<AtomicBool>,
current_batch_size: Arc<AtomicUsize>,
max_export_batch_size: usize,
Expand Down Expand Up @@ -221,21 +220,19 @@ impl LogProcessor for BatchLogProcessor {
}
}

fn force_flush(&self) -> OTelSdkResult {
fn force_flush_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
let (sender, receiver) = mpsc::sync_channel(1);
match self
.message_sender
.try_send(BatchMessage::ForceFlush(sender))
{
Ok(_) => receiver
.recv_timeout(self.forceflush_timeout)
.map_err(|err| {
if err == RecvTimeoutError::Timeout {
OTelSdkError::Timeout(self.forceflush_timeout)
} else {
OTelSdkError::InternalFailure(format!("{err}"))
}
})?,
Ok(_) => receiver.recv_timeout(timeout).map_err(|err| {
if err == RecvTimeoutError::Timeout {
OTelSdkError::Timeout(timeout)
} else {
OTelSdkError::InternalFailure(format!("{err}"))
}
})?,
Err(mpsc::TrySendError::Full(_)) => {
// If the control message could not be sent, emit a warning.
otel_debug!(
Expand Down Expand Up @@ -489,7 +486,6 @@ impl BatchLogProcessor {
logs_sender,
message_sender,
handle: Mutex::new(Some(handle)),
forceflush_timeout: Duration::from_secs(5), // TODO: make this configurable
dropped_logs_count: AtomicUsize::new(0),
max_queue_size,
export_log_message_sent: Arc::new(AtomicBool::new(false)),
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-sdk/src/logs/concurrent_log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ impl<T: LogExporter> LogProcessor for SimpleConcurrentLogProcessor<T> {
}
}

fn force_flush(&self) -> OTelSdkResult {
fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
// TODO: invoke flush on exporter
// once https://github.com/open-telemetry/opentelemetry-rust/issues/2261
// is resolved
Expand Down
11 changes: 8 additions & 3 deletions opentelemetry-sdk/src/logs/log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,11 @@ pub trait LogProcessor: Send + Sync + Debug {
/// - `instrumentation`: The instrumentation scope associated with the log record.
fn emit(&self, data: &mut SdkLogRecord, instrumentation: &InstrumentationScope);
/// Force the logs lying in the cache to be exported.
fn force_flush(&self) -> OTelSdkResult;
fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult;
/// Force the logs lying in the cache to be exported with default timeout.
fn force_flush(&self) -> OTelSdkResult {
self.force_flush_with_timeout(Duration::from_secs(5))
}
/// Shuts down the processor.
/// After shutdown returns the log processor should stop processing any logs.
/// It's up to the implementation on when to drop the LogProcessor.
Expand Down Expand Up @@ -103,6 +107,7 @@ pub(crate) mod tests {
use opentelemetry::logs::{Logger, LoggerProvider};
use opentelemetry::{InstrumentationScope, Key};
use std::sync::{Arc, Mutex};
use std::time::Duration;

#[derive(Debug, Clone)]
pub(crate) struct MockLogExporter {
Expand Down Expand Up @@ -152,7 +157,7 @@ pub(crate) mod tests {
.push((record.clone(), instrumentation.clone())); //clone as the LogProcessor is storing the data.
}

fn force_flush(&self) -> OTelSdkResult {
fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
Ok(())
}

Expand Down Expand Up @@ -182,7 +187,7 @@ pub(crate) mod tests {
.push((record.clone(), instrumentation.clone()));
}

fn force_flush(&self) -> OTelSdkResult {
fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
Ok(())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ impl<R: RuntimeChannel> LogProcessor for BatchLogProcessor<R> {
}
}

fn force_flush(&self) -> OTelSdkResult {
fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
let (res_sender, res_receiver) = oneshot::channel();
self.message_sender
.try_send(BatchMessage::Flush(Some(res_sender)))
Expand Down Expand Up @@ -625,7 +625,7 @@ mod tests {
.push((record.clone(), instrumentation.clone())); //clone as the LogProcessor is storing the data.
}

fn force_flush(&self) -> OTelSdkResult {
fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
Ok(())
}

Expand Down Expand Up @@ -655,7 +655,7 @@ mod tests {
.push((record.clone(), instrumentation.clone()));
}

fn force_flush(&self) -> OTelSdkResult {
fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
Ok(())
}

Expand Down
17 changes: 11 additions & 6 deletions opentelemetry-sdk/src/logs/logger_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,11 @@ impl SdkLoggerProvider {
}

/// Force flush all remaining logs in log processors and return results.
pub fn force_flush(&self) -> OTelSdkResult {
pub fn force_flush_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
let result: Vec<_> = self
.log_processors()
.iter()
.map(|processor| processor.force_flush())
.map(|processor| processor.force_flush_with_timeout(timeout))
.collect();
if result.iter().all(|r| r.is_ok()) {
Ok(())
Expand All @@ -96,6 +96,11 @@ impl SdkLoggerProvider {
}
}

/// Force flush all remaining logs with default timeout.
pub fn force_flush(&self) -> OTelSdkResult {
self.force_flush_with_timeout(Duration::from_secs(5))
}

/// Shuts down this `LoggerProvider`
pub fn shutdown_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
otel_debug!(
Expand Down Expand Up @@ -340,7 +345,7 @@ mod tests {
.expect("lock poisoned");
}

fn force_flush(&self) -> OTelSdkResult {
fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
Ok(())
}

Expand Down Expand Up @@ -393,7 +398,7 @@ mod tests {
// nothing to do.
}

fn force_flush(&self) -> OTelSdkResult {
fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
Ok(())
}

Expand Down Expand Up @@ -913,7 +918,7 @@ mod tests {
// nothing to do.
}

fn force_flush(&self) -> OTelSdkResult {
fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
*self.flush_called.lock().unwrap() = true;
Ok(())
}
Expand Down Expand Up @@ -944,7 +949,7 @@ mod tests {
// nothing to do
}

fn force_flush(&self) -> OTelSdkResult {
fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
*self.flush_called.lock().unwrap() = true;
Ok(())
}
Expand Down
5 changes: 3 additions & 2 deletions opentelemetry-sdk/src/logs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ mod tests {
use std::borrow::Borrow;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::Duration;

#[test]
fn logging_sdk_test() {
Expand Down Expand Up @@ -167,7 +168,7 @@ mod tests {
});
}

fn force_flush(&self) -> crate::error::OTelSdkResult {
fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
Ok(())
}

Expand Down Expand Up @@ -273,7 +274,7 @@ mod tests {
}
}

fn force_flush(&self) -> OTelSdkResult {
fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
Ok(())
}

Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-sdk/src/logs/simple_log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ impl<T: LogExporter> LogProcessor for SimpleLogProcessor<T> {
}
}

fn force_flush(&self) -> OTelSdkResult {
fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
Ok(())
}

Expand Down
7 changes: 6 additions & 1 deletion opentelemetry-sdk/src/trace/export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,15 @@ pub trait SpanExporter: Send + Sync + Debug {
/// implemented as a blocking API or an asynchronous API which notifies the caller via
/// a callback or an event. OpenTelemetry client authors can decide if they want to
/// make the flush timeout configurable.
fn force_flush(&mut self) -> OTelSdkResult {
fn force_flush_with_timeout(&mut self, _timeout: Duration) -> OTelSdkResult {
Ok(())
}

/// Force flush the exporter with default timeout.
fn force_flush(&mut self) -> OTelSdkResult {
self.force_flush_with_timeout(Duration::from_secs(5))
}

/// Set the resource for the exporter.
fn set_resource(&mut self, _resource: &Resource) {}
}
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-sdk/src/trace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ mod tests {
// let _c = Context::current();
}

fn force_flush(&self) -> crate::error::OTelSdkResult {
fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
Ok(())
}

Expand Down
13 changes: 9 additions & 4 deletions opentelemetry-sdk/src/trace/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ impl SdkTracerProvider {
self.inner.is_shutdown.load(Ordering::Relaxed)
}

/// Force flush all remaining spans in span processors and return results.
/// Force flush all remaining spans in span processors with a default timeout and return results.
///
/// # Examples
///
Expand Down Expand Up @@ -228,10 +228,15 @@ impl SdkTracerProvider {
/// }
/// ```
pub fn force_flush(&self) -> OTelSdkResult {
self.force_flush_with_timeout(Duration::from_secs(5))
}

/// force flush processors with a specified timeout
pub fn force_flush_with_timeout(&self, timeout: Duration) -> OTelSdkResult {
let result: Vec<_> = self
.span_processors()
.iter()
.map(|processor| processor.force_flush())
.map(|processor| processor.force_flush_with_timeout(timeout))
.collect();
if result.iter().all(|r| r.is_ok()) {
Ok(())
Expand Down Expand Up @@ -530,7 +535,7 @@ mod tests {
// ignore
}

fn force_flush(&self) -> OTelSdkResult {
fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
if self.success {
Ok(())
} else {
Expand Down Expand Up @@ -793,7 +798,7 @@ mod tests {
// No operation needed for this processor
}

fn force_flush(&self) -> OTelSdkResult {
fn force_flush_with_timeout(&self, _timeout: Duration) -> OTelSdkResult {
Ok(())
}

Expand Down
Loading
Loading