Skip to content

Commit b140f49

Browse files
committed
Remove stale attribute combinations that are no longer observed in ObservableCounter and ObservableUpDownCounter
1 parent 95af815 commit b140f49

File tree

5 files changed

+18
-20
lines changed

5 files changed

+18
-20
lines changed

opentelemetry-sdk/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
- Added `Resource::get_ref(&self, key: &Key) -> Option<&Value>` to allow retrieving a reference to a resource value without cloning.
66
- **Breaking** Removed the following public hidden methods from the `SdkTracer` [#3227][3227]:
77
- `id_generator`, `should_sample`
8+
- **Fix**: ObservableCounter and ObservableUpDownCounter now correctly report only data points from the current measurement cycle,
9+
removing stale attribute combinations that are no longer observed. [#3213](https://github.com/open-telemetry/opentelemetry-rust/pull/3248)
810

911
[3227]: https://github.com/open-telemetry/opentelemetry-rust/pull/3227
1012

opentelemetry-sdk/src/metrics/internal/mod.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,8 @@ where
156156
}
157157

158158
/// Iterate through all attribute sets and populate `DataPoints` in readonly mode.
159-
/// This is used in Cumulative temporality mode, where [`ValueMap`] is not cleared.
159+
/// This is used for synchronous instruments (Counter, Histogram, etc.) in Cumulative temporality mode,
160+
/// where attribute sets persist across collection cycles and [`ValueMap`] is not cleared.
160161
pub(crate) fn collect_readonly<Res, MapFn>(&self, dest: &mut Vec<Res>, mut map_fn: MapFn)
161162
where
162163
MapFn: FnMut(Vec<KeyValue>, &A) -> Res,
@@ -179,7 +180,12 @@ where
179180
}
180181

181182
/// Iterate through all attribute sets, populate `DataPoints` and reset.
182-
/// This is used in Delta temporality mode, where [`ValueMap`] is reset after collection.
183+
/// This is used for:
184+
/// - Synchronous instruments in Delta temporality mode
185+
/// - Asynchronous instruments (Observable) in both Delta and Cumulative temporality modes
186+
///
187+
/// For asynchronous instruments, this removes stale attribute sets that were not observed
188+
/// in the current callback, ensuring only currently active attributes are reported.
183189
pub(crate) fn collect_and_reset<Res, MapFn>(&self, dest: &mut Vec<Res>, mut map_fn: MapFn)
184190
where
185191
MapFn: FnMut(Vec<KeyValue>, A) -> Res,

opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,8 +116,10 @@ impl<T: Number> PrecomputedSum<T> {
116116
s_data.temporality = Temporality::Cumulative;
117117
s_data.is_monotonic = self.monotonic;
118118

119+
// Use collect_and_reset to remove stale attributes (not observed in current callback)
120+
// For cumulative, report absolute values (no delta calculation needed)
119121
self.value_map
120-
.collect_readonly(&mut s_data.data_points, |attributes, aggr| SumDataPoint {
122+
.collect_and_reset(&mut s_data.data_points, |attributes, aggr| SumDataPoint {
121123
attributes,
122124
value: aggr.value.get_value(),
123125
exemplars: vec![],

opentelemetry-sdk/src/metrics/mod.rs

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1457,17 +1457,12 @@ mod tests {
14571457
// Run this test with stdout enabled to see output.
14581458
// cargo test asynchronous_instruments_cumulative_data_points_only_from_last_measurement --features=testing -- --nocapture
14591459

1460+
asynchronous_instruments_cumulative_data_points_only_from_last_measurement_helper("gauge");
14601461
asynchronous_instruments_cumulative_data_points_only_from_last_measurement_helper(
1461-
"gauge", true,
1462-
);
1463-
// TODO fix: all asynchronous instruments should not emit data points if not measured
1464-
// but these implementations are still buggy
1465-
asynchronous_instruments_cumulative_data_points_only_from_last_measurement_helper(
1466-
"counter", false,
1462+
"counter",
14671463
);
14681464
asynchronous_instruments_cumulative_data_points_only_from_last_measurement_helper(
14691465
"updown_counter",
1470-
false,
14711466
);
14721467
}
14731468

@@ -1782,7 +1777,6 @@ mod tests {
17821777

17831778
fn asynchronous_instruments_cumulative_data_points_only_from_last_measurement_helper(
17841779
instrument_name: &'static str,
1785-
should_not_emit: bool,
17861780
) {
17871781
let mut test_context = TestContext::new(Temporality::Cumulative);
17881782
let attributes = Arc::new([KeyValue::new("key1", "value1")]);
@@ -1844,12 +1838,7 @@ mod tests {
18441838

18451839
test_context.flush_metrics();
18461840

1847-
if should_not_emit {
1848-
test_context.check_no_metrics();
1849-
} else {
1850-
// Test that latest export has the same data as the previous one
1851-
assert_correct_export(&mut test_context, instrument_name);
1852-
}
1841+
test_context.check_no_metrics();
18531842

18541843
fn assert_correct_export(test_context: &mut TestContext, instrument_name: &'static str) {
18551844
match instrument_name {

opentelemetry-sdk/src/metrics/pipeline.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -524,9 +524,8 @@ fn aggregate_fn<T: Number>(
524524
}
525525
Aggregation::Sum => {
526526
let fns = match kind {
527-
// TODO implement: observable instruments should not report data points on every collect
528-
// from SDK: For asynchronous instruments with Delta or Cumulative aggregation temporality,
529-
// MetricReader.Collect MUST only receive data points with measurements recorded since the previous collection
527+
// Observable instruments use collect_and_reset to report only data points
528+
// measured in the current callback, removing stale attributes
530529
InstrumentKind::ObservableCounter => b.precomputed_sum(true),
531530
InstrumentKind::ObservableUpDownCounter => b.precomputed_sum(false),
532531
InstrumentKind::Counter | InstrumentKind::Histogram => b.sum(true),

0 commit comments

Comments
 (0)