Skip to content

Commit 03aaf14

Browse files
authored
Support gRPC Headers from ClientOptions and correct Scope Handling (#1039)
Allow additional headers to be added via ClientOptions, similar to TChannel. Additionally support removing headers by setting built-in headers values to null or an empty string. Refactor OpenTracingInterceptor to a separate file and correct Scope handling to resolve Context mismatches. Leaving the Scope open during initialization and then closing it during onClose is incorrect and can cause issues with other interceptors. It's difficult to test this change in isolation.
1 parent 4d2f165 commit 03aaf14

File tree

3 files changed

+301
-120
lines changed

3 files changed

+301
-120
lines changed

src/main/java/com/uber/cadence/internal/compatibility/proto/serviceclient/GrpcServiceStubs.java

Lines changed: 20 additions & 118 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616
package com.uber.cadence.internal.compatibility.proto.serviceclient;
1717

1818
import com.google.common.base.Strings;
19-
import com.google.protobuf.ByteString;
20-
import com.uber.cadence.api.v1.*;
2119
import com.uber.cadence.api.v1.DomainAPIGrpc;
2220
import com.uber.cadence.api.v1.MetaAPIGrpc;
2321
import com.uber.cadence.api.v1.MetaAPIGrpc.MetaAPIBlockingStub;
@@ -32,7 +30,6 @@
3230
import com.uber.cadence.api.v1.WorkflowAPIGrpc.WorkflowAPIBlockingStub;
3331
import com.uber.cadence.api.v1.WorkflowAPIGrpc.WorkflowAPIFutureStub;
3432
import com.uber.cadence.internal.Version;
35-
import com.uber.cadence.internal.tracing.TracingPropagator;
3633
import com.uber.cadence.serviceclient.ClientOptions;
3734
import com.uber.cadence.serviceclient.auth.IAuthorizationProvider;
3835
import io.grpc.*;
@@ -41,13 +38,9 @@
4138
import io.opentelemetry.context.Context;
4239
import io.opentelemetry.context.propagation.TextMapPropagator;
4340
import io.opentelemetry.context.propagation.TextMapSetter;
44-
import io.opentracing.Scope;
45-
import io.opentracing.Span;
4641
import io.opentracing.Tracer;
4742
import java.nio.charset.StandardCharsets;
48-
import java.util.HashMap;
4943
import java.util.Map;
50-
import java.util.Objects;
5144
import java.util.concurrent.TimeUnit;
5245
import java.util.concurrent.atomic.AtomicBoolean;
5346
import org.slf4j.Logger;
@@ -116,6 +109,7 @@ final class GrpcServiceStubs implements IGrpcServiceStubs {
116109
if (!Strings.isNullOrEmpty(options.getIsolationGroup())) {
117110
headers.put(ISOLATION_GROUP_HEADER_KEY, options.getIsolationGroup());
118111
}
112+
mergeHeaders(headers, options.getHeaders());
119113

120114
Channel interceptedChannel =
121115
ClientInterceptors.intercept(
@@ -205,117 +199,7 @@ public void start(Listener<RespT> responseListener, Metadata headers) {
205199
}
206200

207201
private ClientInterceptor newOpenTracingInterceptor(Tracer tracer) {
208-
return new ClientInterceptor() {
209-
private final TracingPropagator tracingPropagator = new TracingPropagator(tracer);
210-
private final String OPERATIONFORMAT = "cadence-%s";
211-
212-
@Override
213-
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
214-
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
215-
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
216-
next.newCall(method, callOptions)) {
217-
218-
@Override
219-
public void start(Listener<RespT> responseListener, Metadata headers) {
220-
Span span =
221-
tracingPropagator.spanByServiceMethod(
222-
String.format(OPERATIONFORMAT, method.getBareMethodName()));
223-
Scope scope = tracer.activateSpan(span);
224-
super.start(
225-
new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(
226-
responseListener) {
227-
@Override
228-
public void onClose(Status status, Metadata trailers) {
229-
try {
230-
super.onClose(status, trailers);
231-
} finally {
232-
span.finish();
233-
scope.close();
234-
}
235-
}
236-
},
237-
headers);
238-
}
239-
240-
@SuppressWarnings("unchecked")
241-
@Override
242-
public void sendMessage(ReqT message) {
243-
if (Objects.equals(method.getBareMethodName(), "StartWorkflowExecution")
244-
&& message instanceof StartWorkflowExecutionRequest) {
245-
StartWorkflowExecutionRequest request = (StartWorkflowExecutionRequest) message;
246-
Header newHeader = addTracingHeaders(request.getHeader());
247-
248-
// cast should not throw error as we are using the builder
249-
message = (ReqT) request.toBuilder().setHeader(newHeader).build();
250-
} else if (Objects.equals(method.getBareMethodName(), "StartWorkflowExecutionAsync")
251-
&& message instanceof StartWorkflowExecutionAsyncRequest) {
252-
StartWorkflowExecutionAsyncRequest request =
253-
(StartWorkflowExecutionAsyncRequest) message;
254-
Header newHeader = addTracingHeaders(request.getRequest().getHeader());
255-
256-
// cast should not throw error as we are using the builder
257-
message =
258-
(ReqT)
259-
request
260-
.toBuilder()
261-
.setRequest(request.getRequest().toBuilder().setHeader(newHeader))
262-
.build();
263-
} else if (Objects.equals(
264-
method.getBareMethodName(), "SignalWithStartWorkflowExecution")
265-
&& message instanceof SignalWithStartWorkflowExecutionRequest) {
266-
SignalWithStartWorkflowExecutionRequest request =
267-
(SignalWithStartWorkflowExecutionRequest) message;
268-
Header newHeader = addTracingHeaders(request.getStartRequest().getHeader());
269-
270-
// cast should not throw error as we are using the builder
271-
message =
272-
(ReqT)
273-
request
274-
.toBuilder()
275-
.setStartRequest(
276-
request.getStartRequest().toBuilder().setHeader(newHeader))
277-
.build();
278-
} else if (Objects.equals(
279-
method.getBareMethodName(), "SignalWithStartWorkflowExecutionAsync")
280-
&& message instanceof SignalWithStartWorkflowExecutionAsyncRequest) {
281-
SignalWithStartWorkflowExecutionAsyncRequest request =
282-
(SignalWithStartWorkflowExecutionAsyncRequest) message;
283-
Header newHeader =
284-
addTracingHeaders(request.getRequest().getStartRequest().getHeader());
285-
286-
// cast should not throw error as we are using the builder
287-
message =
288-
(ReqT)
289-
request
290-
.toBuilder()
291-
.setRequest(
292-
request
293-
.getRequest()
294-
.toBuilder()
295-
.setStartRequest(
296-
request
297-
.getRequest()
298-
.getStartRequest()
299-
.toBuilder()
300-
.setHeader(newHeader)))
301-
.build();
302-
}
303-
super.sendMessage(message);
304-
}
305-
306-
private Header addTracingHeaders(Header header) {
307-
Map<String, byte[]> headers = new HashMap<>();
308-
tracingPropagator.inject(headers);
309-
Header.Builder headerBuilder = header.toBuilder();
310-
headers.forEach(
311-
(k, v) ->
312-
headerBuilder.putFields(
313-
k, Payload.newBuilder().setData(ByteString.copyFrom(v)).build()));
314-
return headerBuilder.build();
315-
}
316-
};
317-
}
318-
};
202+
return new OpenTracingInterceptor(tracer);
319203
}
320204

321205
private ClientInterceptor newTracingInterceptor() {
@@ -488,4 +372,22 @@ public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
488372
return next.newCall(method, callOptions.withDeadlineAfter(duration, TimeUnit.MILLISECONDS));
489373
}
490374
}
375+
376+
private static void mergeHeaders(Metadata metadata, Map<String, String> headers) {
377+
if (headers == null) {
378+
return;
379+
}
380+
for (Map.Entry<String, String> entry : headers.entrySet()) {
381+
Metadata.Key<String> key = Metadata.Key.of(entry.getKey(), Metadata.ASCII_STRING_MARSHALLER);
382+
// Allow headers to overwrite any defaults
383+
if (metadata.containsKey(key)) {
384+
metadata.removeAll(key);
385+
}
386+
// Only replace it if they specify a value.
387+
// This allows for removing headers
388+
if (!Strings.isNullOrEmpty(entry.getValue())) {
389+
metadata.put(key, entry.getValue());
390+
}
391+
}
392+
}
491393
}

0 commit comments

Comments
 (0)