Skip to content

Commit 6576790

Browse files
committed
init
1 parent 43a8a7a commit 6576790

File tree

6 files changed

+84
-5
lines changed

6 files changed

+84
-5
lines changed

crates/sdk-core-c-bridge/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,4 +55,5 @@ thiserror = { workspace = true }
5555
cbindgen = { version = "0.29", default-features = false }
5656

5757
[features]
58+
antithesis_assertions = ["temporalio-sdk-core/antithesis_assertions"]
5859
xz2-static = ["xz2/static"]

crates/sdk-core/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,11 @@ tokio-console = ["console-subscriber"]
2727
ephemeral-server = ["dep:flate2", "dep:reqwest", "dep:tar", "dep:zip"]
2828
debug-plugin = ["dep:reqwest"]
2929
test-utilities = ["dep:assert_matches", "dep:bimap"]
30+
antithesis_assertions = ["dep:antithesis_sdk"]
3031

3132
[dependencies]
3233
anyhow = "1.0"
34+
antithesis_sdk = { version = "0.2.1", optional = true, default-features = false, features = ["full"] }
3335
assert_matches = { version = "1.5", optional = true }
3436
bimap = { version = "0.6.3", optional = true }
3537
async-trait = "0.1"

crates/sdk-core/src/abstractions.rs

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -422,11 +422,30 @@ impl<SK: SlotKind> OwnedMeteredSemPermit<SK> {
422422
#[derive(Debug)]
423423
pub(crate) struct UsedMeteredSemPermit<SK: SlotKind>(#[allow(dead_code)] OwnedMeteredSemPermit<SK>);
424424

425+
#[cfg(feature = "antithesis_assertions")]
425426
macro_rules! dbg_panic {
426-
($($arg:tt)*) => {
427-
error!($($arg)*);
428-
debug_assert!(false, $($arg)*);
429-
};
427+
($($arg:tt)*) => {{
428+
let message = format!($($arg)*);
429+
error!("{}", message);
430+
crate::antithesis::assert_always_failure(
431+
"dbg_panic invariant triggered",
432+
::serde_json::json!({
433+
"message": message.clone(),
434+
"file": file!(),
435+
"line": line!(),
436+
"module": module_path!(),
437+
}),
438+
);
439+
debug_assert!(false, "{}", message);
440+
}};
441+
}
442+
443+
#[cfg(not(feature = "antithesis_assertions"))]
444+
macro_rules! dbg_panic {
445+
($($arg:tt)*) => {{
446+
error!($($arg)*);
447+
debug_assert!(false, $($arg)*);
448+
}};
430449
}
431450
pub(crate) use dbg_panic;
432451

crates/sdk-core/src/antithesis.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
#[cfg(feature = "antithesis_assertions")]
2+
use std::sync::OnceLock;
3+
4+
#[cfg(feature = "antithesis_assertions")]
5+
use serde_json::Value;
6+
7+
/// Ensure Antithesis is initialized exactly once.
8+
#[cfg(feature = "antithesis_assertions")]
9+
pub(crate) fn ensure_init() {
10+
static INIT: OnceLock<()> = OnceLock::new();
11+
INIT.get_or_init(|| {
12+
::antithesis_sdk::antithesis_init();
13+
});
14+
}
15+
16+
#[cfg(not(feature = "antithesis_assertions"))]
17+
#[inline]
18+
pub(crate) fn ensure_init() {}
19+
20+
/// Record an always-true assertion failure to Antithesis when an invariant is
21+
/// violated.
22+
#[cfg(feature = "antithesis_assertions")]
23+
pub(crate) fn assert_always_failure(name: &str, metadata: Value) {
24+
::antithesis_sdk::assert_always!(false, name, &metadata);
25+
}

crates/sdk-core/src/lib.rs

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,12 @@
22
#![allow(clippy::upper_case_acronyms)]
33

44
//! This crate provides a basis for creating new Temporal SDKs without completely starting from
5-
//! scratch
5+
//! scratch.
6+
//!
7+
//! ## Optional features
8+
//! - `antithesis_assertions`: Enables integration with Antithesis' Rust SDK. When active, core
9+
//! initialization calls `antithesis_sdk::antithesis_init` and critical invariants emit
10+
//! Antithesis assertions to aid fuzzing campaigns.
611
712
#[cfg(test)]
813
#[macro_use]
@@ -12,6 +17,7 @@ extern crate tracing;
1217
extern crate core;
1318

1419
mod abstractions;
20+
mod antithesis;
1521
#[cfg(feature = "debug-plugin")]
1622
pub mod debug_client;
1723
#[cfg(feature = "ephemeral-server")]
@@ -86,8 +92,18 @@ pub fn init_worker<CT>(
8692
where
8793
CT: Into<sealed::AnyClient>,
8894
{
95+
crate::antithesis::ensure_init();
8996
let namespace = worker_config.namespace.clone();
9097
if namespace.is_empty() {
98+
#[cfg(feature = "antithesis_assertions")]
99+
crate::antithesis::assert_always_failure(
100+
"worker namespace must not be empty",
101+
::serde_json::json!({
102+
"namespace": namespace.clone(),
103+
"task_queue": worker_config.task_queue.clone(),
104+
"has_identity_override": worker_config.client_identity_override.is_some(),
105+
}),
106+
);
91107
bail!("Worker namespace cannot be empty");
92108
}
93109

@@ -103,6 +119,14 @@ where
103119
let sticky_q = sticky_q_name_for_worker(&client_ident, worker_config.max_cached_workflows);
104120

105121
if client_ident.is_empty() {
122+
#[cfg(feature = "antithesis_assertions")]
123+
crate::antithesis::assert_always_failure(
124+
"client identity must not be empty",
125+
::serde_json::json!({
126+
"namespace": namespace.clone(),
127+
"task_queue": worker_config.task_queue.clone(),
128+
}),
129+
);
106130
bail!("Client identity cannot be empty. Either lang or user should be setting this value");
107131
}
108132

@@ -132,6 +156,7 @@ pub fn init_replay_worker<I>(rwi: ReplayWorkerInput<I>) -> Result<Worker, anyhow
132156
where
133157
I: Stream<Item = HistoryForReplay> + Send + 'static,
134158
{
159+
crate::antithesis::ensure_init();
135160
info!(
136161
task_queue = rwi.config.task_queue.as_str(),
137162
"Registering replay worker"
@@ -290,6 +315,7 @@ impl CoreRuntime {
290315
where
291316
F: Fn() + Send + Sync + 'static,
292317
{
318+
crate::antithesis::ensure_init();
293319
let telemetry = telemetry_init(runtime_options.telemetry_options)?;
294320
let subscriber = telemetry.trace_subscriber();
295321
let runtime = tokio_builder
@@ -317,6 +343,7 @@ impl CoreRuntime {
317343
/// # Panics
318344
/// If there is no currently active Tokio runtime
319345
pub fn new_assume_tokio(runtime_options: RuntimeOptions) -> Result<Self, anyhow::Error> {
346+
crate::antithesis::ensure_init();
320347
let telemetry = telemetry_init(runtime_options.telemetry_options)?;
321348
Ok(Self::new_assume_tokio_initialized_telem(
322349
telemetry,
@@ -333,6 +360,7 @@ impl CoreRuntime {
333360
telemetry: TelemetryInstance,
334361
heartbeat_interval: Option<Duration>,
335362
) -> Self {
363+
crate::antithesis::ensure_init();
336364
let runtime_handle = tokio::runtime::Handle::current();
337365
if let Some(sub) = telemetry.trace_subscriber() {
338366
set_trace_subscriber_for_current_thread(sub);

crates/sdk/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,5 +42,9 @@ version = "0.1"
4242
path = "../client"
4343
version = "0.1"
4444

45+
[features]
46+
default = []
47+
antithesis_assertions = ["temporalio-sdk-core/antithesis_assertions"]
48+
4549
[lints]
4650
workspace = true

0 commit comments

Comments
 (0)