Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import static datadog.trace.instrumentation.aws.v2.eventbridge.TextMapInjectAdapter.SETTER;

import datadog.context.Context;
import datadog.trace.api.Config;
import datadog.trace.api.datastreams.DataStreamsContext;
import datadog.trace.api.datastreams.DataStreamsTags;
import datadog.trace.api.datastreams.PathwayContext;
Expand Down Expand Up @@ -34,7 +35,8 @@ public class EventBridgeInterceptor implements ExecutionInterceptor {

@Override
public SdkRequest modifyRequest(ModifyRequest context, ExecutionAttributes executionAttributes) {
if (!(context.request() instanceof PutEventsRequest)) {
if (!(context.request() instanceof PutEventsRequest)
|| !Config.get().isEventbridgeInjectDatadogAttributeEnabled()) {
return context.request();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ import datadog.trace.agent.test.InstrumentationSpecification
import datadog.trace.api.DDSpanTypes
import datadog.trace.api.config.GeneralConfig
import groovy.json.JsonSlurper
import java.time.Duration
import java.util.concurrent.CompletableFuture
import org.testcontainers.containers.GenericContainer
import org.testcontainers.utility.DockerImageName
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
Expand All @@ -17,9 +19,6 @@ import software.amazon.awssdk.services.sqs.SqsClient
import software.amazon.awssdk.services.sqs.model.QueueAttributeName
import spock.lang.Shared

import java.time.Duration
import java.util.concurrent.CompletableFuture

class EventBridgeClientTest extends InstrumentationSpecification {
static final LOCALSTACK = new GenericContainer(DockerImageName.parse("localstack/localstack:4.2.0"))
.withExposedPorts(4566)
Expand Down Expand Up @@ -495,4 +494,31 @@ class EventBridgeClientTest extends InstrumentationSpecification {
'tracestate'
]
}

def "datadog context is not injected when eventbridgeInjectDatadogAttribute is disabled"() {
setup:
injectSysConfig("eventbridge.inject.datadog.attribute.enabled", "false")

when:
TEST_WRITER.clear()
eventBridgeClient.putEvents { req ->
req.entries(
PutEventsRequestEntry.builder()
.source("com.example")
.detailType("test-no-inject")
.detail('{"message":"no-inject"}')
.eventBusName(testBusARN)
.build()
)
}

def message = sqsClient.receiveMessage { it.queueUrl(testQueueURL).waitTimeSeconds(3) }.messages().get(0)
def messageBody = new JsonSlurper().parseText(message.body())

then:
def detail = messageBody["detail"]
assert detail instanceof Map
assert detail["message"] == "no-inject"
assert detail["_datadog"] == null
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static datadog.trace.bootstrap.instrumentation.api.AgentSpan.fromContext;

import datadog.context.Context;
import datadog.trace.api.Config;
import datadog.trace.bootstrap.InstanceStore;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import software.amazon.awssdk.core.SdkRequest;
Expand All @@ -23,10 +24,14 @@ public SfnInterceptor() {}

@Override
public SdkRequest modifyRequest(ModifyRequest context, ExecutionAttributes executionAttributes) {
SdkRequest request = context.request();
if (!Config.get().isSfnInjectDatadogAttributeEnabled()) {
return request;
}
try {
return modifyRequestImpl(context, executionAttributes);
} catch (Exception e) {
return context.request();
return request;
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,21 @@
import static datadog.trace.agent.test.utils.TraceUtils.basicSpan

import datadog.trace.agent.test.naming.VersionedNamingTestBase
import datadog.trace.agent.test.utils.TraceUtils
import datadog.trace.api.DDSpanTypes
import datadog.trace.bootstrap.instrumentation.api.Tags
import groovy.json.JsonSlurper
import java.time.Duration
import org.testcontainers.containers.GenericContainer
import org.testcontainers.utility.DockerImageName
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.sfn.SfnClient
import software.amazon.awssdk.services.sfn.model.SfnException
import software.amazon.awssdk.services.sfn.model.StartExecutionResponse
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
import spock.lang.Shared

import java.time.Duration

import static datadog.trace.agent.test.utils.TraceUtils.basicSpan


abstract class SfnClientTest extends VersionedNamingTestBase {
@Shared GenericContainer localStack
@Shared SfnClient sfnClient
Expand Down Expand Up @@ -117,6 +115,28 @@ abstract class SfnClientTest extends VersionedNamingTestBase {
input["_datadog"]["x-datadog-tags"] != null
}

def "datadog context is not injected when SfnInjectDatadogAttribute is disabled"() {
setup:
injectSysConfig("sfn.inject.datadog.attribute.enabled", "false")

when:
StartExecutionResponse response = sfnClient.startExecution { builder ->
builder.stateMachineArn(testStateMachineARN)
.input("{\"key\": \"value\"}")
.build()
}

then:
def execution = sfnClient.describeExecution { builder ->
builder.executionArn(response.executionArn())
.build()
}

def input = new JsonSlurper().parseText(execution.input())
assert input["key"] == "value"
assert input["_datadog"] == null
}

def "AWS rejects invalid JSON but instrumentation does not error"() {
when:
sfnClient.startExecution { b ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.amazonaws.services.sns.model.PublishBatchRequestEntry;
import com.amazonaws.services.sns.model.PublishRequest;
import datadog.context.Context;
import datadog.trace.api.Config;
import datadog.trace.api.datastreams.DataStreamsContext;
import datadog.trace.api.datastreams.DataStreamsTags;
import datadog.trace.bootstrap.ContextStore;
Expand Down Expand Up @@ -49,6 +50,9 @@ private ByteBuffer getMessageAttributeValueToInject(

@Override
public AmazonWebServiceRequest beforeMarshalling(AmazonWebServiceRequest request) {
if (!Config.get().isSnsInjectDatadogAttributeEnabled()) {
return request;
}
// Injecting the trace context into SNS messageAttributes.
if (request instanceof PublishRequest) {
PublishRequest pRequest = (PublishRequest) request;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import datadog.trace.api.config.GeneralConfig
import datadog.trace.bootstrap.instrumentation.api.Tags
import datadog.trace.core.datastreams.StatsGroup
import groovy.json.JsonSlurper
import java.time.Duration
import org.testcontainers.containers.GenericContainer
import org.testcontainers.utility.DockerImageName
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
Expand All @@ -22,9 +23,6 @@ import software.amazon.awssdk.services.sqs.SqsClient
import software.amazon.awssdk.services.sqs.model.QueueAttributeName
import spock.lang.Shared

import java.time.Duration


abstract class SnsClientTest extends VersionedNamingTestBase {

static final LOCALSTACK = new GenericContainer(DockerImageName.parse("localstack/localstack:4.2.0"))
Expand Down Expand Up @@ -210,6 +208,26 @@ abstract class SnsClientTest extends VersionedNamingTestBase {
!traceContextInJson['dd-pathway-ctx-base64'].toString().isBlank()
}


def "datadog context is not injected when SnsInjectDatadogAttribute is disabled"() {
setup:
TEST_WRITER.clear()
injectSysConfig("sns.inject.datadog.attribute.enabled", "false")

when:
snsClient.publish(testTopicARN, 'sometext')

def message = sqsClient.receiveMessage { it.queueUrl(testQueueURL).waitTimeSeconds(3) }.messages().get(0)
def jsonSlurper = new JsonSlurper()
def messageBody = jsonSlurper.parseText(message.body())
if (isDataStreamsEnabled()) {
TEST_DATA_STREAMS_WRITER.waitForGroups(1)
}
then:
assert messageBody["Message"] == "sometext"
assert messageBody["MessageAttributes"] == null
}

def "SNS message to phone number doesn't leak exception"() {
when:
snsClient.publish(new PublishRequest().withPhoneNumber("+19995550123").withMessage('sometext'))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import static datadog.trace.instrumentation.aws.v2.sns.TextMapInjectAdapter.SETTER;

import datadog.context.Context;
import datadog.trace.api.Config;
import datadog.trace.api.datastreams.DataStreamsContext;
import datadog.trace.api.datastreams.DataStreamsTags;
import datadog.trace.bootstrap.InstanceStore;
Expand Down Expand Up @@ -50,6 +51,9 @@ public SnsInterceptor() {}

@Override
public SdkRequest modifyRequest(ModifyRequest context, ExecutionAttributes executionAttributes) {
if (!Config.get().isSnsInjectDatadogAttributeEnabled()) {
return context.request();
}
// Injecting the trace context into SNS messageAttributes.
if (context.request() instanceof PublishRequest) {
PublishRequest request = (PublishRequest) context.request();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,25 @@ abstract class SnsClientTest extends VersionedNamingTestBase {
!traceContextInJson['dd-pathway-ctx-base64'].toString().isBlank()
}

def "datadog context is not injected when SnsInjectDatadogAttribute is disabled"() {
setup:
TEST_WRITER.clear()
injectSysConfig("sns.inject.datadog.attribute.enabled", "false")

when:
snsClient.publish { it.message("sometext").topicArn(testTopicARN)}

def message = sqsClient.receiveMessage { it.queueUrl(testQueueURL).waitTimeSeconds(3) }.messages().get(0)
def jsonSlurper = new JsonSlurper()
def messageBody = jsonSlurper.parseText(message.body())
if (isDataStreamsEnabled()) {
TEST_DATA_STREAMS_WRITER.waitForGroups(1)
}
then:
assert messageBody["Message"] == "sometext"
assert messageBody["MessageAttributes"] == null
}

def "SNS message to phone number doesn't leak exception"() {
when:
snsClient.publish { it.message("sometext").phoneNumber("+19995550123") }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import com.amazonaws.services.sqs.model.MessageAttributeValue;
import datadog.context.propagation.CarrierSetter;
import datadog.trace.api.Config;
import java.util.Map;

public class MessageAttributeInjector implements CarrierSetter<Map<String, MessageAttributeValue>> {
Expand All @@ -13,7 +14,9 @@ public class MessageAttributeInjector implements CarrierSetter<Map<String, Messa
@Override
public void set(
final Map<String, MessageAttributeValue> carrier, final String key, final String value) {
if (carrier.size() < 10 && !carrier.containsKey(DATADOG_KEY)) {
if (carrier.size() < 10
&& !carrier.containsKey(DATADOG_KEY)
&& Config.get().isSqsInjectDatadogAttributeEnabled()) {
String jsonPathway = String.format("{\"%s\": \"%s\"}", key, value);
carrier.put(
DATADOG_KEY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import datadog.context.Context;
import datadog.context.propagation.Propagator;
import datadog.context.propagation.Propagators;
import datadog.trace.api.Config;
import datadog.trace.api.datastreams.DataStreamsContext;
import datadog.trace.api.datastreams.DataStreamsTags;
import datadog.trace.bootstrap.ContextStore;
Expand Down Expand Up @@ -69,7 +70,8 @@ public AmazonWebServiceRequest beforeMarshalling(AmazonWebServiceRequest request
}
} else if (request instanceof ReceiveMessageRequest) {
ReceiveMessageRequest rmRequest = (ReceiveMessageRequest) request;
if (rmRequest.getMessageAttributeNames().size() < 10
if (Config.get().isSqsInjectDatadogAttributeEnabled()
&& rmRequest.getMessageAttributeNames().size() < 10
&& !rmRequest.getMessageAttributeNames().contains(DATADOG_KEY)) {
List<String> attributeNames = new ArrayList<>(rmRequest.getMessageAttributeNames());
attributeNames.add(DATADOG_KEY);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import static datadog.trace.agent.test.utils.TraceUtils.basicSpan
import static java.nio.charset.StandardCharsets.UTF_8

import com.amazon.sqs.javamessaging.ProviderConfiguration
import com.amazon.sqs.javamessaging.SQSConnectionFactory
import com.amazonaws.SDKGlobalConfiguration
Expand All @@ -23,17 +26,13 @@ import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags
import datadog.trace.bootstrap.instrumentation.api.Tags
import datadog.trace.core.datastreams.StatsGroup
import datadog.trace.instrumentation.aws.v1.sqs.TracingList
import java.nio.ByteBuffer
import java.nio.charset.Charset
import javax.jms.Session
import org.elasticmq.rest.sqs.SQSRestServerBuilder
import spock.lang.IgnoreIf
import spock.lang.Shared

import javax.jms.Session
import java.nio.ByteBuffer
import java.nio.charset.Charset

import static datadog.trace.agent.test.utils.TraceUtils.basicSpan
import static java.nio.charset.StandardCharsets.UTF_8

abstract class SqsClientTest extends VersionedNamingTestBase {

def setup() {
Expand Down Expand Up @@ -189,6 +188,31 @@ abstract class SqsClientTest extends VersionedNamingTestBase {
client.shutdown()
}

def "dadatog context is not injected if SqsInjectDatadogAttribute is disabled"() {
setup:
injectSysConfig("sqs.inject.datadog.attribute.enabled", "false")
def client = AmazonSQSClientBuilder.standard()
.withEndpointConfiguration(endpoint)
.withCredentials(credentialsProvider)
.build()
def queueUrl = client.createQueue('somequeue').queueUrl
TEST_WRITER.clear()

when:
client.sendMessage(queueUrl, 'sometext')
def messages = client.receiveMessage(queueUrl).messages

if (isDataStreamsEnabled()) {
TEST_DATA_STREAMS_WRITER.waitForGroups(1)
}

then:
assert !messages[0].messageAttributes.containsKey("_datadog")

cleanup:
client.shutdown()
}

@IgnoreIf({ !instance.isDataStreamsEnabled() })
def "propagation even when message attributes are readonly"() {
setup:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static datadog.trace.api.datastreams.PathwayContext.DATADOG_KEY;

import datadog.context.propagation.CarrierSetter;
import datadog.trace.api.Config;
import java.util.Map;
import javax.annotation.ParametersAreNonnullByDefault;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
Expand All @@ -15,7 +16,10 @@ public class MessageAttributeInjector implements CarrierSetter<Map<String, Messa
@Override
public void set(
final Map<String, MessageAttributeValue> carrier, final String key, final String value) {
if (carrier.size() < 10 && !carrier.containsKey(DATADOG_KEY)) {
if (carrier.size() < 10
&& !carrier.containsKey(DATADOG_KEY)
&& Config.get().isSqsInjectDatadogAttributeEnabled()) {

String jsonPathway = String.format("{\"%s\": \"%s\"}", key, value);
carrier.put(
DATADOG_KEY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import datadog.context.Context;
import datadog.context.propagation.Propagator;
import datadog.context.propagation.Propagators;
import datadog.trace.api.Config;
import datadog.trace.api.datastreams.DataStreamsContext;
import datadog.trace.api.datastreams.DataStreamsTags;
import datadog.trace.bootstrap.InstanceStore;
Expand Down Expand Up @@ -77,7 +78,8 @@ public SdkRequest modifyRequest(ModifyRequest context, ExecutionAttributes execu
} else if (context.request() instanceof ReceiveMessageRequest) {
ReceiveMessageRequest request = (ReceiveMessageRequest) context.request();
if (request.messageAttributeNames().size() < 10
&& !request.messageAttributeNames().contains(DATADOG_KEY)) {
&& !request.messageAttributeNames().contains(DATADOG_KEY)
&& Config.get().isSqsInjectDatadogAttributeEnabled()) {
List<String> messageAttributeNames = new ArrayList<>(request.messageAttributeNames());
messageAttributeNames.add(DATADOG_KEY);
return request.toBuilder().messageAttributeNames(messageAttributeNames).build();
Expand Down
Loading