From b140f4904dc36390c82ad46c781882c3af162b2d Mon Sep 17 00:00:00 2001 From: taisho6339 Date: Sun, 16 Nov 2025 19:03:10 +0900 Subject: [PATCH] Remove stale attribute combinations that are no longer observed in ObservableCounter and ObservableUpDownCounter --- opentelemetry-sdk/CHANGELOG.md | 2 ++ opentelemetry-sdk/src/metrics/internal/mod.rs | 10 ++++++++-- .../src/metrics/internal/precomputed_sum.rs | 4 +++- opentelemetry-sdk/src/metrics/mod.rs | 17 +++-------------- opentelemetry-sdk/src/metrics/pipeline.rs | 5 ++--- 5 files changed, 18 insertions(+), 20 deletions(-) diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index e5e4ca3295..c3adf55073 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -5,6 +5,8 @@ - Added `Resource::get_ref(&self, key: &Key) -> Option<&Value>` to allow retrieving a reference to a resource value without cloning. - **Breaking** Removed the following public hidden methods from the `SdkTracer` [#3227][3227]: - `id_generator`, `should_sample` +- **Fix**: ObservableCounter and ObservableUpDownCounter now correctly report only data points from the current measurement cycle, + removing stale attribute combinations that are no longer observed. [#3213](https://github.com/open-telemetry/opentelemetry-rust/pull/3248) [3227]: https://github.com/open-telemetry/opentelemetry-rust/pull/3227 diff --git a/opentelemetry-sdk/src/metrics/internal/mod.rs b/opentelemetry-sdk/src/metrics/internal/mod.rs index 9ca9de20c1..733dae2e82 100644 --- a/opentelemetry-sdk/src/metrics/internal/mod.rs +++ b/opentelemetry-sdk/src/metrics/internal/mod.rs @@ -156,7 +156,8 @@ where } /// Iterate through all attribute sets and populate `DataPoints` in readonly mode. - /// This is used in Cumulative temporality mode, where [`ValueMap`] is not cleared. + /// This is used for synchronous instruments (Counter, Histogram, etc.) in Cumulative temporality mode, + /// where attribute sets persist across collection cycles and [`ValueMap`] is not cleared. pub(crate) fn collect_readonly(&self, dest: &mut Vec, mut map_fn: MapFn) where MapFn: FnMut(Vec, &A) -> Res, @@ -179,7 +180,12 @@ where } /// Iterate through all attribute sets, populate `DataPoints` and reset. - /// This is used in Delta temporality mode, where [`ValueMap`] is reset after collection. + /// This is used for: + /// - Synchronous instruments in Delta temporality mode + /// - Asynchronous instruments (Observable) in both Delta and Cumulative temporality modes + /// + /// For asynchronous instruments, this removes stale attribute sets that were not observed + /// in the current callback, ensuring only currently active attributes are reported. pub(crate) fn collect_and_reset(&self, dest: &mut Vec, mut map_fn: MapFn) where MapFn: FnMut(Vec, A) -> Res, diff --git a/opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs b/opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs index fc372de007..a35fe5537a 100644 --- a/opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs +++ b/opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs @@ -116,8 +116,10 @@ impl PrecomputedSum { s_data.temporality = Temporality::Cumulative; s_data.is_monotonic = self.monotonic; + // Use collect_and_reset to remove stale attributes (not observed in current callback) + // For cumulative, report absolute values (no delta calculation needed) self.value_map - .collect_readonly(&mut s_data.data_points, |attributes, aggr| SumDataPoint { + .collect_and_reset(&mut s_data.data_points, |attributes, aggr| SumDataPoint { attributes, value: aggr.value.get_value(), exemplars: vec![], diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs index fa1375bd1f..e3b694cb1d 100644 --- a/opentelemetry-sdk/src/metrics/mod.rs +++ b/opentelemetry-sdk/src/metrics/mod.rs @@ -1457,17 +1457,12 @@ mod tests { // Run this test with stdout enabled to see output. // cargo test asynchronous_instruments_cumulative_data_points_only_from_last_measurement --features=testing -- --nocapture + asynchronous_instruments_cumulative_data_points_only_from_last_measurement_helper("gauge"); asynchronous_instruments_cumulative_data_points_only_from_last_measurement_helper( - "gauge", true, - ); - // TODO fix: all asynchronous instruments should not emit data points if not measured - // but these implementations are still buggy - asynchronous_instruments_cumulative_data_points_only_from_last_measurement_helper( - "counter", false, + "counter", ); asynchronous_instruments_cumulative_data_points_only_from_last_measurement_helper( "updown_counter", - false, ); } @@ -1782,7 +1777,6 @@ mod tests { fn asynchronous_instruments_cumulative_data_points_only_from_last_measurement_helper( instrument_name: &'static str, - should_not_emit: bool, ) { let mut test_context = TestContext::new(Temporality::Cumulative); let attributes = Arc::new([KeyValue::new("key1", "value1")]); @@ -1844,12 +1838,7 @@ mod tests { test_context.flush_metrics(); - if should_not_emit { - test_context.check_no_metrics(); - } else { - // Test that latest export has the same data as the previous one - assert_correct_export(&mut test_context, instrument_name); - } + test_context.check_no_metrics(); fn assert_correct_export(test_context: &mut TestContext, instrument_name: &'static str) { match instrument_name { diff --git a/opentelemetry-sdk/src/metrics/pipeline.rs b/opentelemetry-sdk/src/metrics/pipeline.rs index 73b7630994..6187779ddb 100644 --- a/opentelemetry-sdk/src/metrics/pipeline.rs +++ b/opentelemetry-sdk/src/metrics/pipeline.rs @@ -524,9 +524,8 @@ fn aggregate_fn( } Aggregation::Sum => { let fns = match kind { - // TODO implement: observable instruments should not report data points on every collect - // from SDK: For asynchronous instruments with Delta or Cumulative aggregation temporality, - // MetricReader.Collect MUST only receive data points with measurements recorded since the previous collection + // Observable instruments use collect_and_reset to report only data points + // measured in the current callback, removing stale attributes InstrumentKind::ObservableCounter => b.precomputed_sum(true), InstrumentKind::ObservableUpDownCounter => b.precomputed_sum(false), InstrumentKind::Counter | InstrumentKind::Histogram => b.sum(true),