Skip to content

Commit 4a99cee

Browse files
Add support for gRPC binary metadata values (#1791)
1 parent 6fed7d4 commit 4a99cee

File tree

9 files changed

+313
-326
lines changed

9 files changed

+313
-326
lines changed

packages/core-bridge/Cargo.lock

Lines changed: 132 additions & 291 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/core-bridge/sdk-core

Submodule sdk-core updated 144 files

packages/core-bridge/src/client.rs

Lines changed: 86 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,12 @@ use std::time::Duration;
33
use std::{collections::HashMap, sync::Arc};
44

55
use neon::prelude::*;
6-
use tonic::metadata::MetadataKey;
6+
use tonic::metadata::{BinaryMetadataValue, MetadataKey};
77

88
use temporal_sdk_core::{ClientOptions as CoreClientOptions, CoreRuntime, RetryClient};
99

1010
use bridge_macros::{TryFromJs, js_function};
11-
use temporal_client::{ClientInitError, ConfiguredClient, TemporalServiceClientWithMetrics};
11+
use temporal_client::{ClientInitError, ConfiguredClient, TemporalServiceClient};
1212

1313
use crate::runtime::Runtime;
1414
use crate::{helpers::*, runtime::RuntimeExt as _};
@@ -38,7 +38,7 @@ pub fn init(cx: &mut neon::prelude::ModuleContext) -> neon::prelude::NeonResult<
3838
Ok(())
3939
}
4040

41-
type CoreClient = RetryClient<ConfiguredClient<TemporalServiceClientWithMetrics>>;
41+
type CoreClient = RetryClient<ConfiguredClient<TemporalServiceClient>>;
4242

4343
pub struct Client {
4444
// These fields are pub because they are accessed from Worker::new
@@ -61,6 +61,10 @@ pub fn client_new(
6161

6262
let core_client = match res {
6363
Ok(core_client) => core_client,
64+
Err(ClientInitError::InvalidHeaders(e)) => Err(BridgeError::TypeError {
65+
message: format!("Invalid metadata key: {e}"),
66+
field: None,
67+
})?,
6468
Err(ClientInitError::SystemInfoCallError(e)) => Err(BridgeError::TransportError(
6569
format!("Failed to call GetSystemInfo: {e}"),
6670
))?,
@@ -84,13 +88,27 @@ pub fn client_new(
8488
#[js_function]
8589
pub fn client_update_headers(
8690
client: OpaqueInboundHandle<Client>,
87-
headers: HashMap<String, String>,
91+
headers: HashMap<String, MetadataValue>,
8892
) -> BridgeResult<()> {
93+
let (ascii_headers, bin_headers) = config::partition_headers(Some(headers));
94+
client
95+
.borrow()?
96+
.core_client
97+
.get_client()
98+
.set_headers(ascii_headers.unwrap_or_default())
99+
.map_err(|err| BridgeError::TypeError {
100+
message: format!("Invalid metadata key: {err}"),
101+
field: None,
102+
})?;
89103
client
90104
.borrow()?
91105
.core_client
92106
.get_client()
93-
.set_headers(headers);
107+
.set_binary_headers(bin_headers.unwrap_or_default())
108+
.map_err(|err| BridgeError::TypeError {
109+
message: format!("Invalid metadata key: {err}"),
110+
field: None,
111+
})?;
94112
Ok(())
95113
}
96114

@@ -122,10 +140,16 @@ pub struct RpcCall {
122140
pub rpc: String,
123141
pub req: Vec<u8>,
124142
pub retry: bool,
125-
pub metadata: HashMap<String, String>,
143+
pub metadata: HashMap<String, MetadataValue>,
126144
pub timeout: Option<Duration>,
127145
}
128146

147+
#[derive(Debug, Clone, TryFromJs)]
148+
pub enum MetadataValue {
149+
Ascii { value: String },
150+
Binary { value: Vec<u8> },
151+
}
152+
129153
/// Send a request to the Workflow Service using the provided Client
130154
#[js_function]
131155
pub fn client_send_workflow_service_request(
@@ -576,16 +600,29 @@ fn rpc_req<P: prost::Message + Default>(call: RpcCall) -> BridgeResult<tonic::Re
576600

577601
let mut req = tonic::Request::new(proto);
578602
for (k, v) in call.metadata {
579-
req.metadata_mut().insert(
580-
MetadataKey::from_str(k.as_str()).map_err(|err| BridgeError::TypeError {
581-
field: None,
582-
message: format!("Invalid metadata key: {err}"),
583-
})?,
584-
v.parse().map_err(|err| BridgeError::TypeError {
585-
field: None,
586-
message: format!("Invalid metadata value: {err}"),
587-
})?,
588-
);
603+
match v {
604+
MetadataValue::Ascii { value: v } => {
605+
req.metadata_mut().insert(
606+
MetadataKey::from_str(k.as_str()).map_err(|err| BridgeError::TypeError {
607+
field: None,
608+
message: format!("Invalid metadata key: {err}"),
609+
})?,
610+
v.parse().map_err(|err| BridgeError::TypeError {
611+
field: None,
612+
message: format!("Invalid metadata value: {err}"),
613+
})?,
614+
);
615+
}
616+
MetadataValue::Binary { value: v } => {
617+
req.metadata_mut().insert_bin(
618+
MetadataKey::from_str(k.as_str()).map_err(|err| BridgeError::TypeError {
619+
field: None,
620+
message: format!("Invalid metadata key: {err}"),
621+
})?,
622+
BinaryMetadataValue::from_bytes(&v),
623+
);
624+
}
625+
}
589626
}
590627

591628
if let Some(timeout) = call.timeout {
@@ -620,7 +657,7 @@ mod config {
620657

621658
use bridge_macros::TryFromJs;
622659

623-
use crate::helpers::*;
660+
use crate::{client::MetadataValue, helpers::*};
624661

625662
#[derive(Debug, Clone, TryFromJs)]
626663
pub(super) struct ClientOptions {
@@ -629,7 +666,7 @@ mod config {
629666
client_version: String,
630667
tls: Option<TlsConfig>,
631668
http_connect_proxy: Option<HttpConnectProxy>,
632-
headers: Option<HashMap<String, String>>,
669+
headers: Option<HashMap<String, MetadataValue>>,
633670
api_key: Option<String>,
634671
disable_error_code_metric_tags: bool,
635672
}
@@ -669,13 +706,16 @@ mod config {
669706
builder.tls_cfg(tls.into());
670707
}
671708

709+
let (ascii_headers, bin_headers) = partition_headers(self.headers);
710+
672711
let client_options = builder
673712
.target_url(self.target_url)
674713
.client_name(self.client_name)
675714
.client_version(self.client_version)
676715
// tls_cfg -- above
677716
.http_connect_proxy(self.http_connect_proxy.map(Into::into))
678-
.headers(self.headers)
717+
.headers(ascii_headers)
718+
.binary_headers(bin_headers)
679719
.api_key(self.api_key)
680720
.disable_error_code_metric_tags(self.disable_error_code_metric_tags)
681721
// identity -- skipped: will be set on worker
@@ -711,4 +751,31 @@ mod config {
711751
}
712752
}
713753
}
754+
755+
pub(super) fn partition_headers(
756+
headers: Option<HashMap<String, MetadataValue>>,
757+
) -> (
758+
Option<HashMap<String, String>>,
759+
Option<HashMap<String, Vec<u8>>>,
760+
) {
761+
let Some(headers) = headers else {
762+
return (None, None);
763+
};
764+
let mut ascii_headers = HashMap::default();
765+
let mut bin_headers = HashMap::default();
766+
for (k, v) in headers {
767+
match v {
768+
MetadataValue::Ascii { value: v } => {
769+
ascii_headers.insert(k, v);
770+
}
771+
MetadataValue::Binary { value: v } => {
772+
bin_headers.insert(k, v);
773+
}
774+
}
775+
}
776+
(
777+
(!ascii_headers.is_empty()).then_some(ascii_headers),
778+
(!bin_headers.is_empty()).then_some(bin_headers),
779+
)
780+
}
714781
}

packages/core-bridge/src/helpers/try_from_js.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use neon::{
1111
};
1212
use temporal_sdk_core::Url;
1313

14-
use super::{BridgeError, BridgeResult};
14+
use super::{AppendFieldContext, BridgeError, BridgeResult};
1515

1616
/// Trait for Rust types that can be created from JavaScript values, possibly throwing an error.
1717
pub trait TryFromJs: Sized {
@@ -175,8 +175,9 @@ impl<T: TryFromJs> TryFromJs for HashMap<String, T> {
175175
let mut map = Self::new();
176176
for key_handle in props {
177177
let key = key_handle.to_string(cx)?.value(cx);
178-
let value = obj.get_value(cx, key_handle)?;
179-
map.insert(key, T::try_from_js(cx, value)?);
178+
let js_value = obj.get_value(cx, key_handle)?;
179+
let value = T::try_from_js(cx, js_value).field(&key)?;
180+
map.insert(key, value);
180181
}
181182
Ok(map)
182183
}

packages/core-bridge/ts/native.ts

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ export interface OtelMetricsExporterOptions {
100100

101101
export declare function newClient(runtime: Runtime, clientOptions: ClientOptions): Promise<Client>;
102102

103-
export declare function clientUpdateHeaders(client: Client, headers: Record<string, string>): void;
103+
export declare function clientUpdateHeaders(client: Client, headers: Record<string, MetadataValue>): void;
104104

105105
export declare function clientUpdateApiKey(client: Client, apiKey: string): void;
106106

@@ -124,7 +124,7 @@ export interface ClientOptions {
124124
clientVersion: string;
125125
tls: Option<TLSConfig>;
126126
httpConnectProxy: Option<HttpConnectProxy>;
127-
headers: Option<Record<string, string>>;
127+
headers: Option<Record<string, MetadataValue>>;
128128
apiKey: Option<string>;
129129
disableErrorCodeMetricTags: boolean;
130130
}
@@ -157,7 +157,7 @@ export interface RpcCall {
157157
rpc: string;
158158
req: Buffer;
159159
retry: boolean;
160-
metadata: Record<string, string>;
160+
metadata: Record<string, MetadataValue>;
161161
timeout: Option<number>;
162162
}
163163

@@ -191,6 +191,16 @@ export interface Worker {
191191
type: 'worker';
192192
}
193193

194+
export type MetadataValue =
195+
| {
196+
type: 'ascii';
197+
value: string;
198+
}
199+
| {
200+
type: 'binary';
201+
value: Buffer;
202+
};
203+
194204
export interface WorkerOptions {
195205
identity: string;
196206
buildId: string;

packages/test/src/test-client-connection.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ async function bindLocalhostTls(server: grpc.Server): Promise<number> {
6060

6161
test('withMetadata / withDeadline / withAbortSignal set the CallContext for RPC call', async (t) => {
6262
let gotTestHeaders = false;
63+
let gotStaticBinValue;
64+
let gotOtherBinValue;
6365
let gotDeadline = false;
6466
const authTokens: string[] = [];
6567
const deadline = Date.now() + 10000;
@@ -89,6 +91,8 @@ test('withMetadata / withDeadline / withAbortSignal set the CallContext for RPC
8991
) {
9092
gotTestHeaders = true;
9193
}
94+
gotStaticBinValue = call.metadata.get('staticKey-bin');
95+
gotOtherBinValue = call.metadata.get('otherKey-bin');
9296
const receivedDeadline = call.getDeadline();
9397
// For some reason the deadline the server gets is slightly different from the one we send in the client
9498
if (typeof receivedDeadline === 'number' && receivedDeadline >= deadline && receivedDeadline - deadline < 1000) {
@@ -108,16 +112,18 @@ test('withMetadata / withDeadline / withAbortSignal set the CallContext for RPC
108112
const port = await bindLocalhost(server);
109113
const conn = await Connection.connect({
110114
address: `127.0.0.1:${port}`,
111-
metadata: { staticKey: 'set' },
115+
metadata: { staticKey: 'set', 'staticKey-bin': Buffer.from([0x00]) },
112116
apiKey: 'test-token',
113117
});
114118
await conn.withMetadata({ test: 'true' }, () =>
115-
conn.withMetadata({ otherKey: 'set' }, () =>
119+
conn.withMetadata({ otherKey: 'set', 'otherKey-bin': Buffer.from([0x01]) }, () =>
116120
conn.withDeadline(deadline, () => conn.workflowService.registerNamespace({}))
117121
)
118122
);
119123
t.true(gotTestHeaders);
120124
t.true(gotDeadline);
125+
t.deepEqual(gotStaticBinValue, [Buffer.from([0x00])]);
126+
t.deepEqual(gotOtherBinValue, [Buffer.from([0x01])]);
121127
await conn.withApiKey('tt-2', () => conn.workflowService.startWorkflowExecution({}));
122128
conn.setApiKey('tt-3');
123129
await conn.workflowService.startWorkflowExecution({});

packages/test/src/test-native-connection.ts

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,13 +203,18 @@ test('withMetadata and withDeadline propagate metadata and deadline', async (t)
203203
);
204204
const connection = await NativeConnection.connect({
205205
address: `127.0.0.1:${port}`,
206+
metadata: { 'default-bin': Buffer.from([0x00]) },
206207
});
207208

208209
await connection.withDeadline(Date.now() + 10_000, () =>
209-
connection.withMetadata({ test: 'true' }, () => connection.workflowService.getSystemInfo({}))
210+
connection.withMetadata({ test: 'true', 'other-bin': Buffer.from([0x01]) }, () =>
211+
connection.workflowService.getSystemInfo({})
212+
)
210213
);
211214
t.is(requests.length, 2);
212215
t.is(requests[1].metadata.get('test').toString(), 'true');
216+
t.deepEqual(requests[1].metadata.get('default-bin'), [Buffer.from([0x00])]);
217+
t.deepEqual(requests[1].metadata.get('other-bin'), [Buffer.from([0x01])]);
213218
t.true(typeof requests[1].deadline === 'number' && requests[1].deadline > 5_000);
214219
await connection.close();
215220
server.forceShutdown();
@@ -434,3 +439,39 @@ test('can power workflow client calls', async (t) => {
434439
await env.teardown();
435440
}
436441
});
442+
443+
test('setMetadata accepts binary headers', async (t) => {
444+
const requests = new Array<{ metadata: grpc.Metadata; deadline: grpc.Deadline }>();
445+
const server = new grpc.Server();
446+
server.addService(workflowServiceProtoDescriptor.temporal.api.workflowservice.v1.WorkflowService.service, {
447+
getSystemInfo(
448+
call: grpc.ServerUnaryCall<
449+
temporal.api.workflowservice.v1.IGetSystemInfoRequest,
450+
temporal.api.workflowservice.v1.IGetSystemInfoResponse
451+
>,
452+
callback: grpc.sendUnaryData<temporal.api.workflowservice.v1.IGetSystemInfoResponse>
453+
) {
454+
requests.push({ metadata: call.metadata, deadline: call.getDeadline() });
455+
callback(null, {});
456+
},
457+
});
458+
459+
const port = await util.promisify(server.bindAsync.bind(server))(
460+
'localhost:0',
461+
grpc.ServerCredentials.createInsecure()
462+
);
463+
const connection = await NativeConnection.connect({
464+
address: `127.0.0.1:${port}`,
465+
metadata: { 'start-ascii': 'a', 'start-bin': Buffer.from([0x00]) },
466+
});
467+
468+
await connection.setMetadata({ 'end-bin': Buffer.from([0x01]) });
469+
470+
await connection.workflowService.getSystemInfo({});
471+
t.is(requests.length, 2);
472+
t.deepEqual(requests[1].metadata.get('start-bin'), []);
473+
t.deepEqual(requests[1].metadata.get('start-ascii'), []);
474+
t.deepEqual(requests[1].metadata.get('end-bin'), [Buffer.from([0x01])]);
475+
await connection.close();
476+
server.forceShutdown();
477+
});

0 commit comments

Comments
 (0)