Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 154 additions & 32 deletions ddtrace/_trace/trace_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
import ddtrace
from ddtrace import config
from ddtrace._trace._inferred_proxy import create_inferred_proxy_span_if_headers_exist
from ddtrace._trace._span_link import SpanLinkKind as _SpanLinkKind
from ddtrace._trace._span_pointer import _SpanPointerDescription
from ddtrace._trace._span_pointer import _SpanPointerDirection
from ddtrace._trace.span import Span
from ddtrace._trace.utils import extract_DD_context_from_messages
from ddtrace.constants import _SPAN_MEASURED_KEY
Expand All @@ -31,6 +33,7 @@
from ddtrace.contrib.internal.trace_utils import _set_url_tag
from ddtrace.ext import SpanKind
from ddtrace.ext import SpanLinkKind
from ddtrace.ext import SpanTypes
from ddtrace.ext import db
from ddtrace.ext import http
from ddtrace.ext import net
Expand All @@ -49,6 +52,8 @@
from ddtrace.internal.constants import MESSAGING_OPERATION
from ddtrace.internal.constants import MESSAGING_SYSTEM
from ddtrace.internal.constants import SPAN_LINK_KIND
from ddtrace.internal.constants import SPAN_POINTER_DOWN_DIRECTION
from ddtrace.internal.constants import SPAN_POINTER_UP_DIRECTION
from ddtrace.internal.logger import get_logger
from ddtrace.internal.sampling import _inherit_sampling_tags
from ddtrace.internal.schema.span_attribute_schema import SpanDirection
Expand Down Expand Up @@ -992,12 +997,109 @@ def _set_client_ip_tags(scope: Mapping[str, Any], span: Span):
log.debug("Could not validate client IP address for websocket send message: %s", str(e))


def _init_websocket_message_counters(scope: Dict[str, Any]) -> None:
if "datadog" not in scope:
scope["datadog"] = {}
if "websocket_receive_counter" not in scope["datadog"]:
scope["datadog"]["websocket_receive_counter"] = 0
if "websocket_send_counter" not in scope["datadog"]:
scope["datadog"]["websocket_send_counter"] = 0


def _increment_websocket_counter(scope: Dict[str, Any], counter_type: str) -> int:
"""
Increment and return websocket message counter (either websocket_receive_counter or websocket_send_counter)
"""
scope["datadog"][counter_type] += 1
return scope["datadog"][counter_type]


def _build_websocket_span_pointer_hash(
handshake_trace_id: int,
handshake_span_id: int,
counter: int,
is_server: bool,
is_incoming: bool,
) -> str:
"""
Build websocket span pointer hash.

Format: <prefix><128 bit hex trace id><64 bit hex span id><32 bit hex counter>
Prefix: 'S' for server outgoing or client incoming, 'C' for server incoming or client outgoing
"""
if (is_server and not is_incoming) or (not is_server and is_incoming):
prefix = "S"
else:
prefix = "C"

trace_id_hex = f"{handshake_trace_id:032x}"
span_id_hex = f"{handshake_span_id:016x}"
counter_hex = f"{counter:08x}"

return f"{prefix}{trace_id_hex}{span_id_hex}{counter_hex}"


def _has_distributed_tracing_context(span: Span) -> bool:
"""
Check if the handshake span has extracted distributed tracing context.

A websocket server must not set the span pointer if the handshake has not extracted a context

A span has distributed tracing context if it has a parent context that was
extracted from headers.
"""
if not span or not span._parent_context:
return False
return span._parent_context._is_remote


def _add_websocket_span_pointer_attributes(
link_attributes: Dict[str, Any],
integration_config: Any,
handshake_span: Span,
scope: Dict[str, Any],
is_incoming: bool,
) -> None:
"""
Add span pointer attributes to link_attributes for websocket message correlation.
"""

if not integration_config.distributed_tracing or not _has_distributed_tracing_context(handshake_span):
return

# Increment counter based on message direction
counter_type = "websocket_receive_counter" if is_incoming else "websocket_send_counter"
counter = _increment_websocket_counter(scope, counter_type)

ptr_hash = _build_websocket_span_pointer_hash(
handshake_trace_id=handshake_span.trace_id,
handshake_span_id=handshake_span.span_id,
counter=counter,
is_server=True,
is_incoming=is_incoming,
)

if is_incoming:
link_name = SPAN_POINTER_UP_DIRECTION
ptr_direction = _SpanPointerDirection.UPSTREAM
else:
link_name = SPAN_POINTER_DOWN_DIRECTION
ptr_direction = _SpanPointerDirection.DOWNSTREAM

link_attributes.update(
{
"link.name": link_name,
"dd.kind": _SpanLinkKind.SPAN_POINTER.value,
"ptr.kind": SpanTypes.WEBSOCKET,
"ptr.dir": ptr_direction,
"ptr.hash": ptr_hash,
}
)


def _on_asgi_websocket_receive_message(ctx, scope, message):
"""
Handle websocket receive message events.

This handler is called when a websocket receive message event is dispatched.
It sets up the span with appropriate tags, metrics, and links.
"""
span = ctx.span
integration_config = ctx.get_item("integration_config")
Expand All @@ -1011,24 +1113,28 @@ def _on_asgi_websocket_receive_message(ctx, scope, message):
span.set_metric(websocket.MESSAGE_FRAMES, 1)

if hasattr(ctx, "parent") and ctx.parent.span:
handshake_span = ctx.parent.span
link_attributes = {SPAN_LINK_KIND: SpanLinkKind.EXECUTED}

_add_websocket_span_pointer_attributes(
link_attributes, integration_config, handshake_span, scope, is_incoming=True
)

span.set_link(
trace_id=ctx.parent.span.trace_id,
span_id=ctx.parent.span.span_id,
attributes={SPAN_LINK_KIND: SpanLinkKind.EXECUTED},
trace_id=handshake_span.trace_id,
span_id=handshake_span.span_id,
attributes=link_attributes,
)

if getattr(integration_config, "asgi_websocket_messages_inherit_sampling", True):
_inherit_sampling_tags(span, ctx.parent.span._local_root)
_inherit_sampling_tags(span, handshake_span._local_root)

_copy_trace_level_tags(span, ctx.parent.span)
_copy_trace_level_tags(span, handshake_span)


def _on_asgi_websocket_send_message(ctx, scope, message):
"""
Handle websocket send message events.

This handler is called when a websocket send message event is dispatched.
It sets up the span with appropriate tags, metrics, and links.
"""
span = ctx.span
integration_config = ctx.get_item("integration_config")
Expand All @@ -1041,19 +1147,23 @@ def _on_asgi_websocket_send_message(ctx, scope, message):
span.set_metric(websocket.MESSAGE_FRAMES, 1)

if hasattr(ctx, "parent") and ctx.parent.span:
handshake_span = ctx.parent.span
link_attributes = {SPAN_LINK_KIND: SpanLinkKind.RESUMING}

_add_websocket_span_pointer_attributes(
link_attributes, integration_config, handshake_span, scope, is_incoming=False
)

span.set_link(
trace_id=ctx.parent.span.trace_id,
span_id=ctx.parent.span.span_id,
attributes={SPAN_LINK_KIND: SpanLinkKind.RESUMING},
trace_id=handshake_span.trace_id,
span_id=handshake_span.span_id,
attributes=link_attributes,
)


def _on_asgi_websocket_close_message(ctx, scope, message):
"""
Handle websocket close message events.

This handler is called when a websocket close message event is dispatched.
It sets up the span with appropriate tags, metrics, and links.
"""
span = ctx.span
integration_config = ctx.get_item("integration_config")
Expand All @@ -1068,21 +1178,25 @@ def _on_asgi_websocket_close_message(ctx, scope, message):
_set_websocket_close_tags(span, message)

if hasattr(ctx, "parent") and ctx.parent.span:
handshake_span = ctx.parent.span
link_attributes = {SPAN_LINK_KIND: SpanLinkKind.RESUMING}

_add_websocket_span_pointer_attributes(
link_attributes, integration_config, handshake_span, scope, is_incoming=False
)

span.set_link(
trace_id=ctx.parent.span.trace_id,
span_id=ctx.parent.span.span_id,
attributes={SPAN_LINK_KIND: SpanLinkKind.RESUMING},
trace_id=handshake_span.trace_id,
span_id=handshake_span.span_id,
attributes=link_attributes,
)

_copy_trace_level_tags(span, ctx.parent.span)
_copy_trace_level_tags(span, handshake_span)


def _on_asgi_websocket_disconnect_message(ctx, scope, message):
"""
Handle websocket disconnect message events.

This handler is called when a websocket disconnect message event is dispatched.
It sets up the span with appropriate tags, metrics, and links.
"""
span = ctx.span
integration_config = ctx.get_item("integration_config")
Expand All @@ -1093,16 +1207,23 @@ def _on_asgi_websocket_disconnect_message(ctx, scope, message):
_set_websocket_close_tags(span, message)

if hasattr(ctx, "parent") and ctx.parent.span:
handshake_span = ctx.parent.span
link_attributes = {SPAN_LINK_KIND: SpanLinkKind.EXECUTED}

_add_websocket_span_pointer_attributes(
link_attributes, integration_config, handshake_span, scope, is_incoming=True
)

span.set_link(
trace_id=ctx.parent_span.trace_id,
span_id=ctx.parent_span.span_id,
attributes={SPAN_LINK_KIND: SpanLinkKind.EXECUTED},
trace_id=handshake_span.trace_id,
span_id=handshake_span.span_id,
attributes=link_attributes,
)

if getattr(integration_config, "asgi_websocket_messages_inherit_sampling", True):
_inherit_sampling_tags(span, ctx.parent.span._local_root)
_inherit_sampling_tags(span, handshake_span._local_root)

_copy_trace_level_tags(span, ctx.parent.span)
_copy_trace_level_tags(span, handshake_span)


def _on_asgi_request(ctx: core.ExecutionContext) -> None:
Expand All @@ -1115,14 +1236,15 @@ def _on_asgi_request(ctx: core.ExecutionContext) -> None:
span = _start_span(ctx)
ctx.set_item("req_span", span)

if scope["type"] == "websocket":
span._set_tag_str("http.upgraded", "websocket")

if "datadog" not in scope:
scope["datadog"] = {"request_spans": [span]}
else:
scope["datadog"]["request_spans"].append(span)

if scope["type"] == "websocket":
span._set_tag_str("http.upgraded", SpanTypes.WEBSOCKET)
_init_websocket_message_counters(scope)


def listen():
core.on("wsgi.request.prepare", _on_request_prepare)
Expand Down
2 changes: 2 additions & 0 deletions ddtrace/internal/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
SAMPLING_DECISION_MAKER_RESOURCE = "_dd.dm.resource"
SPAN_LINK_KIND = "dd.kind"
SPAN_LINKS_KEY = "_dd.span_links"
SPAN_POINTER_DOWN_DIRECTION = "span-pointer-down"
SPAN_POINTER_UP_DIRECTION = "span-pointer-up"
SPAN_EVENTS_KEY = "events"
SPAN_API_DATADOG = "datadog"
SPAN_API_OTEL = "otel"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
---
features:
- |
Adds span pointers for WebSocket message tracing to enable distributed context propagation across client-server boundaries.
Loading
Loading