Skip to content

Commit 76b66fa

Browse files
authored
Gate worker heartbeating behind namespace capabilities (#1046)
* Gate worker heartbeating behind namespace capabilities * Lazily check namespace capabilities in worker heartbeater task * Shutdown worker whenever we return early * Mock describe namespace for heartbeating unit test * use v1.29.1 branch for new server patch * Revert "use v1.29.1 branch for new server patch" This reverts commit e7499eb.
1 parent 43a8a7a commit 76b66fa

File tree

1 file changed

+38
-1
lines changed

1 file changed

+38
-1
lines changed

crates/sdk-core/src/worker/heartbeat.rs

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,27 @@ impl SharedNamespaceWorker {
6767
let heartbeat_map_clone = heartbeat_map.clone();
6868

6969
tokio::spawn(async move {
70+
match client_clone.describe_namespace().await {
71+
Ok(namespace_resp) => {
72+
if namespace_resp
73+
.namespace_info
74+
.and_then(|info| info.capabilities)
75+
.map(|caps| caps.worker_heartbeats)
76+
!= Some(true)
77+
{
78+
warn!(
79+
"Worker heartbeating configured for runtime, but server version does not support it."
80+
);
81+
worker.shutdown().await;
82+
return;
83+
}
84+
}
85+
Err(e) => {
86+
warn!(error=?e, "Network error while describing namespace for heartbeat capabilities");
87+
worker.shutdown().await;
88+
return;
89+
}
90+
}
7091
let mut ticker = tokio::time::interval(heartbeat_interval);
7192
loop {
7293
tokio::select! {
@@ -82,6 +103,7 @@ impl SharedNamespaceWorker {
82103
}
83104
if let Err(e) = client_clone.record_worker_heartbeat(namespace_clone.clone(), hb_to_send).await {
84105
if matches!(e.code(), tonic::Code::Unimplemented) {
106+
worker.shutdown().await;
85107
return;
86108
}
87109
warn!(error=?e, "Network error while sending worker heartbeat");
@@ -145,7 +167,10 @@ mod tests {
145167
time::Duration,
146168
};
147169
use temporalio_common::{
148-
protos::temporal::api::workflowservice::v1::RecordWorkerHeartbeatResponse,
170+
protos::temporal::api::namespace::v1::{NamespaceInfo, namespace_info::Capabilities},
171+
protos::temporal::api::workflowservice::v1::{
172+
DescribeNamespaceResponse, RecordWorkerHeartbeatResponse,
173+
},
149174
worker::PollerBehavior,
150175
};
151176

@@ -180,6 +205,18 @@ mod tests {
180205
Ok(RecordWorkerHeartbeatResponse {})
181206
},
182207
);
208+
mock.expect_describe_namespace().returning(move || {
209+
Ok(DescribeNamespaceResponse {
210+
namespace_info: Some(NamespaceInfo {
211+
capabilities: Some(Capabilities {
212+
worker_heartbeats: true,
213+
..Capabilities::default()
214+
}),
215+
..NamespaceInfo::default()
216+
}),
217+
..DescribeNamespaceResponse::default()
218+
})
219+
});
183220

184221
let config = test_worker_cfg()
185222
.activity_task_poller_behavior(PollerBehavior::SimpleMaximum(1_usize))

0 commit comments

Comments
 (0)