Skip to content

Commit ffa0bdc

Browse files
break up
1 parent 491efbd commit ffa0bdc

File tree

7 files changed

+46
-6
lines changed

7 files changed

+46
-6
lines changed

packages/interceptors-opentelemetry/src/instrumentation.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ const payloadConverter = defaultPayloadConverter;
1515
/**
1616
* If found, return an otel Context deserialized from the provided headers
1717
*/
18-
export async function extractContextFromHeaders(headers: Headers): Promise<otel.Context | undefined> {
18+
export function extractContextFromHeaders(headers: Headers): otel.Context | undefined {
1919
const encodedSpanContext = headers[TRACE_HEADER];
2020
if (encodedSpanContext === undefined) {
2121
return undefined;

packages/interceptors-opentelemetry/src/worker/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ export class OpenTelemetryActivityInboundInterceptor implements ActivityInboundC
3434
}
3535

3636
async execute(input: ActivityExecuteInput, next: Next<ActivityInboundCallsInterceptor, 'execute'>): Promise<unknown> {
37-
const context = await extractContextFromHeaders(input.headers);
37+
const context = extractContextFromHeaders(input.headers);
3838
const spanName = `${SpanName.ACTIVITY_EXECUTE}${SPAN_DELIMITER}${this.ctx.info.activityType}`;
3939
return await instrument({ tracer: this.tracer, spanName, fn: () => next(input), context });
4040
}

packages/interceptors-opentelemetry/src/workflow/index.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ export class OpenTelemetryInboundInterceptor implements WorkflowInboundCallsInte
5656
input: WorkflowExecuteInput,
5757
next: Next<WorkflowInboundCallsInterceptor, 'execute'>
5858
): Promise<unknown> {
59-
const context = await extractContextFromHeaders(input.headers);
59+
const context = await Promise.resolve(extractContextFromHeaders(input.headers));
6060
return await instrument({
6161
tracer: this.tracer,
6262
spanName: `${SpanName.WORKFLOW_EXECUTE}${SPAN_DELIMITER}${workflowInfo().workflowType}`,
@@ -70,7 +70,7 @@ export class OpenTelemetryInboundInterceptor implements WorkflowInboundCallsInte
7070
input: SignalInput,
7171
next: Next<WorkflowInboundCallsInterceptor, 'handleSignal'>
7272
): Promise<void> {
73-
const context = await extractContextFromHeaders(input.headers);
73+
const context = extractContextFromHeaders(input.headers);
7474
return await instrument({
7575
tracer: this.tracer,
7676
spanName: `${SpanName.WORKFLOW_SIGNAL}${SPAN_DELIMITER}${input.signalName}`,

packages/test/src/test-otel.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -630,7 +630,7 @@ test('Can replay otel history from 1.13.1', async (t) => {
630630
workflowInterceptorModules: [require.resolve('./workflows/signal-start-otel')],
631631
}),
632632
interceptors: {
633-
workflowModules: [require.resolve('./workflows/otel-interceptors')],
633+
workflowModules: [require.resolve('./workflows/signal-start-otel')],
634634
activity: [
635635
(ctx) => ({
636636
inbound: new OpenTelemetryActivityInboundInterceptor(ctx),

packages/worker/src/workflow/vm-shared.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -357,6 +357,7 @@ export abstract class BaseVMWorkflow implements Workflow {
357357
},
358358
}));
359359
this.activator.addKnownFlags(activation.availableInternalFlags ?? []);
360+
if (activation.lastSdkVersion) this.activator.sdkVersion = activation.lastSdkVersion;
360361

361362
// Initialization of the workflow must happen before anything else. Yet, keep the init job in
362363
// place in the list as we'll use it as a marker to know when to start the workflow function.

packages/workflow/src/flags.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ export const SdkFlags = {
4444
* to implicitely have this flag on.
4545
*/
4646
ProcessWorkflowActivationJobsAsSingleBatch: defineFlag(2, true, [buildIdSdkVersionMatches(/1\.11\.[01]/)]),
47+
48+
OpenTelemetryInterceptorInsertYieldPoint: defineFlag(3, false, [({ info }) => false]),
4749
} as const;
4850

4951
function defineFlag(id: number, def: boolean, alternativeConditions?: AltConditionFn[]): SdkFlag {

packages/workflow/src/internals.ts

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -434,6 +434,8 @@ export class Activator implements ActivationHandler {
434434

435435
private readonly knownFlags = new Set<number>();
436436

437+
sdkVersion?: string;
438+
437439
/**
438440
* Buffered sink calls per activation
439441
*/
@@ -978,7 +980,22 @@ export class Activator implements ActivationHandler {
978980

979981
const signalExecutionNum = this.signalHandlerExecutionSeq++;
980982
this.inProgressSignals.set(signalExecutionNum, { name: signalName, unfinishedPolicy });
981-
const execute = composeInterceptors(interceptors, 'handleSignal', this.signalWorkflowNextHandler.bind(this));
983+
const injectYield = shouldInjectYield(this.sdkVersion);
984+
const addedInterceptor: WorkflowInterceptors['inbound'] = injectYield
985+
? [
986+
{
987+
handleSignal: async (input, next) => {
988+
await Promise.resolve();
989+
return next(input);
990+
},
991+
},
992+
]
993+
: [];
994+
const execute = composeInterceptors(
995+
[...addedInterceptor, ...interceptors],
996+
'handleSignal',
997+
this.signalWorkflowNextHandler.bind(this)
998+
);
982999
execute({
9831000
args: arrayFromPayloads(this.payloadConverter, activation.input),
9841001
signalName,
@@ -1305,3 +1322,23 @@ then you can disable this warning by passing an option when setting the handler:
13051322
Array.from(names.entries()).map(([name, count]) => ({ name, count }))
13061323
)}`;
13071324
}
1325+
1326+
function shouldInjectYield(version?: string): boolean {
1327+
if (!version) {
1328+
return false;
1329+
}
1330+
const [major, minor, patch] = version.split('.');
1331+
// 1.11.5 - 1.13.1: need to inject
1332+
if (major !== '1') return false;
1333+
1334+
switch (minor) {
1335+
case '11':
1336+
return patch === '5';
1337+
case '12':
1338+
return true;
1339+
case '13':
1340+
return patch === '1';
1341+
default:
1342+
return false;
1343+
}
1344+
}

0 commit comments

Comments
 (0)