31 changes: 30 additions & 1 deletion opentelemetry-sdk/src/logs/batch_log_processor.rs
@@ -54,6 +54,10 @@ pub(crate) const OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT: usize = 2_048;
pub(crate) const OTEL_BLRP_MAX_EXPORT_BATCH_SIZE: &str = "OTEL_BLRP_MAX_EXPORT_BATCH_SIZE";
/// Default maximum batch size.
pub(crate) const OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT: usize = 512;
/// Force flush timeout
pub(crate) const OTEL_BLRP_FORCE_FLUSH_TIMEOUT: &str = "OTEL_BLRP_FORCE_FLUSH_TIMEOUT";
/// Default force flush timeout
pub(crate) const OTEL_BLRP_FORCE_FLUSH_TIMEOUT_DEFAULT: Duration = Duration::from_millis(5_000);

/// Messages sent between application thread and batch log processor's work thread.
#[allow(clippy::large_enum_variant)]
@@ -339,6 +343,7 @@ impl BatchLogProcessor {
let max_export_batch_size = config.max_export_batch_size;
let current_batch_size = Arc::new(AtomicUsize::new(0));
let current_batch_size_for_thread = current_batch_size.clone();
let forceflush_timeout = config.forceflush_timeout;

let handle = thread::Builder::new()
.name("OpenTelemetry.Logs.BatchProcessor".to_string())
@@ -489,7 +494,7 @@ impl BatchLogProcessor {
logs_sender,
message_sender,
handle: Mutex::new(Some(handle)),
forceflush_timeout: Duration::from_secs(5), // TODO: make this configurable
forceflush_timeout,
dropped_logs_count: AtomicUsize::new(0),
max_queue_size,
export_log_message_sent: Arc::new(AtomicBool::new(false)),
@@ -586,6 +591,9 @@ pub struct BatchConfig {
/// is 512.
pub(crate) max_export_batch_size: usize,

/// The maximum duration to wait when force flushing.
pub(crate) forceflush_timeout: Duration,

/// The maximum duration to export a batch of data.
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
pub(crate) max_export_timeout: Duration,
@@ -603,6 +611,7 @@ pub struct BatchConfigBuilder {
max_queue_size: usize,
scheduled_delay: Duration,
max_export_batch_size: usize,
forceflush_timeout: Duration,
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
max_export_timeout: Duration,
}
@@ -622,6 +631,7 @@ impl Default for BatchConfigBuilder {
max_queue_size: OTEL_BLRP_MAX_QUEUE_SIZE_DEFAULT,
scheduled_delay: OTEL_BLRP_SCHEDULE_DELAY_DEFAULT,
max_export_batch_size: OTEL_BLRP_MAX_EXPORT_BATCH_SIZE_DEFAULT,
forceflush_timeout: OTEL_BLRP_FORCE_FLUSH_TIMEOUT_DEFAULT,
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
max_export_timeout: OTEL_BLRP_EXPORT_TIMEOUT_DEFAULT,
}
@@ -682,6 +692,17 @@ impl BatchConfigBuilder {
self
}

/// Set forceflush_timeout for [`BatchConfigBuilder`].
/// The default value is 5000 milliseconds.
///
/// Corresponding environment variable: `OTEL_BLRP_FORCE_FLUSH_TIMEOUT`.
///
/// Note: Programmatically setting this will override any value set via the environment variable.
pub fn with_forceflush_timeout(mut self, forceflush_timeout: Duration) -> Self {
Review comment (Member): For shutdown, we allow users to pass a timeout in each shutdown call itself. Could we stick with the same approach for flush as well?

Review comment (Member): It's best to be consistent with shutdown if we are adding a timeout to flush at all, i.e. add this all the way from the provider itself.

Reply (Author): You're right. I'll refactor the code to mirror the propagation of the provider's timeout, as we do for shutdown_with_timeout.

self.forceflush_timeout = forceflush_timeout;
self
}
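
A minimal usage sketch of the builder method added above (not part of this PR): it assumes the crate's existing `BatchLogProcessor::builder` and `with_batch_config` APIs and uses the `opentelemetry-stdout` exporter purely as a stand-in; exact module paths may differ between SDK versions.

use std::time::Duration;
use opentelemetry_sdk::logs::{BatchConfigBuilder, BatchLogProcessor};

fn example_log_processor() -> BatchLogProcessor {
    // Give force_flush a 2-second budget; a programmatic value overrides
    // anything read from OTEL_BLRP_FORCE_FLUSH_TIMEOUT.
    let config = BatchConfigBuilder::default()
        .with_forceflush_timeout(Duration::from_secs(2))
        .build();
    // opentelemetry_stdout::LogExporter is only an illustrative exporter here.
    BatchLogProcessor::builder(opentelemetry_stdout::LogExporter::default())
        .with_batch_config(config)
        .build()
}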

/// Builds a `BatchConfig` enforcing the following invariants:
/// * `max_export_batch_size` must be less than or equal to `max_queue_size`.
pub fn build(self) -> BatchConfig {
@@ -692,6 +713,7 @@ impl BatchConfigBuilder {
BatchConfig {
max_queue_size: self.max_queue_size,
scheduled_delay: self.scheduled_delay,
forceflush_timeout: self.forceflush_timeout,
#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
max_export_timeout: self.max_export_timeout,
max_export_batch_size,
@@ -720,6 +742,13 @@ impl BatchConfigBuilder {
self.scheduled_delay = Duration::from_millis(scheduled_delay);
}

if let Some(forceflush_timeout) = env::var(OTEL_BLRP_FORCE_FLUSH_TIMEOUT)
.ok()
.and_then(|s| u64::from_str(&s).ok())
{
self.forceflush_timeout = Duration::from_millis(forceflush_timeout);
}
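
Restated outside the builder for clarity: the variable is read as an integer number of milliseconds, and a missing or unparsable value silently keeps the 5000 ms default (the span processor later in this diff does the same with OTEL_BSP_FORCE_FLUSH_TIMEOUT). A standalone equivalent of the logic above:

use std::{env, str::FromStr, time::Duration};

// Milliseconds in, Duration out; the default is kept on any parse failure.
fn forceflush_timeout_from_env(default: Duration) -> Duration {
    env::var("OTEL_BLRP_FORCE_FLUSH_TIMEOUT")
        .ok()
        .and_then(|s| u64::from_str(&s).ok())
        .map(Duration::from_millis)
        .unwrap_or(default)
}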

#[cfg(feature = "experimental_logs_batch_log_processor_with_async_runtime")]
if let Some(max_export_timeout) = env::var(OTEL_BLRP_EXPORT_TIMEOUT)
.ok()
31 changes: 30 additions & 1 deletion opentelemetry-sdk/src/trace/span_processor.rs
@@ -70,6 +70,10 @@ pub(crate) const OTEL_BSP_EXPORT_TIMEOUT_DEFAULT: Duration = Duration::from_mill
pub(crate) const OTEL_BSP_MAX_CONCURRENT_EXPORTS: &str = "OTEL_BSP_MAX_CONCURRENT_EXPORTS";
/// Default max concurrent exports for BSP
pub(crate) const OTEL_BSP_MAX_CONCURRENT_EXPORTS_DEFAULT: usize = 1;
/// Force flush timeout
pub(crate) const OTEL_BSP_FORCE_FLUSH_TIMEOUT: &str = "OTEL_BSP_FORCE_FLUSH_TIMEOUT";
/// Default force flush timeout
pub(crate) const OTEL_BSP_FORCE_FLUSH_TIMEOUT_DEFAULT: Duration = Duration::from_millis(5_000);

/// `SpanProcessor` is an interface which allows hooks for span start and end
/// method invocations. The span processors are invoked only when is_recording
@@ -312,6 +316,7 @@ impl BatchSpanProcessor {
let max_export_batch_size = config.max_export_batch_size;
let current_batch_size = Arc::new(AtomicUsize::new(0));
let current_batch_size_for_thread = current_batch_size.clone();
let forceflush_timeout = config.forceflush_timeout;

let handle = thread::Builder::new()
.name("OpenTelemetry.Traces.BatchProcessor".to_string())
@@ -424,7 +429,7 @@ impl BatchSpanProcessor {
span_sender,
message_sender,
handle: Mutex::new(Some(handle)),
forceflush_timeout: Duration::from_secs(5), // TODO: make this configurable
forceflush_timeout,
dropped_spans_count: AtomicUsize::new(0),
max_queue_size,
export_span_message_sent: Arc::new(AtomicBool::new(false)),
@@ -757,6 +762,9 @@ pub struct BatchConfig {
/// by an exporter. A value of 1 will cause exports to be performed
/// synchronously on the BatchSpanProcessor task.
pub(crate) max_concurrent_exports: usize,

/// The maximum duration to wait when force flushing.
pub(crate) forceflush_timeout: Duration,
}

impl Default for BatchConfig {
@@ -773,6 +781,7 @@ pub struct BatchConfigBuilder {
max_export_batch_size: usize,
max_export_timeout: Duration,
max_concurrent_exports: usize,
forceflush_timeout: Duration,
}

impl Default for BatchConfigBuilder {
@@ -793,6 +802,7 @@ impl Default for BatchConfigBuilder {
max_export_batch_size: OTEL_BSP_MAX_EXPORT_BATCH_SIZE_DEFAULT,
max_export_timeout: OTEL_BSP_EXPORT_TIMEOUT_DEFAULT,
max_concurrent_exports: OTEL_BSP_MAX_CONCURRENT_EXPORTS_DEFAULT,
forceflush_timeout: OTEL_BSP_FORCE_FLUSH_TIMEOUT_DEFAULT,
}
.init_from_env_vars()
}
@@ -868,6 +878,17 @@ impl BatchConfigBuilder {
self
}

/// Set forceflush_timeout for [`BatchConfigBuilder`].
/// The default value is 5000 milliseconds.
///
/// Corresponding environment variable: `OTEL_BSP_FORCE_FLUSH_TIMEOUT`.
///
/// Note: Programmatically setting this will override any value set via the environment variable.
pub fn with_forceflush_timeout(mut self, forceflush_timeout: Duration) -> Self {
self.forceflush_timeout = forceflush_timeout;
self
}
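
The trace side mirrors the logs builder; a brief sketch under the same assumptions (existing `BatchSpanProcessor::builder` and `with_batch_config` APIs, `opentelemetry-stdout` exporter as a placeholder):

use std::time::Duration;
use opentelemetry_sdk::trace::{BatchConfigBuilder, BatchSpanProcessor};

fn example_span_processor() -> BatchSpanProcessor {
    let config = BatchConfigBuilder::default()
        // Cap force_flush at 2.5 s; overrides OTEL_BSP_FORCE_FLUSH_TIMEOUT.
        .with_forceflush_timeout(Duration::from_millis(2_500))
        .build();
    BatchSpanProcessor::builder(opentelemetry_stdout::SpanExporter::default())
        .with_batch_config(config)
        .build()
}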

/// Builds a `BatchConfig` enforcing the following invariants:
/// * `max_export_batch_size` must be less than or equal to `max_queue_size`.
pub fn build(self) -> BatchConfig {
@@ -880,6 +901,7 @@ impl BatchConfigBuilder {
scheduled_delay: self.scheduled_delay,
max_export_timeout: self.max_export_timeout,
max_concurrent_exports: self.max_concurrent_exports,
forceflush_timeout: self.forceflush_timeout,
max_export_batch_size,
}
}
@@ -926,6 +948,13 @@ impl BatchConfigBuilder {
self.max_export_timeout = Duration::from_millis(max_export_timeout);
}

if let Some(forceflush_timeout) = env::var(OTEL_BSP_FORCE_FLUSH_TIMEOUT)
.ok()
.and_then(|s| u64::from_str(&s).ok())
{
self.forceflush_timeout = Duration::from_millis(forceflush_timeout);
}

self
}
}
@@ -647,6 +647,7 @@ mod tests {
scheduled_delay: Duration::from_secs(3600), // effectively disabled
max_export_timeout: Duration::from_secs(5),
max_concurrent_exports: 2, // what we want to verify
forceflush_timeout: Duration::from_secs(5),
};

// Spawn the processor.
@@ -685,6 +686,7 @@ mod tests {
scheduled_delay: Duration::from_secs(3600),
max_export_timeout: Duration::from_secs(5),
max_concurrent_exports: 1, // what we want to verify
forceflush_timeout: Duration::from_secs(5),
};

let processor = BatchSpanProcessor::new(exporter, config, runtime::Tokio);