Skip to content

Commit 5d3da3d

Browse files
feat: support for DSM (#672)
* set checkpoint for all supported dsm queue types * removed not needed checkpoint set * clean * clearer return type * add tests * add the manual checkpoint parameter * Correct approach to ensure public api is used * remove unused variable * match logging patterns of repo * lint * lint * add False manual_checkpoint false * fix formatting * optimization * clean * add prettier ignore * add optional chaining * add log debug for no arn case * merge commit * add checkpoints for headers * Update SQS to work with batch processing * Update Kinesis to work with batch processing * Update SNS-SQS to work with batch processing * Prettier changes * Fix test for set sqs-sns * Batch processing for sns * Add a more extensive DD_DATA_STREAMS_ENABLED function * Early continue when there is context for batch processing for all queues, rather than just kinesis * Break out of the loop of records in batch processing when there is no DSM for speed * Move the env var check on data streams enabled to config and break out of the loop of records earlier when data streams is not enabled * test because i think the integration test failed for flake reasons * Move from getConfig() to passed in config for dataStreamsEnabled * Update the extraction functions to separate the setting of dsm checkpoints with the context stuff completely * Move the config of dataStreamsEnabled out of the trace-wrapper but in the extractors * Optimize such that you only parse the first messages headers once in the case where DSM is instrumented * Update the dd-trace version * Fix test description to be accurate for DSM is disabled * empty --------- Co-authored-by: Eric Firth <eric.firth@datadoghq.com>
1 parent 1619bce commit 5d3da3d

File tree

16 files changed

+1497
-213
lines changed

16 files changed

+1497
-213
lines changed

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
"@types/node": "^20.12.10",
3030
"@types/promise-retry": "^1.1.3",
3131
"@types/shimmer": "^1.0.1",
32-
"dd-trace": "^5.58.0",
32+
"dd-trace": "^5.62.0",
3333
"jest": "^27.0.1",
3434
"mock-fs": "4.14.0",
3535
"nock": "13.5.4",

src/index.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ export const minColdStartTraceDurationEnvVar = "DD_MIN_COLD_START_DURATION";
5252
export const coldStartTraceSkipLibEnvVar = "DD_COLD_START_TRACE_SKIP_LIB";
5353
export const localTestingEnvVar = "DD_LOCAL_TESTING";
5454
export const addSpanPointersEnvVar = "DD_TRACE_AWS_ADD_SPAN_POINTERS";
55+
export const dataStreamsEnabledEnvVar = "DD_DATA_STREAMS_ENABLED";
5556

5657
interface GlobalConfig {
5758
/**
@@ -97,6 +98,7 @@ export const defaultConfig: Config = {
9798
coldStartTraceSkipLib: "",
9899
localTesting: false,
99100
addSpanPointers: true,
101+
dataStreamsEnabled: false,
100102
} as const;
101103

102104
export const _metricsQueue: MetricsQueue = new MetricsQueue();
@@ -419,6 +421,12 @@ function getConfig(userConfig?: Partial<Config>): Config {
419421
config.addSpanPointers = result === "true";
420422
}
421423

424+
if (userConfig === undefined || userConfig.dataStreamsEnabled === undefined) {
425+
const result = getEnvValue(dataStreamsEnabledEnvVar, "false").toLowerCase();
426+
const validEnabledValues = new Set(["true", "1", "yes", "y", "on"]);
427+
config.dataStreamsEnabled = validEnabledValues.has(result);
428+
}
429+
422430
return config;
423431
}
424432

src/trace/context/extractor.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -86,12 +86,13 @@ export class TraceContextExtractor {
8686
return new HTTPEventTraceExtractor(this.tracerWrapper, this.config.decodeAuthorizerContext);
8787
}
8888

89-
if (EventValidator.isSNSEvent(event)) return new SNSEventTraceExtractor(this.tracerWrapper);
90-
if (EventValidator.isSNSSQSEvent(event)) return new SNSSQSEventTraceExtractor(this.tracerWrapper);
89+
if (EventValidator.isSNSEvent(event)) return new SNSEventTraceExtractor(this.tracerWrapper, this.config);
90+
if (EventValidator.isSNSSQSEvent(event)) return new SNSSQSEventTraceExtractor(this.tracerWrapper, this.config);
9191
if (EventValidator.isEventBridgeSQSEvent(event)) return new EventBridgeSQSEventTraceExtractor(this.tracerWrapper);
9292
if (EventValidator.isAppSyncResolverEvent(event)) return new AppSyncEventTraceExtractor(this.tracerWrapper);
93-
if (EventValidator.isSQSEvent(event)) return new SQSEventTraceExtractor(this.tracerWrapper);
94-
if (EventValidator.isKinesisStreamEvent(event)) return new KinesisEventTraceExtractor(this.tracerWrapper);
93+
if (EventValidator.isSQSEvent(event)) return new SQSEventTraceExtractor(this.tracerWrapper, this.config);
94+
if (EventValidator.isKinesisStreamEvent(event))
95+
return new KinesisEventTraceExtractor(this.tracerWrapper, this.config);
9596
if (EventValidator.isEventBridgeEvent(event)) return new EventBridgeEventTraceExtractor(this.tracerWrapper);
9697

9798
return;

src/trace/context/extractors/kinesis.spec.ts

Lines changed: 229 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,15 @@ import { TracerWrapper } from "../../tracer-wrapper";
22
import { KinesisEventTraceExtractor } from "./kinesis";
33

44
let mockSpanContext: any = null;
5+
let mockDataStreamsCheckpointer: any = {
6+
setConsumeCheckpoint: jest.fn(),
7+
};
8+
9+
jest.mock("dd-trace/packages/dd-trace/src/datastreams/checkpointer", () => {
10+
return {
11+
DataStreamsCheckpointer: jest.fn().mockImplementation(() => mockDataStreamsCheckpointer),
12+
};
13+
});
514

615
// Mocking extract is needed, due to dd-trace being a No-op
716
// if the detected environment is testing. This is expected, since
@@ -12,18 +21,31 @@ jest.mock("dd-trace", () => {
1221
...ddTrace,
1322
_tracer: { _service: {} },
1423
extract: (_carrier: any, _headers: any) => mockSpanContext,
24+
dataStreamsCheckpointer: mockDataStreamsCheckpointer,
1525
};
1626
});
1727
const spyTracerWrapper = jest.spyOn(TracerWrapper.prototype, "extract");
1828

1929
describe("KinesisEventTraceExtractor", () => {
30+
const mockConfig = {
31+
autoPatchHTTP: true,
32+
captureLambdaPayload: false,
33+
captureLambdaPayloadMaxDepth: 10,
34+
createInferredSpan: true,
35+
encodeAuthorizerContext: true,
36+
decodeAuthorizerContext: true,
37+
mergeDatadogXrayTraces: false,
38+
injectLogContext: false,
39+
minColdStartTraceDuration: 3,
40+
coldStartTraceSkipLib: "",
41+
addSpanPointers: true,
42+
dataStreamsEnabled: true,
43+
};
44+
2045
describe("extract", () => {
2146
beforeEach(() => {
2247
mockSpanContext = null;
23-
});
24-
25-
afterEach(() => {
26-
jest.resetModules();
48+
mockDataStreamsCheckpointer.setConsumeCheckpoint.mockClear();
2749
});
2850

2951
it("extracts trace context with valid payload", () => {
@@ -43,7 +65,7 @@ describe("KinesisEventTraceExtractor", () => {
4365
kinesisSchemaVersion: "1.0",
4466
partitionKey: "cdbfd750-cec0-4f0f-a4b0-82ae6152c7fb",
4567
sequenceNumber: "49625698045709644136382874226371117765484751339579768834",
46-
data: "eyJJJ20gbWFkZSBvZiB3YXgsIExhcnJ5IjoiV2hhdCBhcmUgeW91IG1hZGUgb2Y/IiwiX2RhdGFkb2ciOnsieC1kYXRhZG9nLXRyYWNlLWlkIjoiNjY3MzA5NTE0MjIxMDM1NTM4IiwieC1kYXRhZG9nLXBhcmVudC1pZCI6IjEzNTA3MzUwMzU0OTc4MTE4MjgiLCJ4LWRhdGFkb2ctc2FtcGxlZCI6IjEiLCJ4LWRhdGFkb2ctc2FtcGxpbmctcHJpb3JpdHkiOiIxIn19",
68+
data: "eyJJJ20gbWFkZSBvZiB3YXgsIExhcnJ5IjoiV2hhdCBhcmUgeW91IG1hZGUgb2Y/IiwiX2RhdGFkb2ciOnsieC1kYXRhZG9nLXRyYWNlLWlkIjoiNjY3MzA5NTE0MjIxMDM1NTM4IiwieC1kYXRhZG9nLXBhcmVudC1pZCI6IjEzNTA3MzUwMzU0OTc4MTE4MjgiLCJ4LWRhdGFkb2ctc2FtcGxlZCI6IjEiLCJ4LWRhdGFkb2ctc2FtcGxpbmctcHJpb3JpdHkiOiIxIiwiZGQtcGF0aHdheS1jdHgtYmFzZTY0Ijoic29tZS1iYXNlNjQtZW5jb2RlZC1jb250ZXh0In19Cg==",
4769
approximateArrivalTimestamp: 1642518727.248,
4870
},
4971
eventSource: "aws:kinesis",
@@ -57,7 +79,7 @@ describe("KinesisEventTraceExtractor", () => {
5779
],
5880
};
5981

60-
const extractor = new KinesisEventTraceExtractor(tracerWrapper);
82+
const extractor = new KinesisEventTraceExtractor(tracerWrapper, mockConfig);
6183

6284
const traceContext = extractor.extract(payload);
6385
expect(traceContext).not.toBeNull();
@@ -67,25 +89,151 @@ describe("KinesisEventTraceExtractor", () => {
6789
"x-datadog-sampled": "1",
6890
"x-datadog-sampling-priority": "1",
6991
"x-datadog-trace-id": "667309514221035538",
92+
"dd-pathway-ctx-base64": "some-base64-encoded-context",
7093
});
7194

7295
expect(traceContext?.toTraceId()).toBe("667309514221035538");
7396
expect(traceContext?.toSpanId()).toBe("1350735035497811828");
7497
expect(traceContext?.sampleMode()).toBe("1");
7598
expect(traceContext?.source).toBe("event");
99+
100+
expect(mockDataStreamsCheckpointer.setConsumeCheckpoint).toHaveBeenCalledWith(
101+
"kinesis",
102+
"arn:aws:kinesis:EXAMPLE",
103+
{
104+
"x-datadog-parent-id": "1350735035497811828",
105+
"x-datadog-sampled": "1",
106+
"x-datadog-sampling-priority": "1",
107+
"x-datadog-trace-id": "667309514221035538",
108+
"dd-pathway-ctx-base64": "some-base64-encoded-context",
109+
},
110+
false,
111+
);
76112
});
77113

78114
it.each([
79-
["Records", {}],
80-
["Records first entry", { Records: [] }],
81-
["valid data in kinesis", { Records: [{ kinesis: { data: "{" } }] }], // JSON.parse should fail
82-
["_datadog in data", { Records: [{ kinesis: { data: "e30=" } }] }],
83-
])("returns null and skips extracting when payload is missing '%s'", (_, payload) => {
115+
["Records", {}, 0],
116+
["Records first entry", { Records: [] }, 0],
117+
["valid data in kinesis", { Records: [{ kinesis: { data: "{" }, eventSourceARN: "arn:aws:kinesis:test" }] }, 1], // JSON.parse should fail
118+
["_datadog in data", { Records: [{ kinesis: { data: "e30=" }, eventSourceARN: "arn:aws:kinesis:test" }] }, 1],
119+
])("returns null and skips extracting when payload is missing '%s'", (_, payload, dsmCalls) => {
84120
const tracerWrapper = new TracerWrapper();
85-
const extractor = new KinesisEventTraceExtractor(tracerWrapper);
121+
const extractor = new KinesisEventTraceExtractor(tracerWrapper, mockConfig);
86122

87123
const traceContext = extractor.extract(payload as any);
88124
expect(traceContext).toBeNull();
125+
126+
expect(mockDataStreamsCheckpointer.setConsumeCheckpoint).toHaveBeenCalledTimes(dsmCalls);
127+
128+
if (dsmCalls > 0) {
129+
expect(mockDataStreamsCheckpointer.setConsumeCheckpoint).toHaveBeenCalledWith(
130+
"kinesis",
131+
"arn:aws:kinesis:test",
132+
null,
133+
false,
134+
);
135+
}
136+
});
137+
138+
it("calls setConsumeCheckpoint for each record", () => {
139+
mockSpanContext = {
140+
toTraceId: () => "667309514221035538",
141+
toSpanId: () => "1350735035497811828",
142+
_sampling: {
143+
priority: "1",
144+
},
145+
};
146+
const tracerWrapper = new TracerWrapper();
147+
148+
const makeKinesisRecord = (eventSourceARN: string, hasDatadogHeaders: boolean) => {
149+
const baseData = { some: "data" };
150+
const dataWithHeaders = hasDatadogHeaders
151+
? {
152+
...baseData,
153+
_datadog: {
154+
"x-datadog-trace-id": "667309514221035538",
155+
"x-datadog-parent-id": "1350735035497811828",
156+
"x-datadog-sampled": "1",
157+
"x-datadog-sampling-priority": "1",
158+
"dd-pathway-ctx-base64": "some-base64-encoded-context",
159+
},
160+
}
161+
: baseData;
162+
163+
const encodedData = Buffer.from(JSON.stringify(dataWithHeaders)).toString("base64");
164+
165+
return {
166+
kinesis: {
167+
kinesisSchemaVersion: "1.0",
168+
partitionKey: "test-partition",
169+
sequenceNumber: "12345",
170+
data: encodedData,
171+
approximateArrivalTimestamp: 1642518727.248,
172+
},
173+
eventSource: "aws:kinesis",
174+
eventID: "test-event-id",
175+
invokeIdentityArn: "arn:aws:iam::EXAMPLE",
176+
eventVersion: "1.0",
177+
eventName: "aws:kinesis:record",
178+
eventSourceARN: eventSourceARN,
179+
awsRegion: "us-east-1",
180+
};
181+
};
182+
183+
const payload = {
184+
Records: [
185+
makeKinesisRecord("arn:aws:kinesis:us-east-1:123456789012:stream/stream-1", true),
186+
makeKinesisRecord("arn:aws:kinesis:us-east-1:123456789012:stream/stream-2", false),
187+
makeKinesisRecord("arn:aws:kinesis:us-east-1:123456789012:stream/stream-3", true),
188+
makeKinesisRecord("arn:aws:kinesis:us-east-1:123456789012:stream/stream-4", false),
189+
],
190+
};
191+
192+
const extractor = new KinesisEventTraceExtractor(tracerWrapper, mockConfig);
193+
const traceContext = extractor.extract(payload);
194+
expect(traceContext).not.toBeNull();
195+
196+
expect(mockDataStreamsCheckpointer.setConsumeCheckpoint).toHaveBeenCalledTimes(4);
197+
expect(mockDataStreamsCheckpointer.setConsumeCheckpoint).toHaveBeenNthCalledWith(
198+
1,
199+
"kinesis",
200+
"arn:aws:kinesis:us-east-1:123456789012:stream/stream-1",
201+
{
202+
"x-datadog-trace-id": "667309514221035538",
203+
"x-datadog-parent-id": "1350735035497811828",
204+
"x-datadog-sampled": "1",
205+
"x-datadog-sampling-priority": "1",
206+
"dd-pathway-ctx-base64": "some-base64-encoded-context",
207+
},
208+
false,
209+
);
210+
expect(mockDataStreamsCheckpointer.setConsumeCheckpoint).toHaveBeenNthCalledWith(
211+
2,
212+
"kinesis",
213+
"arn:aws:kinesis:us-east-1:123456789012:stream/stream-2",
214+
null,
215+
false,
216+
);
217+
expect(mockDataStreamsCheckpointer.setConsumeCheckpoint).toHaveBeenNthCalledWith(
218+
3,
219+
"kinesis",
220+
"arn:aws:kinesis:us-east-1:123456789012:stream/stream-3",
221+
{
222+
"x-datadog-trace-id": "667309514221035538",
223+
"x-datadog-parent-id": "1350735035497811828",
224+
"x-datadog-sampled": "1",
225+
"x-datadog-sampling-priority": "1",
226+
"dd-pathway-ctx-base64": "some-base64-encoded-context",
227+
},
228+
false,
229+
);
230+
expect(mockDataStreamsCheckpointer.setConsumeCheckpoint).toHaveBeenNthCalledWith(
231+
4,
232+
"kinesis",
233+
"arn:aws:kinesis:us-east-1:123456789012:stream/stream-4",
234+
null,
235+
false,
236+
);
89237
});
90238

91239
it("returns null when extracted span context by tracer is null", () => {
@@ -112,10 +260,78 @@ describe("KinesisEventTraceExtractor", () => {
112260
],
113261
};
114262

115-
const extractor = new KinesisEventTraceExtractor(tracerWrapper);
263+
const extractor = new KinesisEventTraceExtractor(tracerWrapper, mockConfig);
116264

117265
const traceContext = extractor.extract(payload);
118266
expect(traceContext).toBeNull();
119267
});
268+
269+
it("extracts trace context when DSM is disabled and does not call setConsumeCheckpoint", () => {
270+
mockSpanContext = {
271+
toTraceId: () => "667309514221035538",
272+
toSpanId: () => "1350735035497811828",
273+
_sampling: {
274+
priority: "1",
275+
},
276+
};
277+
const tracerWrapper = new TracerWrapper();
278+
279+
const makeKinesisRecord = (eventSourceARN: string, hasDatadogHeaders: boolean) => {
280+
const baseData = { some: "data" };
281+
const dataWithHeaders = hasDatadogHeaders
282+
? {
283+
...baseData,
284+
_datadog: {
285+
"x-datadog-trace-id": "667309514221035538",
286+
"x-datadog-parent-id": "1350735035497811828",
287+
"x-datadog-sampled": "1",
288+
"x-datadog-sampling-priority": "1",
289+
"dd-pathway-ctx-base64": "some-base64-encoded-context",
290+
},
291+
}
292+
: baseData;
293+
294+
const encodedData = Buffer.from(JSON.stringify(dataWithHeaders)).toString("base64");
295+
296+
return {
297+
kinesis: {
298+
kinesisSchemaVersion: "1.0",
299+
partitionKey: "test-partition",
300+
sequenceNumber: "12345",
301+
data: encodedData,
302+
approximateArrivalTimestamp: 1642518727.248,
303+
},
304+
eventSource: "aws:kinesis",
305+
eventID: "test-event-id",
306+
invokeIdentityArn: "arn:aws:iam::EXAMPLE",
307+
eventVersion: "1.0",
308+
eventName: "aws:kinesis:record",
309+
eventSourceARN: eventSourceARN,
310+
awsRegion: "us-east-1",
311+
};
312+
};
313+
314+
const payload = {
315+
Records: [
316+
makeKinesisRecord("arn:aws:kinesis:us-east-1:123456789012:stream/stream-1", true),
317+
makeKinesisRecord("arn:aws:kinesis:us-east-1:123456789012:stream/stream-2", false),
318+
makeKinesisRecord("arn:aws:kinesis:us-east-1:123456789012:stream/stream-3", true),
319+
],
320+
};
321+
322+
const disabledConfig = { ...mockConfig, dataStreamsEnabled: false };
323+
const extractor = new KinesisEventTraceExtractor(tracerWrapper, disabledConfig);
324+
const traceContext = extractor.extract(payload);
325+
326+
// Should still extract trace context from first record
327+
expect(traceContext).not.toBeNull();
328+
expect(traceContext?.toTraceId()).toBe("667309514221035538");
329+
expect(traceContext?.toSpanId()).toBe("1350735035497811828");
330+
expect(traceContext?.sampleMode()).toBe("1");
331+
expect(traceContext?.source).toBe("event");
332+
333+
// Should NOT call setConsumeCheckpoint when DSM is disabled
334+
expect(mockDataStreamsCheckpointer.setConsumeCheckpoint).toHaveBeenCalledTimes(0);
335+
});
120336
});
121337
});

0 commit comments

Comments
 (0)