fix: context propagation fifo #1530
Open
📢 Type of change
📜 Description
Fixes trace context propagation for FIFO queues when using SqsTemplate.sendAsync(). On the first call to a FIFO queue, trace headers (traceparent, tracestate, etc.) were not propagated because the observation was created on the wrong thread after an async operation.
💡 Motivation and Context
When using SqsTemplate.sendAsync() to send messages to a FIFO queue (.fifo), the trace context (traceId/spanId from Micrometer Observation or distributed tracing) was not propagated on the first call, but worked correctly on subsequent calls, once the queue attributes were cached.
Observed Behavior
Root Cause Analysis
Since Spring Cloud AWS 3.0.1 (PR #799), SqsTemplate automatically detects whether a FIFO queue has ContentBasedDeduplication enabled. This detection is asynchronous and involves calling the AWS SQS API to fetch queue attributes.
The Bug
The observation (which captures trace context and adds trace headers) was created in AbstractMessagingTemplate.observeAndSendAsync(), which is called in a thenCompose callback.
Why It Failed on First Call But Worked on Second Call
CompletableFuture.thenCompose() behavior:
- First call: the queue attributes are not cached yet, so the future completes on an AWS SDK thread (sdk-async-*), so the thenCompose callback runs on that thread. The observation is created on the SDK thread, which doesn't have the calling thread's trace context.
- Subsequent calls: the queue attributes are cached and the future is already complete, so thenCompose runs synchronously on the calling thread, which has the correct trace context.
The fix moves observation creation and trace header capture to before the async preprocessing, ensuring it always happens on the calling thread with the correct trace context.
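The thread behavior described above can be reproduced with plain JDK classes. The following is a minimal sketch, not the PR's code: the ThreadLocal named TRACE is a hypothetical stand-in for a thread-bound trace context, the thread name sdk-async-demo only imitates an SDK thread, and none of the method names come from Spring Cloud AWS or Micrometer.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThenComposeThreadDemo {

    // Hypothetical stand-in for a thread-bound trace context (not Micrometer's API).
    static final ThreadLocal<String> TRACE = new ThreadLocal<>();

    // "First call": the attributes future is completed later by another thread,
    // so the thenCompose callback runs on that thread and sees no trace context.
    static String uncachedCase() {
        TRACE.set("trace-123");
        ExecutorService sdkThread =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "sdk-async-demo"));
        try {
            CompletableFuture<String> attributes = new CompletableFuture<>();
            // Callback is registered while the future is still incomplete.
            CompletableFuture<String> seen = attributes.thenCompose(
                    a -> CompletableFuture.completedFuture(String.valueOf(TRACE.get())));
            sdkThread.execute(() -> attributes.complete("attrs"));
            return seen.join();
        } finally {
            sdkThread.shutdown();
            TRACE.remove();
        }
    }

    // "Subsequent calls": the future is already complete, so the callback runs
    // synchronously on the calling thread and sees the trace context.
    static String cachedCase() {
        TRACE.set("trace-123");
        try {
            return CompletableFuture.completedFuture("attrs")
                    .thenCompose(a -> CompletableFuture.completedFuture(String.valueOf(TRACE.get())))
                    .join();
        } finally {
            TRACE.remove();
        }
    }

    // Shape of the fix: read the context on the calling thread before the async
    // step, then use the captured value inside the callback.
    static String capturedBeforeAsync() {
        TRACE.set("trace-123");
        ExecutorService sdkThread =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "sdk-async-demo"));
        try {
            String captured = TRACE.get(); // captured on the calling thread
            CompletableFuture<String> attributes = new CompletableFuture<>();
            CompletableFuture<String> seen = attributes.thenCompose(
                    a -> CompletableFuture.completedFuture(String.valueOf(captured)));
            sdkThread.execute(() -> attributes.complete("attrs"));
            return seen.join();
        } finally {
            sdkThread.shutdown();
            TRACE.remove();
        }
    }

    public static void main(String[] args) {
        System.out.println("uncached:  " + uncachedCase());
        System.out.println("cached:    " + cachedCase());
        System.out.println("fixed:     " + capturedBeforeAsync());
    }
}
```

In this sketch the uncached case yields the string "null" while the other two yield "trace-123", which mirrors the first-call versus second-call difference and why capturing on the calling thread removes it.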
Key Changes
File: spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/operations/AbstractMessagingTemplate.java
- Moved observation creation (startObservation()) to execute before preProcessMessageForSendAsync()
- Added a doSendAndCompleteObservation() helper method that sends and completes an existing observation (rather than creating a new one)
- observeAndSendAsync()
💚 How did you test it?
The test sendAsync_toFifoQueue_shouldPropagateObservationScopeOnFirstCall will fail without this change.
📝 Checklist
🔮 Next steps