Skip to content

Commit 0aa4a7f

Browse files
authored
Fix temporal prefix not applied to resource tuner metrics (#1043)
1 parent be69557 commit 0aa4a7f

File tree

3 files changed

+73
-19
lines changed

3 files changed

+73
-19
lines changed

crates/sdk-core/src/worker/mod.rs

Lines changed: 1 addition & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -156,7 +156,6 @@ impl AllPermitsTracker {
156156

157157
#[derive(Clone)]
158158
pub(crate) struct WorkerTelemetry {
159-
metric_meter: Option<TemporalMeter>,
160159
temporal_metric_meter: Option<TemporalMeter>,
161160
trace_subscriber: Option<Arc<dyn Subscriber + Send + Sync>>,
162161
}
@@ -313,7 +312,6 @@ impl Worker {
313312
info!(task_queue=%config.task_queue, namespace=%config.namespace, "Initializing worker");
314313

315314
let worker_telemetry = telem_instance.map(|telem| WorkerTelemetry {
316-
metric_meter: telem.get_metric_meter(),
317315
temporal_metric_meter: telem.get_temporal_metric_meter(),
318316
trace_subscriber: telem.trace_subscriber(),
319317
});
@@ -382,7 +380,7 @@ impl Worker {
382380
config.task_queue.clone(),
383381
wt.temporal_metric_meter.clone(),
384382
),
385-
wt.metric_meter.clone(),
383+
wt.temporal_metric_meter.clone(),
386384
)
387385
} else {
388386
(MetricsContext::no_op(), None)

crates/sdk-core/src/worker/tuner/resource_based.rs

Lines changed: 14 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -233,14 +233,10 @@ impl PidControllers {
233233

234234
impl MetricInstruments {
235235
fn new(meter: TemporalMeter) -> Self {
236-
let mem_usage = meter.inner.gauge_f64("resource_slots_mem_usage".into());
237-
let cpu_usage = meter.inner.gauge_f64("resource_slots_cpu_usage".into());
238-
let mem_pid_output = meter
239-
.inner
240-
.gauge_f64("resource_slots_mem_pid_output".into());
241-
let cpu_pid_output = meter
242-
.inner
243-
.gauge_f64("resource_slots_cpu_pid_output".into());
236+
let mem_usage = meter.gauge_f64("resource_slots_mem_usage".into());
237+
let cpu_usage = meter.gauge_f64("resource_slots_cpu_usage".into());
238+
let mem_pid_output = meter.gauge_f64("resource_slots_mem_pid_output".into());
239+
let cpu_pid_output = meter.gauge_f64("resource_slots_cpu_pid_output".into());
244240
let attribs = meter.inner.new_attributes(meter.default_attribs);
245241
Self {
246242
attribs,
@@ -732,15 +728,17 @@ impl CGroupCpuFileSystem for CgroupV2CpuFileSystem {
732728
mod tests {
733729
use super::*;
734730
use crate::{abstractions::MeteredPermitDealer, telemetry::metrics::MetricsContext};
735-
use std::cell::RefCell;
736-
use std::env;
737-
use std::hint::black_box;
738-
use std::rc::Rc;
739-
use std::sync::{
740-
Arc,
741-
atomic::{AtomicU64, Ordering},
731+
use std::{
732+
cell::RefCell,
733+
env,
734+
hint::black_box,
735+
rc::Rc,
736+
sync::{
737+
Arc,
738+
atomic::{AtomicU64, Ordering},
739+
},
740+
thread::sleep,
742741
};
743-
use std::thread::sleep;
744742
use temporalio_common::worker::WorkflowSlotKind;
745743

746744
struct FakeMIS {

crates/sdk-core/tests/integ_tests/metrics_tests.rs

Lines changed: 58 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1415,3 +1415,61 @@ async fn sticky_queue_label_strategy(
14151415
_ => unreachable!("Test only covers UseNormal and UseNormalAndSticky"),
14161416
}
14171417
}
1418+
1419+
#[tokio::test]
1420+
async fn resource_based_tuner_metrics() {
1421+
use temporalio_sdk_core::ResourceBasedTuner;
1422+
1423+
let (telemopts, addr, _aborter) = prom_metrics(None);
1424+
let rt = CoreRuntime::new_assume_tokio(get_integ_runtime_options(telemopts)).unwrap();
1425+
let wf_name = "resource_based_tuner_metrics";
1426+
let mut starter = CoreWfStarter::new_with_runtime(wf_name, rt);
1427+
starter.worker_config.no_remote_activities(true);
1428+
starter.worker_config.clear_max_outstanding_opts();
1429+
1430+
// Create a resource-based tuner with reasonable thresholds
1431+
let tuner = ResourceBasedTuner::new(0.8, 0.8);
1432+
starter.worker_config.tuner(Arc::new(tuner));
1433+
1434+
let mut worker = starter.worker().await;
1435+
1436+
worker.register_wf(wf_name.to_string(), |ctx: WfContext| async move {
1437+
ctx.timer(Duration::from_millis(100)).await;
1438+
Ok(().into())
1439+
});
1440+
1441+
worker
1442+
.submit_wf(
1443+
wf_name.to_owned(),
1444+
wf_name.to_owned(),
1445+
vec![],
1446+
WorkflowOptions::default(),
1447+
)
1448+
.await
1449+
.unwrap();
1450+
1451+
worker.run_until_done().await.unwrap();
1452+
1453+
// Give metrics time to be recorded (metrics are emitted every 1 second)
1454+
tokio::time::sleep(Duration::from_millis(1500)).await;
1455+
1456+
let body = get_text(format!("http://{addr}/metrics")).await;
1457+
1458+
// Verify that the resource-based tuner metrics are present
1459+
assert!(
1460+
body.contains("temporal_resource_slots_mem_usage"),
1461+
"Memory usage metric should be present"
1462+
);
1463+
assert!(
1464+
body.contains("temporal_resource_slots_cpu_usage"),
1465+
"CPU usage metric should be present"
1466+
);
1467+
assert!(
1468+
body.contains("temporal_resource_slots_mem_pid_output"),
1469+
"Memory PID output metric should be present"
1470+
);
1471+
assert!(
1472+
body.contains("temporal_resource_slots_cpu_pid_output"),
1473+
"CPU PID output metric should be present"
1474+
);
1475+
}

0 commit comments

Comments
 (0)