Skip to content
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions ddtrace/_trace/_span_pointer.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ class _SpanPointerDirection(Enum):
DOWNSTREAM = "d"


class _SpanPointerDirectionName(Enum):
UPSTREAM = "span-pointer-up"
DOWNSTREAM = "span-pointer-down"


class _SpanPointerDescription(NamedTuple):
# Not to be confused with _SpanPointer. This class describes the parameters
# required to attach a span pointer to a Span. It lets us decouple code
Expand Down
181 changes: 145 additions & 36 deletions ddtrace/_trace/trace_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
import ddtrace
from ddtrace import config
from ddtrace._trace._inferred_proxy import create_inferred_proxy_span_if_headers_exist
from ddtrace._trace._span_link import SpanLinkKind as _SpanLinkKind
from ddtrace._trace._span_pointer import _SpanPointerDescription
from ddtrace._trace._span_pointer import _SpanPointerDirection
from ddtrace._trace._span_pointer import _SpanPointerDirectionName
from ddtrace._trace.span import Span
from ddtrace._trace.utils import extract_DD_context_from_messages
from ddtrace.constants import _SPAN_MEASURED_KEY
Expand All @@ -31,6 +34,7 @@
from ddtrace.contrib.internal.trace_utils import _set_url_tag
from ddtrace.ext import SpanKind
from ddtrace.ext import SpanLinkKind
from ddtrace.ext import SpanTypes
from ddtrace.ext import db
from ddtrace.ext import http
from ddtrace.ext import net
Expand All @@ -43,6 +47,7 @@
from ddtrace.internal.constants import FLASK_ENDPOINT
from ddtrace.internal.constants import FLASK_URL_RULE
from ddtrace.internal.constants import FLASK_VIEW_ARGS
from ddtrace.internal.constants import HTTP_REQUEST_UPGRADED
from ddtrace.internal.constants import MESSAGING_BATCH_COUNT
from ddtrace.internal.constants import MESSAGING_DESTINATION_NAME
from ddtrace.internal.constants import MESSAGING_MESSAGE_ID
Expand All @@ -57,6 +62,9 @@

log = get_logger(__name__)

_WEBSOCKET_LINK_ATTRS_EXECUTED = {SPAN_LINK_KIND: SpanLinkKind.EXECUTED}
_WEBSOCKET_LINK_ATTRS_RESUMING = {SPAN_LINK_KIND: SpanLinkKind.RESUMING}


class _TracedIterable(wrapt.ObjectProxy):
def __init__(self, wrapped, span, parent_span, wrapped_is_iterator=False):
Expand Down Expand Up @@ -992,12 +1000,109 @@ def _set_client_ip_tags(scope: Mapping[str, Any], span: Span):
log.debug("Could not validate client IP address for websocket send message: %s", str(e))


def _init_websocket_message_counters(scope: Dict[str, Any]) -> None:
if "datadog" not in scope:
scope["datadog"] = {}
if "websocket_receive_counter" not in scope["datadog"]:
scope["datadog"]["websocket_receive_counter"] = 0
if "websocket_send_counter" not in scope["datadog"]:
scope["datadog"]["websocket_send_counter"] = 0


def _increment_websocket_counter(scope: Dict[str, Any], counter_type: str) -> int:
"""
Increment and return websocket message counter (either websocket_receive_counter or websocket_send_counter)
"""
scope["datadog"][counter_type] += 1
return scope["datadog"][counter_type]


def _build_websocket_span_pointer_hash(
handshake_trace_id: int,
handshake_span_id: int,
counter: int,
is_server: bool,
is_incoming: bool,
) -> str:
"""
Build websocket span pointer hash.

Format: <prefix><128 bit hex trace id><64 bit hex span id><32 bit hex counter>
Prefix: 'S' for server outgoing or client incoming, 'C' for server incoming or client outgoing
"""
if (is_server and not is_incoming) or (not is_server and is_incoming):
prefix = "S"
else:
prefix = "C"

trace_id_hex = f"{handshake_trace_id:032x}"
span_id_hex = f"{handshake_span_id:016x}"
counter_hex = f"{counter:08x}"

return f"{prefix}{trace_id_hex}{span_id_hex}{counter_hex}"


def _has_distributed_tracing_context(span: Span) -> bool:
"""
Check if the handshake span has extracted distributed tracing context.

A websocket server must not set the span pointer if the handshake has not extracted a context

A span has distributed tracing context if it has a parent context that was
extracted from headers.
"""
if not span or not span._parent_context:
return False
return span._parent_context._is_remote


def _add_websocket_span_pointer_attributes(
link_attributes: Dict[str, Any],
integration_config: Any,
handshake_span: Span,
scope: Dict[str, Any],
is_incoming: bool,
) -> None:
"""
Add span pointer attributes to link_attributes for websocket message correlation.
"""

if not integration_config.distributed_tracing or not _has_distributed_tracing_context(handshake_span):
return

# Increment counter based on message direction
counter_type = "websocket_receive_counter" if is_incoming else "websocket_send_counter"
counter = _increment_websocket_counter(scope, counter_type)

ptr_hash = _build_websocket_span_pointer_hash(
handshake_trace_id=handshake_span.trace_id,
handshake_span_id=handshake_span.span_id,
counter=counter,
is_server=True,
is_incoming=is_incoming,
)

if is_incoming:
link_name = _SpanPointerDirectionName.UPSTREAM
ptr_direction = _SpanPointerDirection.UPSTREAM
else:
link_name = _SpanPointerDirectionName.DOWNSTREAM
ptr_direction = _SpanPointerDirection.DOWNSTREAM

link_attributes.update(
{
"link.name": link_name,
"dd.kind": _SpanLinkKind.SPAN_POINTER.value,
"ptr.kind": SpanTypes.WEBSOCKET,
"ptr.dir": ptr_direction,
"ptr.hash": ptr_hash,
}
)


def _on_asgi_websocket_receive_message(ctx, scope, message):
"""
Handle websocket receive message events.

This handler is called when a websocket receive message event is dispatched.
It sets up the span with appropriate tags, metrics, and links.
"""
span = ctx.span
integration_config = ctx.get_item("integration_config")
Expand All @@ -1011,24 +1116,24 @@ def _on_asgi_websocket_receive_message(ctx, scope, message):
span.set_metric(websocket.MESSAGE_FRAMES, 1)

if hasattr(ctx, "parent") and ctx.parent.span:
span.set_link(
trace_id=ctx.parent.span.trace_id,
span_id=ctx.parent.span.span_id,
attributes={SPAN_LINK_KIND: SpanLinkKind.EXECUTED},
handshake_span = ctx.parent.span
link_attributes = _WEBSOCKET_LINK_ATTRS_EXECUTED.copy()

_add_websocket_span_pointer_attributes(
link_attributes, integration_config, handshake_span, scope, is_incoming=True
)

span.link_span(handshake_span.context, link_attributes)

if getattr(integration_config, "asgi_websocket_messages_inherit_sampling", True):
_inherit_sampling_tags(span, ctx.parent.span._local_root)
_inherit_sampling_tags(span, handshake_span._local_root)

_copy_trace_level_tags(span, ctx.parent.span)
_copy_trace_level_tags(span, handshake_span)


def _on_asgi_websocket_send_message(ctx, scope, message):
"""
Handle websocket send message events.

This handler is called when a websocket send message event is dispatched.
It sets up the span with appropriate tags, metrics, and links.
"""
span = ctx.span
integration_config = ctx.get_item("integration_config")
Expand All @@ -1041,19 +1146,19 @@ def _on_asgi_websocket_send_message(ctx, scope, message):
span.set_metric(websocket.MESSAGE_FRAMES, 1)

if hasattr(ctx, "parent") and ctx.parent.span:
span.set_link(
trace_id=ctx.parent.span.trace_id,
span_id=ctx.parent.span.span_id,
attributes={SPAN_LINK_KIND: SpanLinkKind.RESUMING},
handshake_span = ctx.parent.span
link_attributes = _WEBSOCKET_LINK_ATTRS_RESUMING.copy()

_add_websocket_span_pointer_attributes(
link_attributes, integration_config, handshake_span, scope, is_incoming=False
)

span.link_span(handshake_span.context, link_attributes)


def _on_asgi_websocket_close_message(ctx, scope, message):
"""
Handle websocket close message events.

This handler is called when a websocket close message event is dispatched.
It sets up the span with appropriate tags, metrics, and links.
"""
span = ctx.span
integration_config = ctx.get_item("integration_config")
Expand All @@ -1068,21 +1173,21 @@ def _on_asgi_websocket_close_message(ctx, scope, message):
_set_websocket_close_tags(span, message)

if hasattr(ctx, "parent") and ctx.parent.span:
span.set_link(
trace_id=ctx.parent.span.trace_id,
span_id=ctx.parent.span.span_id,
attributes={SPAN_LINK_KIND: SpanLinkKind.RESUMING},
handshake_span = ctx.parent.span
link_attributes = _WEBSOCKET_LINK_ATTRS_RESUMING.copy()

_add_websocket_span_pointer_attributes(
link_attributes, integration_config, handshake_span, scope, is_incoming=False
)

_copy_trace_level_tags(span, ctx.parent.span)
span.link_span(handshake_span.context, link_attributes)

_copy_trace_level_tags(span, handshake_span)


def _on_asgi_websocket_disconnect_message(ctx, scope, message):
"""
Handle websocket disconnect message events.

This handler is called when a websocket disconnect message event is dispatched.
It sets up the span with appropriate tags, metrics, and links.
"""
span = ctx.span
integration_config = ctx.get_item("integration_config")
Expand All @@ -1093,16 +1198,19 @@ def _on_asgi_websocket_disconnect_message(ctx, scope, message):
_set_websocket_close_tags(span, message)

if hasattr(ctx, "parent") and ctx.parent.span:
span.set_link(
trace_id=ctx.parent_span.trace_id,
span_id=ctx.parent_span.span_id,
attributes={SPAN_LINK_KIND: SpanLinkKind.EXECUTED},
handshake_span = ctx.parent.span
link_attributes = _WEBSOCKET_LINK_ATTRS_EXECUTED.copy()

_add_websocket_span_pointer_attributes(
link_attributes, integration_config, handshake_span, scope, is_incoming=True
)

span.link_span(handshake_span.context, link_attributes)

if getattr(integration_config, "asgi_websocket_messages_inherit_sampling", True):
_inherit_sampling_tags(span, ctx.parent.span._local_root)
_inherit_sampling_tags(span, handshake_span._local_root)

_copy_trace_level_tags(span, ctx.parent.span)
_copy_trace_level_tags(span, handshake_span)


def _on_asgi_request(ctx: core.ExecutionContext) -> None:
Expand All @@ -1115,14 +1223,15 @@ def _on_asgi_request(ctx: core.ExecutionContext) -> None:
span = _start_span(ctx)
ctx.set_item("req_span", span)

if scope["type"] == "websocket":
span._set_tag_str("http.upgraded", "websocket")

if "datadog" not in scope:
scope["datadog"] = {"request_spans": [span]}
else:
scope["datadog"]["request_spans"].append(span)

if scope["type"] == "websocket":
span._set_tag_str(HTTP_REQUEST_UPGRADED, SpanTypes.WEBSOCKET)
_init_websocket_message_counters(scope)


def listen():
core.on("wsgi.request.prepare", _on_request_prepare)
Expand Down
1 change: 1 addition & 0 deletions ddtrace/internal/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
HTTP_REQUEST_HEADER = "http.request.header"
HTTP_REQUEST_PARAMETER = "http.request.parameter"
HTTP_REQUEST_BODY = "http.request.body"
HTTP_REQUEST_UPGRADED = "http.upgraded"
HTTP_REQUEST_PATH_PARAMETER = "http.request.path.parameter"
REQUEST_PATH_PARAMS = "http.request.path_params"
STATUS_403_TYPE_AUTO = {"status_code": 403, "type": "auto"}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
---
features:
- |
feat(asgi): Enable context propagation between websocket message spans by implementing span pointers between incoming and outgoing messages.
Loading
Loading