Skip to content

Commit a1952bc

Browse files
clean up tests and flags
1 parent b8dadee commit a1952bc

File tree

5 files changed

+108
-107
lines changed

5 files changed

+108
-107
lines changed

packages/interceptors-opentelemetry/src/instrumentation.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ export function extractContextFromHeaders(headers: Headers): otel.Context | unde
3232
/**
3333
* Given headers, return new headers with the current otel context inserted
3434
*/
35-
export async function headersWithContext(headers: Headers): Promise<Headers> {
35+
export function headersWithContext(headers: Headers): Headers {
3636
const carrier = {};
3737
otel.propagation.inject(otel.context.active(), carrier, otel.defaultTextMapSetter);
3838
return { ...headers, [TRACE_HEADER]: payloadConverter.toPayload(carrier) };

packages/interceptors-opentelemetry/src/workflow/index.ts

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -66,11 +66,12 @@ export class OpenTelemetryInboundInterceptor implements WorkflowInboundCallsInte
6666
const { getActivator, SdkFlags } = getSdkFlagsChecking();
6767
const shouldInjectYield = getActivator().hasFlag(SdkFlags.OpenTelemetryInterceptorInsertYield);
6868
const context = extractContextFromHeaders(input.headers);
69+
if (shouldInjectYield) await Promise.resolve();
6970
return await instrument({
7071
tracer: this.tracer,
7172
spanName: `${SpanName.WORKFLOW_EXECUTE}${SPAN_DELIMITER}${workflowInfo().workflowType}`,
7273
fn: () => next(input),
73-
context: shouldInjectYield ? await Promise.resolve(context) : context,
74+
context,
7475
acceptableErrors: (err) => err instanceof ContinueAsNew,
7576
});
7677
}
@@ -82,11 +83,12 @@ export class OpenTelemetryInboundInterceptor implements WorkflowInboundCallsInte
8283
const { getActivator, SdkFlags } = getSdkFlagsChecking();
8384
const shouldInjectYield = getActivator().hasFlag(SdkFlags.OpenTelemetryHandleSignalInterceptorInsertYield);
8485
const context = extractContextFromHeaders(input.headers);
86+
if (shouldInjectYield) await Promise.resolve();
8587
return await instrument({
8688
tracer: this.tracer,
8789
spanName: `${SpanName.WORKFLOW_SIGNAL}${SPAN_DELIMITER}${input.signalName}`,
8890
fn: () => next(input),
89-
context: shouldInjectYield ? await Promise.resolve(context) : context,
91+
context,
9092
});
9193
}
9294
}
@@ -109,11 +111,14 @@ export class OpenTelemetryOutboundInterceptor implements WorkflowOutboundCallsIn
109111
input: ActivityInput,
110112
next: Next<WorkflowOutboundCallsInterceptor, 'scheduleActivity'>
111113
): Promise<unknown> {
114+
const { getActivator, SdkFlags } = getSdkFlagsChecking();
115+
const shouldInjectYield = getActivator().hasFlag(SdkFlags.OpenTelemetryInterceptorInsertYield);
112116
return await instrument({
113117
tracer: this.tracer,
114118
spanName: `${SpanName.ACTIVITY_START}${SPAN_DELIMITER}${input.activityType}`,
115119
fn: async () => {
116-
const headers = await headersWithContext(input.headers);
120+
const headers = headersWithContext(input.headers);
121+
if (shouldInjectYield) await Promise.resolve();
117122
return next({
118123
...input,
119124
headers,
@@ -126,11 +131,14 @@ export class OpenTelemetryOutboundInterceptor implements WorkflowOutboundCallsIn
126131
input: LocalActivityInput,
127132
next: Next<WorkflowOutboundCallsInterceptor, 'scheduleLocalActivity'>
128133
): Promise<unknown> {
134+
const { getActivator, SdkFlags } = getSdkFlagsChecking();
135+
const shouldInjectYield = getActivator().hasFlag(SdkFlags.OpenTelemetryScheduleLocalActivityInterceptorInsertYield);
129136
return await instrument({
130137
tracer: this.tracer,
131138
spanName: `${SpanName.ACTIVITY_START}${SPAN_DELIMITER}${input.activityType}`,
132139
fn: async () => {
133-
const headers = await headersWithContext(input.headers);
140+
const headers = headersWithContext(input.headers);
141+
if (shouldInjectYield) await Promise.resolve();
134142
return next({
135143
...input,
136144
headers,
@@ -143,11 +151,14 @@ export class OpenTelemetryOutboundInterceptor implements WorkflowOutboundCallsIn
143151
input: StartChildWorkflowExecutionInput,
144152
next: Next<WorkflowOutboundCallsInterceptor, 'startChildWorkflowExecution'>
145153
): Promise<[Promise<string>, Promise<unknown>]> {
154+
const { getActivator, SdkFlags } = getSdkFlagsChecking();
155+
const shouldInjectYield = getActivator().hasFlag(SdkFlags.OpenTelemetryInterceptorInsertYield);
146156
return await instrument({
147157
tracer: this.tracer,
148158
spanName: `${SpanName.CHILD_WORKFLOW_START}${SPAN_DELIMITER}${input.workflowType}`,
149159
fn: async () => {
150-
const headers = await headersWithContext(input.headers);
160+
const headers = headersWithContext(input.headers);
161+
if (shouldInjectYield) await Promise.resolve();
151162
return next({
152163
...input,
153164
headers,
@@ -161,11 +172,14 @@ export class OpenTelemetryOutboundInterceptor implements WorkflowOutboundCallsIn
161172
next: Next<WorkflowOutboundCallsInterceptor, 'continueAsNew'>
162173
): Promise<never> {
163174
const { ContinueAsNew } = getWorkflowModule();
175+
const { getActivator, SdkFlags } = getSdkFlagsChecking();
176+
const shouldInjectYield = getActivator().hasFlag(SdkFlags.OpenTelemetryInterceptorInsertYield);
164177
return await instrument({
165178
tracer: this.tracer,
166179
spanName: `${SpanName.CONTINUE_AS_NEW}${SPAN_DELIMITER}${input.options.workflowType}`,
167180
fn: async () => {
168-
const headers = await headersWithContext(input.headers);
181+
const headers = headersWithContext(input.headers);
182+
if (shouldInjectYield) await Promise.resolve();
169183
return next({
170184
...input,
171185
headers,
@@ -179,11 +193,14 @@ export class OpenTelemetryOutboundInterceptor implements WorkflowOutboundCallsIn
179193
input: SignalWorkflowInput,
180194
next: Next<WorkflowOutboundCallsInterceptor, 'signalWorkflow'>
181195
): Promise<void> {
196+
const { getActivator, SdkFlags } = getSdkFlagsChecking();
197+
const shouldInjectYield = getActivator().hasFlag(SdkFlags.OpenTelemetryInterceptorInsertYield);
182198
return await instrument({
183199
tracer: this.tracer,
184200
spanName: `${SpanName.WORKFLOW_SIGNAL}${SPAN_DELIMITER}${input.signalName}`,
185201
fn: async () => {
186-
const headers = await headersWithContext(input.headers);
202+
const headers = headersWithContext(input.headers);
203+
if (shouldInjectYield) await Promise.resolve();
187204
return next({
188205
...input,
189206
headers,

packages/test/src/test-flags.ts

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ function composeConditions(conditions: Conditions): NonNullable<Conditions>[numb
1616

1717
test('OpenTelemetryHandleSignalInterceptorInsertYield enabled by version', (t) => {
1818
const cases = [
19+
{ version: undefined, expected: false },
1920
{ version: '1.0.0', expected: false },
2021
{ version: '1.11.3', expected: false },
2122
{ version: '1.11.5', expected: true },
@@ -40,6 +41,8 @@ test('OpenTelemetryHandleSignalInterceptorInsertYield enabled by version', (t) =
4041

4142
test('OpenTelemetryInterceptorInsertYield enabled by version', (t) => {
4243
const cases = [
44+
// If there isn't any SDK version available we enable this flag as these yields were present since the initial version of the OTEL interceptors
45+
{ version: undefined, expected: true },
4346
{ version: '0.1.0', expected: true },
4447
{ version: '1.0.0', expected: true },
4548
{ version: '1.9.0-rc.0', expected: true },
@@ -57,3 +60,30 @@ test('OpenTelemetryInterceptorInsertYield enabled by version', (t) => {
5760
t.is(actual, expected, `Expected OpenTelemetryInterceptorInsertYield on ${version} to evaluate as ${expected}`);
5861
}
5962
});
63+
64+
test('OpenTelemetryScheduleLocalActivityInterceptorInsertYield enabled by version', (t) => {
65+
const cases = [
66+
{ version: undefined, expected: false },
67+
{ version: '1.0.0', expected: false },
68+
{ version: '1.11.3', expected: false },
69+
{ version: '1.11.5', expected: false },
70+
{ version: '1.11.6', expected: true },
71+
{ version: '1.12.0', expected: true },
72+
{ version: '1.13.1', expected: true },
73+
{ version: '1.13.2', expected: false },
74+
{ version: '1.14.0', expected: false },
75+
];
76+
for (const { version, expected } of cases) {
77+
const actual = composeConditions(
78+
SdkFlags.OpenTelemetryScheduleLocalActivityInterceptorInsertYield.alternativeConditions
79+
)({
80+
info: {} as WorkflowInfo,
81+
sdkVersion: version,
82+
});
83+
t.is(
84+
actual,
85+
expected,
86+
`Expected OpenTelemetryScheduleLocalActivityInterceptorInsertYield on ${version} to evaluate as ${expected}`
87+
);
88+
}
89+
});

packages/test/src/test-otel.ts

Lines changed: 2 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import {
2424
import { OpenTelemetrySinks, SpanName, SPAN_DELIMITER } from '@temporalio/interceptors-opentelemetry/lib/workflow';
2525
import { DefaultLogger, InjectedSinks, Runtime } from '@temporalio/worker';
2626
import * as activities from './activities';
27-
import { loadHistory, RUN_INTEGRATION_TESTS, saveHistory, TestWorkflowEnvironment, Worker } from './helpers';
27+
import { loadHistory, RUN_INTEGRATION_TESTS, TestWorkflowEnvironment, Worker } from './helpers';
2828
import * as workflows from './workflows';
2929
import { createTestWorkflowBundle } from './helpers-integration';
3030

@@ -559,40 +559,7 @@ test('Can replay otel history from 1.13.1', async (t) => {
559559
});
560560

561561
test('Can replay smorgasbord from 1.13.1', async (t) => {
562-
/*
563-
const staticResource = new opentelemetry.resources.Resource({
564-
[SemanticResourceAttributes.SERVICE_NAME]: 'ts-test-otel-worker',
565-
});
566-
const worker = await Worker.create({
567-
workflowsPath: require.resolve('./workflows'),
568-
activities,
569-
taskQueue: 'test-otel-inbound',
570-
sinks: {
571-
exporter: makeWorkflowExporter(new InMemorySpanExporter(), staticResource),
572-
},
573-
interceptors: {
574-
workflowModules: [require.resolve('./workflows/otel-interceptors')],
575-
activity: [
576-
(ctx) => ({
577-
inbound: new OpenTelemetryActivityInboundInterceptor(ctx),
578-
}),
579-
],
580-
},
581-
});
582-
const client = new WorkflowClient();
583-
584-
const result = await worker.runUntil(async () => {
585-
const handle = await client.start(workflows.smorgasbord, {
586-
taskQueue: 'test-otel-inbound',
587-
workflowId: uuid4(),
588-
});
589-
const result = await handle.result();
590-
const history = await handle.fetchHistory();
591-
await saveHistory('smorg_with_otel.json', history);
592-
return result;
593-
});
594-
*/
595-
562+
// This test will trigger NDE if yield points for `scheduleActivity` and `startChildWorkflowExecution` are not inserted
596563
const hist = await loadHistory('otel_smorgasbord_1_13_1.json');
597564
await t.notThrowsAsync(async () => {
598565
await Worker.runReplayHistory(
@@ -616,41 +583,6 @@ test('Can replay smorgasbord from 1.13.1', async (t) => {
616583
});
617584

618585
test('Can replay signal workflow from 1.13.1', async (t) => {
619-
/*
620-
const staticResource = new opentelemetry.resources.Resource({
621-
[SemanticResourceAttributes.SERVICE_NAME]: 'ts-test-otel-worker',
622-
});
623-
const worker = await Worker.create({
624-
workflowsPath: require.resolve('./workflows/signal-workflow'),
625-
activities: [],
626-
taskQueue: 'test-otel-inbound',
627-
sinks: {
628-
exporter: makeWorkflowExporter(new InMemorySpanExporter(), staticResource),
629-
},
630-
interceptors: {
631-
workflowModules: [require.resolve('./workflows/otel-interceptors')],
632-
activity: [
633-
(ctx) => ({
634-
inbound: new OpenTelemetryActivityInboundInterceptor(ctx),
635-
}),
636-
],
637-
},
638-
});
639-
const client = new WorkflowClient();
640-
641-
const result = await worker.runUntil(async () => {
642-
const handle = await client.start(workflows.topSecretGreeting, {
643-
args: ['Temporal'],
644-
taskQueue: 'test-otel-inbound',
645-
workflowId: uuid4(),
646-
});
647-
const result = await handle.result();
648-
const history = await handle.fetchHistory();
649-
await saveHistory('signal_workflow_1_13_1.json', history);
650-
return result;
651-
});
652-
*/
653-
654586
const hist = await loadHistory('signal_workflow_1_13_1.json');
655587
await t.notThrowsAsync(async () => {
656588
await Worker.runReplayHistory(
@@ -671,5 +603,4 @@ test('Can replay signal workflow from 1.13.1', async (t) => {
671603
hist
672604
);
673605
});
674-
t.pass();
675606
});

packages/workflow/src/flags.ts

Lines changed: 51 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,25 @@ export const SdkFlags = {
6363
* The interceptors provided by @temporalio/interceptors-opentelemetry initially had unnecessary yield points.
6464
* If replaying a workflow created from these versions a yield point is injected to prevent any NDE.
6565
*
66+
* If the history does not include the SDK version, default to enabled since the yields were present since the OTEL
67+
* package was created.
68+
*
69+
* @since Introduced in 1.13.2
70+
*/
71+
OpenTelemetryInterceptorInsertYield: defineFlag(3, false, [isBefore({ major: 1, minor: 13, patch: 2 }, true)]),
72+
/**
73+
* In 1.11.6, the `scheduleLocalActivity` interceptor was added to
74+
* `@temporalio/interceptors-opentelemetry` which added a yield point to the
75+
* outbound interceptor. This yield point was removed in 1.13.2.
76+
*
77+
* If replaying a workflow from 1.11.6 up to 1.13.1, we insert a yield point
78+
* in the interceptor to match the behavior.
79+
*
6680
* @since Introduced in 1.13.2
6781
*/
68-
OpenTelemetryInterceptorInsertYield: defineFlag(3, false, [isBefore({ major: 1, minor: 13, patch: 2 })]),
82+
OpenTelemetryScheduleLocalActivityInterceptorInsertYield: defineFlag(4, false, [
83+
isBetween({ major: 1, minor: 11, patch: 5 }, { major: 1, minor: 13, patch: 2 }),
84+
]),
6985
} as const;
7086

7187
function defineFlag(id: number, def: boolean, alternativeConditions?: AltConditionFn[]): SdkFlag {
@@ -103,6 +119,40 @@ type SemVer = {
103119
patch: number;
104120
};
105121

122+
/**
123+
* Creates an `AltConditionFn` that checks if the SDK version is before the provided version.
124+
* An optional default can be provided in case the SDK version is not available.
125+
*/
126+
function isBefore(compare: SemVer, missingDefault?: boolean): AltConditionFn {
127+
return isCompared(compare, 1, missingDefault);
128+
}
129+
130+
/**
131+
* Creates an `AltConditionFn` that checks if the SDK version is after the provided version.
132+
* An optional default can be provided in case the SDK version is not available.
133+
*/
134+
function isAfter(compare: SemVer, missingDefault?: boolean): AltConditionFn {
135+
return isCompared(compare, -1, missingDefault);
136+
}
137+
138+
/**
139+
* Creates an `AltConditionFn` that checks if the SDK version is between the provided versions.
140+
* The range check is exclusive.
141+
* An optional default can be provided in case the SDK version is not available.
142+
*/
143+
function isBetween(lowEnd: SemVer, highEnd: SemVer, missingDefault?: boolean): AltConditionFn {
144+
return (ctx) => isAfter(lowEnd, missingDefault)(ctx) && isBefore(highEnd, missingDefault)(ctx);
145+
}
146+
147+
function isCompared(compare: SemVer, comparison: -1 | 0 | 1, missingDefault: boolean = false): AltConditionFn {
148+
return ({ sdkVersion }) => {
149+
if (!sdkVersion) return missingDefault;
150+
const version = parseSemver(sdkVersion);
151+
if (!version) return missingDefault;
152+
return compareSemver(compare, version) === comparison;
153+
};
154+
}
155+
106156
function parseSemver(version: string): SemVer | undefined {
107157
const matches = version.match(/(\d+)\.(\d+)\.(\d+)/);
108158
if (!matches) return undefined;
@@ -132,30 +182,3 @@ function compareSemver(a: SemVer, b: SemVer): -1 | 0 | 1 {
132182
if (a.patch > b.patch) return 1;
133183
return 0;
134184
}
135-
136-
function isCompared(compare: SemVer, comparison: -1 | 0 | 1): AltConditionFn {
137-
return ({ sdkVersion }) => {
138-
if (!sdkVersion) throw new Error('no sdk version');
139-
if (!sdkVersion) return false;
140-
const version = parseSemver(sdkVersion);
141-
if (!version) throw new Error(`no version for ${sdkVersion}`);
142-
if (!version) return false;
143-
return compareSemver(compare, version) === comparison;
144-
};
145-
}
146-
147-
function isBefore(compare: SemVer): AltConditionFn {
148-
return isCompared(compare, 1);
149-
}
150-
151-
function isEqual(compare: SemVer): AltConditionFn {
152-
return isCompared(compare, 0);
153-
}
154-
155-
function isAfter(compare: SemVer): AltConditionFn {
156-
return isCompared(compare, -1);
157-
}
158-
159-
function isBetween(lowEnd: SemVer, highEnd: SemVer): AltConditionFn {
160-
return (ctx) => isAfter(lowEnd)(ctx) && isBefore(highEnd)(ctx);
161-
}

0 commit comments

Comments
 (0)