Skip to content

Commit 497029c

Browse files
authored
feat: eventbridge to sqs support (#412)
* feat: eventbridge to sqs support * feat: Dry up SNS span creation * fix: Remap nonconforming variables * feat: Specs for eb -> sqs events
1 parent c0c6e83 commit 497029c

File tree

6 files changed

+167
-51
lines changed

6 files changed

+167
-51
lines changed

event_samples/eventbridge-sqs.json

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
{
2+
"Records": [
3+
{
4+
"messageId": "e995e54f-1724-41fa-82c0-8b81821f854e",
5+
"receiptHandle": "AQEB4mIfRcyqtzn1X5Ss+ConhTejVGc+qnAcmu3/Z9ZvbNkaPcpuDLX/bzvPD/ZkAXJUXZcemGSJmd7L3snZHKMP2Ck8runZiyl4mubiLb444pZvdiNPuGRJ6a3FvgS/GQPzho/9nNMyOi66m8Viwh70v4EUCPGO4JmD3TTDAUrrcAnqU4WSObjfC/NAp9bI6wH2CEyAYEfex6Nxplbl/jBf9ZUG0I3m3vQd0Q4l4gd4jIR4oxQUglU2Tldl4Kx5fMUAhTRLAENri6HsY81avBkKd9FAuxONlsITB5uj02kOkvLlRGEcalqsKyPJ7AFaDLrOLaL3U+yReroPEJ5R5nwhLOEbeN5HROlZRXeaAwZOIN8BjqdeooYTIOrtvMEVb7a6OPLMdH1XB+ddevtKAH8K9Tm2ZjpaA7dtBGh1zFVHzBk=",
6+
"body": "{\"version\":\"0\",\"id\":\"af718b2a-b987-e8c0-7a2b-a188fad2661a\",\"detail-type\":\"my.Detail\",\"source\":\"my.Source\",\"account\":\"425362996713\",\"time\":\"2023-08-03T22:49:03Z\",\"region\":\"us-east-1\",\"resources\":[],\"detail\":{\"text\":\"Hello, world!\",\"_datadog\":{\"x-datadog-trace-id\":\"7379586022458917877\",\"x-datadog-parent-id\":\"2644033662113726488\",\"x-datadog-sampling-priority\":\"1\",\"x-datadog-tags\":\"_dd.p.dm=-0\",\"traceparent\":\"00-000000000000000066698e63821a03f5-24b17e9b6476c018-01\",\"tracestate\":\"dd=t.dm:-0;s:1\"}}}",
7+
"attributes": {
8+
"ApproximateReceiveCount": "1",
9+
"AWSTraceHeader": "Root=1-64cc2edd-112fbf1701d1355973a11d57;Parent=7d5a9776024b2d42;Sampled=0",
10+
"SentTimestamp": "1691102943638",
11+
"SenderId": "AIDAJXNJGGKNS7OSV23OI",
12+
"ApproximateFirstReceiveTimestamp": "1691102943647"
13+
},
14+
"messageAttributes": {},
15+
"md5OfBody": "93d9f0cd8886d1e000a1a0b7007bffc4",
16+
"eventSource": "aws:sqs",
17+
"eventSourceARN": "arn:aws:sqs:us-east-1:425362996713:lambda-eb-sqs-lambda-dev-demo-queue",
18+
"awsRegion": "us-east-1"
19+
}
20+
]
21+
}

src/trace/context.spec.ts

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,38 @@ describe("readTraceFromEvent", () => {
251251
});
252252
});
253253

254+
it("can parse an EventBridge message in an SQS queue", () => {
255+
const result = readTraceFromEvent({
256+
Records: [
257+
{
258+
messageId: "e995e54f-1724-41fa-82c0-8b81821f854e",
259+
receiptHandle:
260+
"AQEB4mIfRcyqtzn1X5Ss+ConhTejVGc+qnAcmu3/Z9ZvbNkaPcpuDLX/bzvPD/ZkAXJUXZcemGSJmd7L3snZHKMP2Ck8runZiyl4mubiLb444pZvdiNPuGRJ6a3FvgS/GQPzho/9nNMyOi66m8Viwh70v4EUCPGO4JmD3TTDAUrrcAnqU4WSObjfC/NAp9bI6wH2CEyAYEfex6Nxplbl/jBf9ZUG0I3m3vQd0Q4l4gd4jIR4oxQUglU2Tldl4Kx5fMUAhTRLAENri6HsY81avBkKd9FAuxONlsITB5uj02kOkvLlRGEcalqsKyPJ7AFaDLrOLaL3U+yReroPEJ5R5nwhLOEbeN5HROlZRXeaAwZOIN8BjqdeooYTIOrtvMEVb7a6OPLMdH1XB+ddevtKAH8K9Tm2ZjpaA7dtBGh1zFVHzBk=",
261+
body: '{"version":"0","id":"af718b2a-b987-e8c0-7a2b-a188fad2661a","detail-type":"my.Detail","source":"my.Source","account":"425362996713","time":"2023-08-03T22:49:03Z","region":"us-east-1","resources":[],"detail":{"text":"Hello, world!","_datadog":{"x-datadog-trace-id":"7379586022458917877","x-datadog-parent-id":"2644033662113726488","x-datadog-sampling-priority":"1","x-datadog-tags":"_dd.p.dm=-0","traceparent":"00-000000000000000066698e63821a03f5-24b17e9b6476c018-01","tracestate":"dd=t.dm:-0;s:1"}}}',
262+
attributes: {
263+
ApproximateReceiveCount: "1",
264+
AWSTraceHeader: "Root=1-64cc2edd-112fbf1701d1355973a11d57;Parent=7d5a9776024b2d42;Sampled=0",
265+
SentTimestamp: "1691102943638",
266+
SenderId: "AIDAJXNJGGKNS7OSV23OI",
267+
ApproximateFirstReceiveTimestamp: "1691102943647",
268+
},
269+
messageAttributes: {},
270+
md5OfBody: "93d9f0cd8886d1e000a1a0b7007bffc4",
271+
eventSource: "aws:sqs",
272+
eventSourceARN: "arn:aws:sqs:us-east-1:425362996713:lambda-eb-sqs-lambda-dev-demo-queue",
273+
awsRegion: "us-east-1",
274+
},
275+
],
276+
});
277+
278+
expect(result).toEqual({
279+
parentID: "2644033662113726488",
280+
sampleMode: 1,
281+
source: "event",
282+
traceID: "7379586022458917877",
283+
});
284+
});
285+
254286
it("can parse an SNS message source", () => {
255287
const result = readTraceFromEvent({
256288
Records: [

src/trace/context.ts

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import {
1010
isSNSEvent,
1111
isSNSSQSEvent,
1212
isSQSEvent,
13+
isEBSQSEvent,
1314
} from "../utils/event-type-guards";
1415
import {
1516
authorizingRequestIdHeader,
@@ -309,6 +310,25 @@ export function readTraceFromSNSSQSEvent(event: SQSEvent): TraceContext | undefi
309310
}
310311
}
311312

313+
export function readTraceFromEBSQSEvent(event: SQSEvent): TraceContext | undefined {
314+
if (event?.Records?.[0]?.body) {
315+
try {
316+
const parsedBody = JSON.parse(event.Records[0].body) as EventBridgeEvent<any, any>;
317+
if (parsedBody?.detail?._datadog) {
318+
const trace = exportTraceData(parsedBody.detail._datadog);
319+
320+
logDebug(`extracted trace context from EventBridge SQS event`, { trace, event });
321+
return trace;
322+
}
323+
} catch (err) {
324+
if (err instanceof Error) {
325+
logDebug("Error parsing EventBridge SQS message trace data", err as Error);
326+
}
327+
return;
328+
}
329+
}
330+
}
331+
312332
export function readTraceFromKinesisEvent(event: KinesisStreamEvent): TraceContext | undefined {
313333
if (event?.Records?.[0]?.kinesis?.data) {
314334
try {
@@ -463,6 +483,10 @@ export function readTraceFromEvent(event: any, decodeAuthorizerContext: boolean
463483
return readTraceFromSNSSQSEvent(event);
464484
}
465485

486+
if (isEBSQSEvent(event)) {
487+
return readTraceFromEBSQSEvent(event);
488+
}
489+
466490
if (isAppSyncResolverEvent(event)) {
467491
return readTraceFromAppSyncEvent(event);
468492
}

src/trace/span-inferrer.spec.ts

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ const sqsEvent = require("../../event_samples/sqs.json");
88
const ddbEvent = require("../../event_samples/dynamodb.json");
99
const kinesisEvent = require("../../event_samples/kinesis.json");
1010
const eventBridgeEvent = require("../../event_samples/eventbridge.json");
11+
const eventBridgeSQSEvent = require("../../event_samples/eventbridge-sqs.json");
1112
const webSocketEvent = require("../../event_samples/api-gateway-wss.json");
1213
const apiGatewayV1 = require("../../event_samples/api-gateway-v1.json");
1314
const apiGatewayV2 = require("../../event_samples/api-gateway-v2.json");
@@ -568,6 +569,55 @@ describe("SpanInferrer", () => {
568569
});
569570
});
570571

572+
it("creates an inferred span for eventbridge sqs events", () => {
573+
const inferrer = new SpanInferrer(mockWrapper as unknown as TracerWrapper);
574+
inferrer.createInferredSpan(eventBridgeSQSEvent, {} as any, {} as SpanContext);
575+
576+
expect(mockWrapper.startSpan.mock.calls).toEqual([
577+
[
578+
"aws.eventbridge",
579+
{
580+
childOf: {},
581+
startTime: 1691102943000,
582+
tags: {
583+
_inferred_span: { synchronicity: "async", tag_source: "self" },
584+
operation_name: "aws.eventbridge",
585+
"peer.service": "mock-lambda-service",
586+
request_id: undefined,
587+
"resource.name": "my.Source",
588+
resource_names: "my.Source",
589+
service: "eventbridge",
590+
"span.type": "web",
591+
},
592+
},
593+
],
594+
[
595+
"aws.sqs",
596+
{
597+
childOf: undefined,
598+
startTime: 1691102943638,
599+
tags: {
600+
_inferred_span: { synchronicity: "async", tag_source: "self" },
601+
event_source_arn: "arn:aws:sqs:us-east-1:425362996713:lambda-eb-sqs-lambda-dev-demo-queue",
602+
operation_name: "aws.sqs",
603+
"peer.service": "mock-lambda-service",
604+
queuename: "lambda-eb-sqs-lambda-dev-demo-queue",
605+
receipt_handle:
606+
"AQEB4mIfRcyqtzn1X5Ss+ConhTejVGc+qnAcmu3/Z9ZvbNkaPcpuDLX/bzvPD/ZkAXJUXZcemGSJmd7L3snZHKMP2Ck8runZiyl4mubiLb444pZvdiNPuGRJ6a3FvgS/GQPzho/9nNMyOi66m8Viwh70v4EUCPGO4JmD3TTDAUrrcAnqU4WSObjfC/NAp9bI6wH2CEyAYEfex6Nxplbl/jBf9ZUG0I3m3vQd0Q4l4gd4jIR4oxQUglU2Tldl4Kx5fMUAhTRLAENri6HsY81avBkKd9FAuxONlsITB5uj02kOkvLlRGEcalqsKyPJ7AFaDLrOLaL3U+yReroPEJ5R5nwhLOEbeN5HROlZRXeaAwZOIN8BjqdeooYTIOrtvMEVb7a6OPLMdH1XB+ddevtKAH8K9Tm2ZjpaA7dtBGh1zFVHzBk=",
607+
request_id: undefined,
608+
"resource.name": "lambda-eb-sqs-lambda-dev-demo-queue",
609+
resource_names: "lambda-eb-sqs-lambda-dev-demo-queue",
610+
retry_count: 1,
611+
sender_id: "AIDAJXNJGGKNS7OSV23OI",
612+
service: "sqs",
613+
"service.name": "lambda-eb-sqs-lambda-dev-demo-queue",
614+
"span.type": "web",
615+
},
616+
},
617+
],
618+
]);
619+
});
620+
571621
it("creates an inferred span for websocket events", () => {
572622
const inferrer = new SpanInferrer(mockWrapper as unknown as TracerWrapper);
573623
inferrer.createInferredSpan(webSocketEvent, {} as any, {} as SpanContext);

src/trace/span-inferrer.ts

Lines changed: 28 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -280,57 +280,26 @@ export class SpanInferrer {
280280
parentSpanContext: SpanContext | undefined,
281281
): SpanWrapper {
282282
const options: SpanOptions = {};
283-
const { Records } = event as SNSEvent;
284-
const referenceRecord = Records[0];
285-
const {
286-
EventSubscriptionArn,
287-
Sns: { TopicArn, Timestamp, Type, Subject, MessageId },
288-
} = referenceRecord;
289-
const topicName = TopicArn?.split(":").pop() || "";
290-
const resourceName = topicName;
291-
const serviceName = SpanInferrer.determineServiceName(topicName, "lambda_sns", "sns");
292-
options.tags = {
293-
operation_name: "aws.sns",
294-
resource_names: resourceName,
295-
request_id: context?.awsRequestId,
296-
"span.type": "sns",
297-
"resource.name": resourceName,
298-
"peer.service": this.service,
299-
service: serviceName,
300-
_inferred_span: {
301-
tag_source: "self",
302-
synchronicity: "async",
303-
},
304-
type: Type,
305-
subject: Subject,
306-
message_id: MessageId,
307-
topicname: topicName,
308-
topic_arn: TopicArn,
309-
event_subscription_arn: EventSubscriptionArn,
310-
};
311-
if (parentSpanContext) {
312-
options.childOf = parentSpanContext;
283+
284+
let referenceRecord: SNSMessage;
285+
let eventSubscriptionArn = "";
286+
if (event.Records) {
287+
// Full SNS Event into Lambda
288+
const { Records } = event as SNSEvent;
289+
({ Sns: referenceRecord, EventSubscriptionArn: eventSubscriptionArn } = Records[0]);
290+
} else {
291+
// SNS message wrapping an SQS message
292+
referenceRecord = event;
313293
}
314-
options.startTime = Date.parse(Timestamp);
315-
const spanWrapperOptions = {
316-
isAsync: true,
317-
};
318-
return new SpanWrapper(this.traceWrapper.startSpan("aws.sns", options), spanWrapperOptions);
319-
}
294+
const { TopicArn, Timestamp, Type, Subject, MessageId } = referenceRecord;
320295

321-
createInferredSpanForSqsSns(
322-
event: SNSMessage,
323-
context: Context | undefined,
324-
parentSpanContext: SpanContext | undefined,
325-
): SpanWrapper {
326-
const options: SpanOptions = {};
327-
const { TopicArn, Timestamp, Type, Subject, MessageId } = event;
328296
const topicName = TopicArn?.split(":").pop() || "";
329297
const resourceName = topicName;
330298
const serviceName = SpanInferrer.determineServiceName(topicName, "lambda_sns", "sns");
331299
options.tags = {
332300
operation_name: "aws.sns",
333301
resource_names: resourceName,
302+
request_id: context?.awsRequestId,
334303
"span.type": "sns",
335304
"resource.name": resourceName,
336305
"peer.service": this.service,
@@ -345,6 +314,11 @@ export class SpanInferrer {
345314
topicname: topicName,
346315
topic_arn: TopicArn,
347316
};
317+
318+
// EventSubscriptionARN not available for direct integrations to SQS from SNS.
319+
if (eventSubscriptionArn !== "") {
320+
options.tags.event_subscription_arn = eventSubscriptionArn;
321+
}
348322
if (parentSpanContext) {
349323
options.childOf = parentSpanContext;
350324
}
@@ -396,18 +370,21 @@ export class SpanInferrer {
396370
// Check if sqs message was from sns
397371
// If so, unpack and look at timestamp
398372
// create further upstream sns span and finish/attach it here
399-
let upstreamSnsSpan: SpanWrapper | null = null;
373+
let upstreamSpan: SpanWrapper | null = null;
400374
try {
401-
let upstreamSnsMessage: SNSMessage;
402-
upstreamSnsMessage = JSON.parse(body);
403-
if (upstreamSnsMessage && upstreamSnsMessage.TopicArn && upstreamSnsMessage.Timestamp) {
404-
upstreamSnsSpan = this.createInferredSpanForSqsSns(upstreamSnsMessage, context, parentSpanContext);
405-
upstreamSnsSpan.finish(Number(SentTimestamp));
375+
let upstreamMessage: any;
376+
upstreamMessage = JSON.parse(body);
377+
if (upstreamMessage && upstreamMessage.TopicArn && upstreamMessage.Timestamp) {
378+
upstreamSpan = this.createInferredSpanForSns(upstreamMessage, context, parentSpanContext);
379+
upstreamSpan.finish(Number(SentTimestamp));
380+
} else if (upstreamMessage?.detail?._datadog) {
381+
upstreamSpan = this.createInferredSpanForEventBridge(upstreamMessage, context, parentSpanContext);
382+
upstreamSpan.finish(Number(SentTimestamp));
406383
}
407384
} catch (e) {
408385
// Pass, it's a raw SQS message
409386
}
410-
options.childOf = upstreamSnsSpan ? upstreamSnsSpan.span : parentSpanContext;
387+
options.childOf = upstreamSpan ? upstreamSpan.span : parentSpanContext;
411388

412389
options.startTime = Number(SentTimestamp);
413390

@@ -512,7 +489,7 @@ export class SpanInferrer {
512489
}
513490

514491
createInferredSpanForEventBridge(
515-
event: any,
492+
event: EventBridgeEvent<any, any>,
516493
context: Context | undefined,
517494
parentSpanContext: SpanContext | undefined,
518495
): SpanWrapper {

src/utils/event-type-guards.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,18 @@ export function isSNSSQSEvent(event: any): event is SQSEvent {
8383
return false;
8484
}
8585

86+
export function isEBSQSEvent(event: any): event is SQSEvent {
87+
if (Array.isArray(event.Records) && event.Records.length > 0 && event.Records[0].eventSource === "aws:sqs") {
88+
try {
89+
const body = JSON.parse(event.Records[0].body) as EventBridgeEvent<any, any>;
90+
return body["detail-type"] !== undefined;
91+
} catch (e) {
92+
return false;
93+
}
94+
}
95+
return false;
96+
}
97+
8698
export function isAppSyncResolverEvent(event: any): event is AppSyncResolverEvent<any> {
8799
return event.info !== undefined && event.info.selectionSetGraphQL !== undefined;
88100
}

0 commit comments

Comments
 (0)