Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
- Added `Resource::get_ref(&self, key: &Key) -> Option<&Value>` to allow retrieving a reference to a resource value without cloning.
- **Breaking** Removed the following public hidden methods from the `SdkTracer` [#3227][3227]:
- `id_generator`, `should_sample`
- **Fix**: ObservableCounter and ObservableUpDownCounter now correctly report only data points from the current measurement cycle,
removing stale attribute combinations that are no longer observed. [#3213](https://github.com/open-telemetry/opentelemetry-rust/pull/3248)

[3227]: https://github.com/open-telemetry/opentelemetry-rust/pull/3227

Expand Down
10 changes: 8 additions & 2 deletions opentelemetry-sdk/src/metrics/internal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,8 @@ where
}

/// Iterate through all attribute sets and populate `DataPoints` in readonly mode.
/// This is used in Cumulative temporality mode, where [`ValueMap`] is not cleared.
/// This is used for synchronous instruments (Counter, Histogram, etc.) in Cumulative temporality mode,
/// where attribute sets persist across collection cycles and [`ValueMap`] is not cleared.
pub(crate) fn collect_readonly<Res, MapFn>(&self, dest: &mut Vec<Res>, mut map_fn: MapFn)
where
MapFn: FnMut(Vec<KeyValue>, &A) -> Res,
Expand All @@ -179,7 +180,12 @@ where
}

/// Iterate through all attribute sets, populate `DataPoints` and reset.
/// This is used in Delta temporality mode, where [`ValueMap`] is reset after collection.
/// This is used for:
/// - Synchronous instruments in Delta temporality mode
/// - Asynchronous instruments (Observable) in both Delta and Cumulative temporality modes
///
/// For asynchronous instruments, this removes stale attribute sets that were not observed
/// in the current callback, ensuring only currently active attributes are reported.
pub(crate) fn collect_and_reset<Res, MapFn>(&self, dest: &mut Vec<Res>, mut map_fn: MapFn)
where
MapFn: FnMut(Vec<KeyValue>, A) -> Res,
Expand Down
4 changes: 3 additions & 1 deletion opentelemetry-sdk/src/metrics/internal/precomputed_sum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,10 @@ impl<T: Number> PrecomputedSum<T> {
s_data.temporality = Temporality::Cumulative;
s_data.is_monotonic = self.monotonic;

// Use collect_and_reset to remove stale attributes (not observed in current callback)
// For cumulative, report absolute values (no delta calculation needed)
self.value_map
.collect_readonly(&mut s_data.data_points, |attributes, aggr| SumDataPoint {
.collect_and_reset(&mut s_data.data_points, |attributes, aggr| SumDataPoint {
attributes,
value: aggr.value.get_value(),
exemplars: vec![],
Expand Down
17 changes: 3 additions & 14 deletions opentelemetry-sdk/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1457,17 +1457,12 @@ mod tests {
// Run this test with stdout enabled to see output.
// cargo test asynchronous_instruments_cumulative_data_points_only_from_last_measurement --features=testing -- --nocapture

asynchronous_instruments_cumulative_data_points_only_from_last_measurement_helper("gauge");
asynchronous_instruments_cumulative_data_points_only_from_last_measurement_helper(
"gauge", true,
);
// TODO fix: all asynchronous instruments should not emit data points if not measured
// but these implementations are still buggy
asynchronous_instruments_cumulative_data_points_only_from_last_measurement_helper(
"counter", false,
"counter",
);
asynchronous_instruments_cumulative_data_points_only_from_last_measurement_helper(
"updown_counter",
false,
);
}

Expand Down Expand Up @@ -1782,7 +1777,6 @@ mod tests {

fn asynchronous_instruments_cumulative_data_points_only_from_last_measurement_helper(
instrument_name: &'static str,
should_not_emit: bool,
) {
let mut test_context = TestContext::new(Temporality::Cumulative);
let attributes = Arc::new([KeyValue::new("key1", "value1")]);
Expand Down Expand Up @@ -1844,12 +1838,7 @@ mod tests {

test_context.flush_metrics();

if should_not_emit {
test_context.check_no_metrics();
} else {
// Test that latest export has the same data as the previous one
assert_correct_export(&mut test_context, instrument_name);
}
test_context.check_no_metrics();

fn assert_correct_export(test_context: &mut TestContext, instrument_name: &'static str) {
match instrument_name {
Expand Down
5 changes: 2 additions & 3 deletions opentelemetry-sdk/src/metrics/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -524,9 +524,8 @@ fn aggregate_fn<T: Number>(
}
Aggregation::Sum => {
let fns = match kind {
// TODO implement: observable instruments should not report data points on every collect
// from SDK: For asynchronous instruments with Delta or Cumulative aggregation temporality,
// MetricReader.Collect MUST only receive data points with measurements recorded since the previous collection
// Observable instruments use collect_and_reset to report only data points
// measured in the current callback, removing stale attributes
InstrumentKind::ObservableCounter => b.precomputed_sum(true),
InstrumentKind::ObservableUpDownCounter => b.precomputed_sum(false),
InstrumentKind::Counter | InstrumentKind::Histogram => b.sum(true),
Expand Down