Skip to content

Commit 4c84fc8

Browse files
authored
deflake activity pause/reset tests (#1808)
1 parent ab35135 commit 4c84fc8

File tree

3 files changed

+80
-165
lines changed

3 files changed

+80
-165
lines changed

packages/test/src/activities/heartbeat-cancellation-details.ts

Lines changed: 46 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,64 @@
11
import { ActivityCancellationDetails } from '@temporalio/common';
22
import * as activity from '@temporalio/activity';
33

4+
export interface ActivityState {
5+
pause?: boolean;
6+
unpause?: boolean;
7+
reset?: boolean;
8+
shouldRetry?: boolean;
9+
}
10+
411
export async function heartbeatCancellationDetailsActivity(
5-
catchErr: boolean
12+
state: ActivityState
613
): Promise<ActivityCancellationDetails | undefined> {
14+
const info = activity.activityInfo();
715
// Exit early if we've already run this activity.
8-
if (activity.activityInfo().heartbeatDetails === 'finally-complete') {
16+
if (info.attempt > 1) {
917
return activity.cancellationDetails();
1018
}
19+
20+
// Otherwise, either pause or reset this activity (or both).
21+
const client = activity.getClient();
22+
const req = {
23+
namespace: client.options.namespace,
24+
execution: {
25+
workflowId: info.workflowExecution.workflowId,
26+
runId: info.workflowExecution.runId,
27+
},
28+
id: info.activityId,
29+
};
30+
// Pause AND reset the activity.
31+
if (state.pause && state.reset) {
32+
await Promise.all([client.workflowService.pauseActivity(req), client.workflowService.resetActivity(req)]);
33+
// Just pause.
34+
} else if (state.pause) {
35+
await client.workflowService.pauseActivity(req);
36+
// Just reset.
37+
} else if (state.reset) {
38+
await client.workflowService.resetActivity(req);
39+
}
40+
1141
// eslint-disable-next-line no-constant-condition
1242
while (true) {
1343
try {
14-
activity.heartbeat('heartbeated');
44+
// Heartbeat to propagate cancellation signals from pause/reset.
45+
activity.heartbeat();
1546
await activity.sleep(300);
1647
} catch (err) {
17-
if (err instanceof activity.CancelledFailure && catchErr) {
48+
// If we encountered an unexpected, non-cancellation failure,
49+
// throw a non-retryable error to fail the activity.
50+
if (!(err instanceof activity.CancelledFailure)) {
51+
throw activity.ApplicationFailure.nonRetryable('Unexpected failure', 'Error', err);
52+
}
53+
// If we don't want the activity to retry, return the cancellation details immediately.
54+
if (!state.shouldRetry) {
1855
return activity.cancellationDetails();
1956
}
20-
activity.heartbeat('finally-complete');
57+
// Unpause if requested (a paused activity with not retry).
58+
if (state.unpause) {
59+
await client.workflowService.unpauseActivity(req);
60+
}
61+
// Re-throw the cancellation to retry the activity
2162
throw err;
2263
}
2364
}

packages/test/src/helpers-integration.ts

Lines changed: 1 addition & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import {
2929
import * as workflow from '@temporalio/workflow';
3030
import { temporal } from '@temporalio/proto';
3131
import { defineSearchAttributeKey, SearchAttributeType } from '@temporalio/common/lib/search-attributes';
32-
import { Worker, TestWorkflowEnvironment, test as anyTest, bundlerOptions, waitUntil } from './helpers';
32+
import { Worker, TestWorkflowEnvironment, test as anyTest, bundlerOptions } from './helpers';
3333

3434
export interface Context {
3535
env: TestWorkflowEnvironment;
@@ -296,89 +296,6 @@ export function configurableHelpers<T>(
296296
};
297297
}
298298

299-
export async function setActivityState(
300-
handle: WorkflowHandle,
301-
activityId: string,
302-
state: 'pause' | 'unpause' | 'reset' | 'pause & reset'
303-
): Promise<void> {
304-
const desc = await handle.describe();
305-
const req = {
306-
namespace: handle.client.options.namespace,
307-
execution: {
308-
workflowId: desc.raw.workflowExecutionInfo?.execution?.workflowId,
309-
runId: desc.raw.workflowExecutionInfo?.execution?.runId,
310-
},
311-
id: activityId,
312-
};
313-
if (state === 'pause') {
314-
await handle.client.workflowService.pauseActivity(req);
315-
} else if (state === 'unpause') {
316-
await handle.client.workflowService.unpauseActivity(req);
317-
} else if (state === 'reset') {
318-
await handle.client.workflowService.resetActivity({ ...req, resetHeartbeat: true });
319-
} else {
320-
await Promise.all([
321-
handle.client.workflowService.pauseActivity(req),
322-
handle.client.workflowService.resetActivity({ ...req, resetHeartbeat: true }),
323-
]);
324-
}
325-
await waitUntil(async () => {
326-
const { raw } = await handle.describe();
327-
const activityInfo = raw.pendingActivities?.find((act) => act.activityId === activityId);
328-
// If we are pausing: success when either
329-
// • paused flag is true OR
330-
// • the activity vanished (it completed / retried)
331-
if (state === 'pause') {
332-
if (!activityInfo) {
333-
return true; // Activity vanished (completed/retried)
334-
}
335-
return activityInfo.paused ?? false;
336-
} else if (state === 'unpause') {
337-
// If we are unpausing: success when either
338-
// • paused flag is false OR
339-
// • the activity vanished (already completed)
340-
return activityInfo ? !activityInfo.paused : true;
341-
} else if (state === 'reset') {
342-
// If we are resetting, success when either
343-
// • heartbeat details have been reset OR
344-
// • the activity vanished (completed / retried)
345-
return activityInfo ? activityInfo.heartbeatDetails === null : true;
346-
} else {
347-
// If we are pausing & resetting, success when either
348-
// • activity is paused AND heartbeat details have been reset OR
349-
// • the activity vanished (completed / retried)
350-
if (!activityInfo) {
351-
return true; // Activity vanished (completed/retried)
352-
}
353-
const isPaused = activityInfo.paused ?? false;
354-
const isHeartbeatReset = activityInfo.heartbeatDetails === null;
355-
return isPaused && isHeartbeatReset;
356-
}
357-
}, 15000);
358-
}
359-
360-
// Helper function to check if an activity has heartbeated
361-
export async function hasActivityHeartbeat(
362-
handle: WorkflowHandle,
363-
activityId: string,
364-
expectedContent?: string
365-
): Promise<boolean> {
366-
const { raw } = await handle.describe();
367-
const activityInfo = raw.pendingActivities?.find((act) => act.activityId === activityId);
368-
const heartbeatData = activityInfo?.heartbeatDetails?.payloads?.[0]?.data;
369-
if (!heartbeatData) return false;
370-
371-
// If no expected content specified, just check that heartbeat data exists
372-
if (!expectedContent) return true;
373-
374-
try {
375-
const decoded = Buffer.from(heartbeatData).toString();
376-
return decoded.includes(expectedContent);
377-
} catch {
378-
return false;
379-
}
380-
}
381-
382299
export function helpers(t: ExecutionContext<Context>, testEnv: TestWorkflowEnvironment = t.context.env): Helpers {
383300
return configurableHelpers(t, t.context.workflowBundle, testEnv);
384301
}

packages/test/src/test-integration-workflows.ts

Lines changed: 33 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -42,16 +42,9 @@ import { encode } from '@temporalio/common/lib/encoding';
4242
import { signalSchedulingWorkflow } from './activities/helpers';
4343
import { activityStartedSignal } from './workflows/definitions';
4444
import * as workflows from './workflows';
45-
import {
46-
Context,
47-
createLocalTestEnvironment,
48-
hasActivityHeartbeat,
49-
helpers,
50-
makeTestFunction,
51-
setActivityState,
52-
} from './helpers-integration';
45+
import { Context, createLocalTestEnvironment, helpers, makeTestFunction } from './helpers-integration';
5346
import { overrideSdkInternalFlag } from './mock-internal-flags';
54-
import { heartbeatCancellationDetailsActivity } from './activities/heartbeat-cancellation-details';
47+
import { ActivityState, heartbeatCancellationDetailsActivity } from './activities/heartbeat-cancellation-details';
5548
import { loadHistory, RUN_TIME_SKIPPING_TESTS, waitUntil } from './helpers';
5649

5750
const test = makeTestFunction({
@@ -1441,51 +1434,33 @@ test('Workflow can return root workflow', async (t) => {
14411434
});
14421435
});
14431436

1444-
export async function heartbeatPauseWorkflow(
1445-
activityId: string,
1446-
catchErr: boolean,
1447-
maximumAttempts: number
1437+
export async function heartbeatCancellationWorkflow(
1438+
state: ActivityState
14481439
): Promise<ActivityCancellationDetails | undefined> {
14491440
const { heartbeatCancellationDetailsActivity } = workflow.proxyActivities({
14501441
startToCloseTimeout: '5s',
1451-
activityId,
14521442
retry: {
1453-
maximumAttempts,
1443+
maximumAttempts: 2,
14541444
},
14551445
heartbeatTimeout: '1s',
14561446
});
14571447

1458-
return await heartbeatCancellationDetailsActivity(catchErr);
1448+
return await heartbeatCancellationDetailsActivity(state);
14591449
}
14601450

14611451
test('Activity pause returns expected cancellation details', async (t) => {
1462-
const { createWorker, startWorkflow } = helpers(t);
1463-
1452+
const { createWorker, executeWorkflow } = helpers(t);
14641453
const worker = await createWorker({
14651454
activities: {
14661455
heartbeatCancellationDetailsActivity,
14671456
},
14681457
});
14691458

14701459
await worker.runUntil(async () => {
1471-
const testActivityId = randomUUID();
1472-
const handle = await startWorkflow(heartbeatPauseWorkflow, {
1473-
args: [testActivityId, true, 1],
1460+
const result = await executeWorkflow(heartbeatCancellationWorkflow, {
1461+
args: [{ pause: true }],
14741462
});
14751463

1476-
// Wait for activity to appear in pending activities AND start heartbeating
1477-
await waitUntil(async () => {
1478-
const { raw } = await handle.describe();
1479-
const activityInfo = raw.pendingActivities?.find((act) => act.activityId === testActivityId);
1480-
// Check both: activity exists and has heartbeated
1481-
return !!(activityInfo && (await hasActivityHeartbeat(handle, testActivityId, 'heartbeated')));
1482-
}, 10000);
1483-
1484-
// Now pause the activity
1485-
await setActivityState(handle, testActivityId, 'pause');
1486-
// Get the result - should contain pause cancellation details
1487-
const result = await handle.result();
1488-
14891464
t.deepEqual(result, {
14901465
cancelRequested: false,
14911466
notFound: false,
@@ -1498,7 +1473,7 @@ test('Activity pause returns expected cancellation details', async (t) => {
14981473
});
14991474

15001475
test('Activity can be cancelled via pause and retry after unpause', async (t) => {
1501-
const { createWorker, startWorkflow } = helpers(t);
1476+
const { createWorker, executeWorkflow } = helpers(t);
15021477

15031478
const worker = await createWorker({
15041479
activities: {
@@ -1507,46 +1482,25 @@ test('Activity can be cancelled via pause and retry after unpause', async (t) =>
15071482
});
15081483

15091484
await worker.runUntil(async () => {
1510-
const testActivityId = randomUUID();
1511-
const handle = await startWorkflow(heartbeatPauseWorkflow, { args: [testActivityId, false, 2] });
1512-
1513-
// Wait for it to exist and heartbeat
1514-
await waitUntil(async () => {
1515-
const { raw } = await handle.describe();
1516-
const activityInfo = raw.pendingActivities?.find((act) => act.activityId === testActivityId);
1517-
return !!(activityInfo && (await hasActivityHeartbeat(handle, testActivityId, 'heartbeated')));
1518-
}, 10000);
1519-
1520-
await setActivityState(handle, testActivityId, 'pause');
1521-
await waitUntil(async () => hasActivityHeartbeat(handle, testActivityId, 'finally-complete'), 10000);
1522-
await setActivityState(handle, testActivityId, 'unpause');
1523-
1524-
const result = await handle.result();
1485+
const result = await executeWorkflow(heartbeatCancellationWorkflow, {
1486+
args: [{ pause: true, unpause: true, shouldRetry: true }],
1487+
});
1488+
// Note that we expect the result to be null because unpausing an activity
1489+
// resets the activity context (akin to starting the activity anew)
15251490
t.true(result == null);
15261491
});
15271492
});
15281493

1529-
test('Activity reset returns expected cancellation details', async (t) => {
1530-
const { createWorker, startWorkflow } = helpers(t);
1494+
test('Activity reset without retry returns expected cancellation details', async (t) => {
1495+
const { createWorker, executeWorkflow } = helpers(t);
15311496
const worker = await createWorker({
15321497
activities: {
15331498
heartbeatCancellationDetailsActivity,
15341499
},
15351500
});
15361501

15371502
await worker.runUntil(async () => {
1538-
const testActivityId = randomUUID();
1539-
const handle = await startWorkflow(heartbeatPauseWorkflow, { args: [testActivityId, true, 1] });
1540-
1541-
// Wait for it to exist and heartbeat
1542-
await waitUntil(async () => {
1543-
const { raw } = await handle.describe();
1544-
const activityInfo = raw.pendingActivities?.find((act) => act.activityId === testActivityId);
1545-
return !!(activityInfo && (await hasActivityHeartbeat(handle, testActivityId, 'heartbeated')));
1546-
}, 10000);
1547-
1548-
await setActivityState(handle, testActivityId, 'reset');
1549-
const result = await handle.result();
1503+
const result = await executeWorkflow(heartbeatCancellationWorkflow, { args: [{ reset: true }] });
15501504
t.deepEqual(result, {
15511505
cancelRequested: false,
15521506
notFound: false,
@@ -1558,27 +1512,30 @@ test('Activity reset returns expected cancellation details', async (t) => {
15581512
});
15591513
});
15601514

1561-
test('Activity set as both paused and reset returns expected cancellation details', async (t) => {
1562-
const { createWorker, startWorkflow } = helpers(t);
1515+
test('Activity reset with retry returns expected cancellation details', async (t) => {
1516+
const { createWorker, executeWorkflow } = helpers(t);
15631517
const worker = await createWorker({
15641518
activities: {
15651519
heartbeatCancellationDetailsActivity,
15661520
},
15671521
});
15681522

15691523
await worker.runUntil(async () => {
1570-
const testActivityId = randomUUID();
1571-
const handle = await startWorkflow(heartbeatPauseWorkflow, { args: [testActivityId, true, 1] });
1524+
const result = await executeWorkflow(heartbeatCancellationWorkflow, { args: [{ reset: true, shouldRetry: true }] });
1525+
t.true(result == null);
1526+
});
1527+
});
15721528

1573-
// Wait for it to exist and heartbeat
1574-
await waitUntil(async () => {
1575-
const { raw } = await handle.describe();
1576-
const activityInfo = raw.pendingActivities?.find((act) => act.activityId === testActivityId);
1577-
return !!(activityInfo && (await hasActivityHeartbeat(handle, testActivityId, 'heartbeated')));
1578-
}, 10000);
1529+
test('Activity paused and reset returns expected cancellation details', async (t) => {
1530+
const { createWorker, executeWorkflow } = helpers(t);
1531+
const worker = await createWorker({
1532+
activities: {
1533+
heartbeatCancellationDetailsActivity,
1534+
},
1535+
});
15791536

1580-
await setActivityState(handle, testActivityId, 'pause & reset');
1581-
const result = await handle.result();
1537+
await worker.runUntil(async () => {
1538+
const result = await executeWorkflow(heartbeatCancellationWorkflow, { args: [{ pause: true, reset: true }] });
15821539
t.deepEqual(result, {
15831540
cancelRequested: false,
15841541
notFound: false,

0 commit comments

Comments
 (0)