Skip to content

Commit 15cd5b6

Browse files
feat: add span pointer attributes for websocket context propagation (#15153)
## Description <!-- Provide an overview of the change and motivation for the change --> From [RFC](https://docs.google.com/document/d/1RhIoUJhY7nawH9pkdbtbNy_G-7P02IR76Ka-SyJQ4f8/edit?tab=t.0): > There is no direct context extraction on the incoming messages traced since they already hold a relationship (a link) to the trace where the context propagation happens (the handshake). The same applies to the send message spans (for context injection). This PR implements span pointers to connect outgoing and incoming messages over websocket. Span pointer attributes: - link.name: span-pointer-down (if outgoing) / span-pointer-up (if incoming) - dd.kind: span-pointer - ptr.kind: websocket - ptr.dir: d (if outgoing) / u (if incoming) - ptr.hash: S<128 bit hex handshake trace id><64 bit hex handshake parent id><32 bit hex counter> (if outgoing on server or incoming on client) / C<128 bit hex handshake trace id><64 bit hex handshake parent id><32 bit hex counter> (if outgoing on client or incoming on server) ## Testing <!-- Describe your testing strategy or note what tests are included --> See `test_websocket_context_propagation` generated snapshot file `test_websocket_context_propagation` flame graph: `websocket.receive` parent and `websocket.send` / `websocket.close` <img width="929" height="204" alt="Screenshot 2025-11-10 at 3 28 26 PM" src="https://github.com/user-attachments/assets/4c2b6656-c38c-4953-b569-ece83ed1db72" /> <img width="929" height="247" alt="Screenshot 2025-11-10 at 3 28 58 PM" src="https://github.com/user-attachments/assets/f4cada27-e508-4444-9341-267a37ecac5a" /> ## Risks <!-- Note any risks associated with this change, or "None" if no risks --> ## Additional Notes <!-- Any other information that would be helpful for reviewers --> --------- Co-authored-by: Brett Langdon <brett.langdon@datadoghq.com>
1 parent 3fc3fac commit 15cd5b6

File tree

5 files changed

+226
-87
lines changed

5 files changed

+226
-87
lines changed

ddtrace/_trace/_span_pointer.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,11 @@ class _SpanPointerDirection(Enum):
2424
DOWNSTREAM = "d"
2525

2626

27+
class _SpanPointerDirectionName(Enum):
28+
UPSTREAM = "span-pointer-up"
29+
DOWNSTREAM = "span-pointer-down"
30+
31+
2732
class _SpanPointerDescription(NamedTuple):
2833
# Not to be confused with _SpanPointer. This class describes the parameters
2934
# required to attach a span pointer to a Span. It lets us decouple code

ddtrace/_trace/trace_handlers.py

Lines changed: 145 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,10 @@
1515
import ddtrace
1616
from ddtrace import config
1717
from ddtrace._trace._inferred_proxy import create_inferred_proxy_span_if_headers_exist
18+
from ddtrace._trace._span_link import SpanLinkKind as _SpanLinkKind
1819
from ddtrace._trace._span_pointer import _SpanPointerDescription
20+
from ddtrace._trace._span_pointer import _SpanPointerDirection
21+
from ddtrace._trace._span_pointer import _SpanPointerDirectionName
1922
from ddtrace._trace.span import Span
2023
from ddtrace._trace.utils import extract_DD_context_from_messages
2124
from ddtrace.constants import _SPAN_MEASURED_KEY
@@ -31,6 +34,7 @@
3134
from ddtrace.contrib.internal.trace_utils import _set_url_tag
3235
from ddtrace.ext import SpanKind
3336
from ddtrace.ext import SpanLinkKind
37+
from ddtrace.ext import SpanTypes
3438
from ddtrace.ext import db
3539
from ddtrace.ext import http
3640
from ddtrace.ext import net
@@ -43,6 +47,7 @@
4347
from ddtrace.internal.constants import FLASK_ENDPOINT
4448
from ddtrace.internal.constants import FLASK_URL_RULE
4549
from ddtrace.internal.constants import FLASK_VIEW_ARGS
50+
from ddtrace.internal.constants import HTTP_REQUEST_UPGRADED
4651
from ddtrace.internal.constants import MESSAGING_BATCH_COUNT
4752
from ddtrace.internal.constants import MESSAGING_DESTINATION_NAME
4853
from ddtrace.internal.constants import MESSAGING_MESSAGE_ID
@@ -57,6 +62,9 @@
5762

5863
log = get_logger(__name__)
5964

65+
_WEBSOCKET_LINK_ATTRS_EXECUTED = {SPAN_LINK_KIND: SpanLinkKind.EXECUTED}
66+
_WEBSOCKET_LINK_ATTRS_RESUMING = {SPAN_LINK_KIND: SpanLinkKind.RESUMING}
67+
6068

6169
class _TracedIterable(wrapt.ObjectProxy):
6270
def __init__(self, wrapped, span, parent_span, wrapped_is_iterator=False):
@@ -992,12 +1000,109 @@ def _set_client_ip_tags(scope: Mapping[str, Any], span: Span):
9921000
log.debug("Could not validate client IP address for websocket send message: %s", str(e))
9931001

9941002

1003+
def _init_websocket_message_counters(scope: Dict[str, Any]) -> None:
1004+
if "datadog" not in scope:
1005+
scope["datadog"] = {}
1006+
if "websocket_receive_counter" not in scope["datadog"]:
1007+
scope["datadog"]["websocket_receive_counter"] = 0
1008+
if "websocket_send_counter" not in scope["datadog"]:
1009+
scope["datadog"]["websocket_send_counter"] = 0
1010+
1011+
1012+
def _increment_websocket_counter(scope: Dict[str, Any], counter_type: str) -> int:
1013+
"""
1014+
Increment and return websocket message counter (either websocket_receive_counter or websocket_send_counter)
1015+
"""
1016+
scope["datadog"][counter_type] += 1
1017+
return scope["datadog"][counter_type]
1018+
1019+
1020+
def _build_websocket_span_pointer_hash(
1021+
handshake_trace_id: int,
1022+
handshake_span_id: int,
1023+
counter: int,
1024+
is_server: bool,
1025+
is_incoming: bool,
1026+
) -> str:
1027+
"""
1028+
Build websocket span pointer hash.
1029+
1030+
Format: <prefix><128 bit hex trace id><64 bit hex span id><32 bit hex counter>
1031+
Prefix: 'S' for server outgoing or client incoming, 'C' for server incoming or client outgoing
1032+
"""
1033+
if (is_server and not is_incoming) or (not is_server and is_incoming):
1034+
prefix = "S"
1035+
else:
1036+
prefix = "C"
1037+
1038+
trace_id_hex = f"{handshake_trace_id:032x}"
1039+
span_id_hex = f"{handshake_span_id:016x}"
1040+
counter_hex = f"{counter:08x}"
1041+
1042+
return f"{prefix}{trace_id_hex}{span_id_hex}{counter_hex}"
1043+
1044+
1045+
def _has_distributed_tracing_context(span: Span) -> bool:
1046+
"""
1047+
Check if the handshake span has extracted distributed tracing context.
1048+
1049+
A websocket server must not set the span pointer if the handshake has not extracted a context
1050+
1051+
A span has distributed tracing context if it has a parent context that was
1052+
extracted from headers.
1053+
"""
1054+
if not span or not span._parent_context:
1055+
return False
1056+
return span._parent_context._is_remote
1057+
1058+
1059+
def _add_websocket_span_pointer_attributes(
1060+
link_attributes: Dict[str, Any],
1061+
integration_config: Any,
1062+
handshake_span: Span,
1063+
scope: Dict[str, Any],
1064+
is_incoming: bool,
1065+
) -> None:
1066+
"""
1067+
Add span pointer attributes to link_attributes for websocket message correlation.
1068+
"""
1069+
1070+
if not integration_config.distributed_tracing or not _has_distributed_tracing_context(handshake_span):
1071+
return
1072+
1073+
# Increment counter based on message direction
1074+
counter_type = "websocket_receive_counter" if is_incoming else "websocket_send_counter"
1075+
counter = _increment_websocket_counter(scope, counter_type)
1076+
1077+
ptr_hash = _build_websocket_span_pointer_hash(
1078+
handshake_trace_id=handshake_span.trace_id,
1079+
handshake_span_id=handshake_span.span_id,
1080+
counter=counter,
1081+
is_server=True,
1082+
is_incoming=is_incoming,
1083+
)
1084+
1085+
if is_incoming:
1086+
link_name = _SpanPointerDirectionName.UPSTREAM
1087+
ptr_direction = _SpanPointerDirection.UPSTREAM
1088+
else:
1089+
link_name = _SpanPointerDirectionName.DOWNSTREAM
1090+
ptr_direction = _SpanPointerDirection.DOWNSTREAM
1091+
1092+
link_attributes.update(
1093+
{
1094+
"link.name": link_name,
1095+
"dd.kind": _SpanLinkKind.SPAN_POINTER.value,
1096+
"ptr.kind": SpanTypes.WEBSOCKET,
1097+
"ptr.dir": ptr_direction,
1098+
"ptr.hash": ptr_hash,
1099+
}
1100+
)
1101+
1102+
9951103
def _on_asgi_websocket_receive_message(ctx, scope, message):
9961104
"""
9971105
Handle websocket receive message events.
998-
999-
This handler is called when a websocket receive message event is dispatched.
1000-
It sets up the span with appropriate tags, metrics, and links.
10011106
"""
10021107
span = ctx.span
10031108
integration_config = ctx.get_item("integration_config")
@@ -1011,24 +1116,24 @@ def _on_asgi_websocket_receive_message(ctx, scope, message):
10111116
span.set_metric(websocket.MESSAGE_FRAMES, 1)
10121117

10131118
if hasattr(ctx, "parent") and ctx.parent.span:
1014-
span.set_link(
1015-
trace_id=ctx.parent.span.trace_id,
1016-
span_id=ctx.parent.span.span_id,
1017-
attributes={SPAN_LINK_KIND: SpanLinkKind.EXECUTED},
1119+
handshake_span = ctx.parent.span
1120+
link_attributes = _WEBSOCKET_LINK_ATTRS_EXECUTED.copy()
1121+
1122+
_add_websocket_span_pointer_attributes(
1123+
link_attributes, integration_config, handshake_span, scope, is_incoming=True
10181124
)
10191125

1126+
span.link_span(handshake_span.context, link_attributes)
1127+
10201128
if getattr(integration_config, "asgi_websocket_messages_inherit_sampling", True):
1021-
_inherit_sampling_tags(span, ctx.parent.span._local_root)
1129+
_inherit_sampling_tags(span, handshake_span._local_root)
10221130

1023-
_copy_trace_level_tags(span, ctx.parent.span)
1131+
_copy_trace_level_tags(span, handshake_span)
10241132

10251133

10261134
def _on_asgi_websocket_send_message(ctx, scope, message):
10271135
"""
10281136
Handle websocket send message events.
1029-
1030-
This handler is called when a websocket send message event is dispatched.
1031-
It sets up the span with appropriate tags, metrics, and links.
10321137
"""
10331138
span = ctx.span
10341139
integration_config = ctx.get_item("integration_config")
@@ -1041,19 +1146,19 @@ def _on_asgi_websocket_send_message(ctx, scope, message):
10411146
span.set_metric(websocket.MESSAGE_FRAMES, 1)
10421147

10431148
if hasattr(ctx, "parent") and ctx.parent.span:
1044-
span.set_link(
1045-
trace_id=ctx.parent.span.trace_id,
1046-
span_id=ctx.parent.span.span_id,
1047-
attributes={SPAN_LINK_KIND: SpanLinkKind.RESUMING},
1149+
handshake_span = ctx.parent.span
1150+
link_attributes = _WEBSOCKET_LINK_ATTRS_RESUMING.copy()
1151+
1152+
_add_websocket_span_pointer_attributes(
1153+
link_attributes, integration_config, handshake_span, scope, is_incoming=False
10481154
)
10491155

1156+
span.link_span(handshake_span.context, link_attributes)
1157+
10501158

10511159
def _on_asgi_websocket_close_message(ctx, scope, message):
10521160
"""
10531161
Handle websocket close message events.
1054-
1055-
This handler is called when a websocket close message event is dispatched.
1056-
It sets up the span with appropriate tags, metrics, and links.
10571162
"""
10581163
span = ctx.span
10591164
integration_config = ctx.get_item("integration_config")
@@ -1068,21 +1173,21 @@ def _on_asgi_websocket_close_message(ctx, scope, message):
10681173
_set_websocket_close_tags(span, message)
10691174

10701175
if hasattr(ctx, "parent") and ctx.parent.span:
1071-
span.set_link(
1072-
trace_id=ctx.parent.span.trace_id,
1073-
span_id=ctx.parent.span.span_id,
1074-
attributes={SPAN_LINK_KIND: SpanLinkKind.RESUMING},
1176+
handshake_span = ctx.parent.span
1177+
link_attributes = _WEBSOCKET_LINK_ATTRS_RESUMING.copy()
1178+
1179+
_add_websocket_span_pointer_attributes(
1180+
link_attributes, integration_config, handshake_span, scope, is_incoming=False
10751181
)
10761182

1077-
_copy_trace_level_tags(span, ctx.parent.span)
1183+
span.link_span(handshake_span.context, link_attributes)
1184+
1185+
_copy_trace_level_tags(span, handshake_span)
10781186

10791187

10801188
def _on_asgi_websocket_disconnect_message(ctx, scope, message):
10811189
"""
10821190
Handle websocket disconnect message events.
1083-
1084-
This handler is called when a websocket disconnect message event is dispatched.
1085-
It sets up the span with appropriate tags, metrics, and links.
10861191
"""
10871192
span = ctx.span
10881193
integration_config = ctx.get_item("integration_config")
@@ -1093,16 +1198,19 @@ def _on_asgi_websocket_disconnect_message(ctx, scope, message):
10931198
_set_websocket_close_tags(span, message)
10941199

10951200
if hasattr(ctx, "parent") and ctx.parent.span:
1096-
span.set_link(
1097-
trace_id=ctx.parent_span.trace_id,
1098-
span_id=ctx.parent_span.span_id,
1099-
attributes={SPAN_LINK_KIND: SpanLinkKind.EXECUTED},
1201+
handshake_span = ctx.parent.span
1202+
link_attributes = _WEBSOCKET_LINK_ATTRS_EXECUTED.copy()
1203+
1204+
_add_websocket_span_pointer_attributes(
1205+
link_attributes, integration_config, handshake_span, scope, is_incoming=True
11001206
)
11011207

1208+
span.link_span(handshake_span.context, link_attributes)
1209+
11021210
if getattr(integration_config, "asgi_websocket_messages_inherit_sampling", True):
1103-
_inherit_sampling_tags(span, ctx.parent.span._local_root)
1211+
_inherit_sampling_tags(span, handshake_span._local_root)
11041212

1105-
_copy_trace_level_tags(span, ctx.parent.span)
1213+
_copy_trace_level_tags(span, handshake_span)
11061214

11071215

11081216
def _on_asgi_request(ctx: core.ExecutionContext) -> None:
@@ -1115,14 +1223,15 @@ def _on_asgi_request(ctx: core.ExecutionContext) -> None:
11151223
span = _start_span(ctx)
11161224
ctx.set_item("req_span", span)
11171225

1118-
if scope["type"] == "websocket":
1119-
span._set_tag_str("http.upgraded", "websocket")
1120-
11211226
if "datadog" not in scope:
11221227
scope["datadog"] = {"request_spans": [span]}
11231228
else:
11241229
scope["datadog"]["request_spans"].append(span)
11251230

1231+
if scope["type"] == "websocket":
1232+
span._set_tag_str(HTTP_REQUEST_UPGRADED, SpanTypes.WEBSOCKET)
1233+
_init_websocket_message_counters(scope)
1234+
11261235

11271236
def listen():
11281237
core.on("wsgi.request.prepare", _on_request_prepare)

ddtrace/internal/constants.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
HTTP_REQUEST_HEADER = "http.request.header"
6868
HTTP_REQUEST_PARAMETER = "http.request.parameter"
6969
HTTP_REQUEST_BODY = "http.request.body"
70+
HTTP_REQUEST_UPGRADED = "http.upgraded"
7071
HTTP_REQUEST_PATH_PARAMETER = "http.request.path.parameter"
7172
REQUEST_PATH_PARAMS = "http.request.path_params"
7273
STATUS_403_TYPE_AUTO = {"status_code": 403, "type": "auto"}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
---
2+
features:
3+
- |
4+
feat(asgi): Enable context propagation between websocket message spans.

0 commit comments

Comments
 (0)