Skip to content

Commit 1338d6e

Browse files
authored
chore(internal): add process_tags to APM payload (#15146)
This PR implements this [RFC](https://docs.google.com/document/d/1AFdLUuVk70i0bJd5335-RxqsvwAV9ovAqcO2z5mEMbA/edit?pli=1&tab=t.0#heading=h.s9l1lctqlg11) for the tracing product: it adds process_tags to the APM payload. According to the RFC, and because the Python tracer supports only v0.4 and v0.5, the tag `_dd.tags.process` is set on the first span of each trace chunk. The tags that are set are: - `entrypoint.workdir`. The working directory name; should be the last path segment. - `entrypoint.name`. The entrypoint of the application, i.e. the script name. - `entrypoint.type`. I'd assume that it can only be `script` for Python. - `entrypoint.basedir`. The base dir is the directory where the called executable resides; should be the last path segment.
1 parent 2ccf506 commit 1338d6e

16 files changed

+507
-0
lines changed

ddtrace/_trace/processor/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,13 @@
1414
from ddtrace.constants import _APM_ENABLED_METRIC_KEY as MK_APM_ENABLED
1515
from ddtrace.constants import _SINGLE_SPAN_SAMPLING_MECHANISM
1616
from ddtrace.internal import gitmetadata
17+
from ddtrace.internal import process_tags
1718
from ddtrace.internal import telemetry
1819
from ddtrace.internal.constants import COMPONENT
1920
from ddtrace.internal.constants import HIGHER_ORDER_TRACE_ID_BITS
2021
from ddtrace.internal.constants import LAST_DD_PARENT_ID_KEY
2122
from ddtrace.internal.constants import MAX_UINT_64BITS
23+
from ddtrace.internal.constants import PROCESS_TAGS
2224
from ddtrace.internal.constants import SAMPLING_DECISION_TRACE_TAG_KEY
2325
from ddtrace.internal.constants import SamplingMechanism
2426
from ddtrace.internal.logger import get_logger
@@ -250,6 +252,8 @@ def process_trace(self, trace: List[Span]) -> Optional[List[Span]]:
250252
span._update_tags_from_context()
251253
self._set_git_metadata(span)
252254
span._set_tag_str("language", "python")
255+
if p_tags := process_tags.process_tags:
256+
span._set_tag_str(PROCESS_TAGS, p_tags)
253257
# for 128 bit trace ids
254258
if span.trace_id > MAX_UINT_64BITS:
255259
trace_id_hob = _get_64_highest_order_bits_as_hex(span.trace_id)

ddtrace/debugging/_encoding.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
from ddtrace.debugging._signal.log import LogSignal
2121
from ddtrace.debugging._signal.snapshot import Snapshot
2222
from ddtrace.internal import forksafe
23+
from ddtrace.internal import process_tags
2324
from ddtrace.internal._encoding import BufferFull
2425
from ddtrace.internal.logger import get_logger
2526
from ddtrace.internal.utils.formats import format_trace_id
@@ -113,6 +114,9 @@ def _build_log_track_payload(
113114
"timestamp": int(signal.timestamp * 1e3), # milliseconds,
114115
}
115116

117+
if p_tags := process_tags.process_tags:
118+
payload["process_tags"] = p_tags
119+
116120
# Add the correlation IDs if available
117121
if context is not None and context.trace_id is not None:
118122
payload["dd"] = {

ddtrace/internal/constants.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@
7171
HTTP_REQUEST_PATH_PARAMETER = "http.request.path.parameter"
7272
REQUEST_PATH_PARAMS = "http.request.path_params"
7373
STATUS_403_TYPE_AUTO = {"status_code": 403, "type": "auto"}
74+
PROCESS_TAGS = "_dd.tags.process"
7475

7576
CONTAINER_ID_HEADER_NAME = "Datadog-Container-Id"
7677

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
import os
2+
from pathlib import Path
3+
import re
4+
import sys
5+
from typing import Optional
6+
7+
from ddtrace.internal.logger import get_logger
8+
from ddtrace.internal.settings._config import config
9+
10+
11+
log = get_logger(__name__)
12+
13+
ENTRYPOINT_NAME_TAG = "entrypoint.name"
14+
ENTRYPOINT_WORKDIR_TAG = "entrypoint.workdir"
15+
ENTRYPOINT_TYPE_TAG = "entrypoint.type"
16+
ENTRYPOINT_TYPE_SCRIPT = "script"
17+
ENTRYPOINT_BASEDIR_TAG = "entrypoint.basedir"
18+
19+
_CONSECUTIVE_UNDERSCORES_PATTERN = re.compile(r"_{2,}")
# ASCII characters kept as-is by the normalization: lowercase letters,
# digits and the separators / : . _ -
_ALLOWED_CHARS = frozenset("abcdefghijklmnopqrstuvwxyz0123456789/:._-")
MAX_LENGTH = 200


def _normalize_char(char: str) -> str:
    """Return ``char`` when it is allowed in a tag value, otherwise ``"_"``.

    Allowed characters are the ASCII set in ``_ALLOWED_CHARS`` plus any
    Unicode letter or digit (as reported by ``str.isalpha``/``str.isdigit``).
    """
    if char in _ALLOWED_CHARS or char.isalpha() or char.isdigit():
        return char
    return "_"


def normalize_tag_value(value: str) -> str:
    """Normalize ``value`` so it can be used as a process tag value.

    The value is truncated to ``MAX_LENGTH`` characters, lowercased, every
    disallowed character is replaced by ``_``, runs of underscores are
    collapsed into a single one, and leading/trailing underscores are removed.

    :param value: raw tag value.
    :returns: the normalized value (possibly an empty string).
    """
    # we copy the behavior of the agent which checks the size on the
    # original value and not on an intermediary normalized step
    result = value[:MAX_LENGTH].lower()

    result = "".join(map(_normalize_char, result))
    result = _CONSECUTIVE_UNDERSCORES_PATTERN.sub("_", result)
    return result.strip("_")
45+
46+
47+
def generate_process_tags() -> Optional[str]:
    """Build the serialized process tags for the current process.

    Returns ``None`` when ``config._process_tags_enabled`` is falsy or when
    any of the values cannot be computed (the failure is logged at debug
    level). Otherwise returns a comma-separated ``key:value`` string with
    the entries sorted and every value passed through ``normalize_tag_value``.
    """
    if not config._process_tags_enabled:
        return None

    try:
        # Keep the lookups in this order: working directory first, then the
        # values derived from the entrypoint script (sys.argv[0]).
        raw_tags = [
            (ENTRYPOINT_WORKDIR_TAG, os.path.basename(os.getcwd())),
            (ENTRYPOINT_BASEDIR_TAG, Path(sys.argv[0]).resolve().parent.name),
            (ENTRYPOINT_NAME_TAG, os.path.splitext(os.path.basename(sys.argv[0]))[0]),
            (ENTRYPOINT_TYPE_TAG, ENTRYPOINT_TYPE_SCRIPT),
        ]
        raw_tags.sort()

        serialized = []
        for tag_name, raw_value in raw_tags:
            serialized.append(f"{tag_name}:{normalize_tag_value(raw_value)}")
        return ",".join(serialized)
    except Exception as e:
        log.debug("failed to get process_tags: %s", e)
        return None
66+
67+
68+
# Computed once when this module is executed. The value is None when
# config._process_tags_enabled is falsy or when generation failed.
process_tags = generate_process_tags()

ddtrace/internal/settings/_config.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -665,6 +665,9 @@ def __init__(self):
665665
self._trace_resource_renaming_always_simplified_endpoint = _get_config(
666666
"DD_TRACE_RESOURCE_RENAMING_ALWAYS_SIMPLIFIED_ENDPOINT", default=False, modifier=asbool
667667
)
668+
self._process_tags_enabled = _get_config(
669+
"DD_EXPERIMENTAL_PROPAGATE_PROCESS_TAGS_ENABLED", default=False, modifier=asbool
670+
)
668671

669672
# Long-running span interval configurations
670673
# Only supported for Ray spans for now

tests/debugging/test_encoding.py

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,61 @@ def test_batch_json_encoder():
250250
assert queue.count == 0
251251

252252

253+
def test_process_tags_are_not_included_by_default():
    """The encoded log payload must not carry ``process_tags`` by default."""
    s = Snapshot(
        probe=create_snapshot_line_probe(probe_id="batch-test", source_file="foo.py", line=42),
        frame=inspect.currentframe(),
        thread=threading.current_thread(),
    )
    s.line({})

    # A single queue is enough: the previous version built an extra queue
    # (and a buffer_size) that was overwritten before ever being used.
    queue = SignalQueue(encoder=LogSignalJsonEncoder("test-service"))
    queue.put(s)
    data = queue.flush()
    assert data is not None
    payload, _ = data
    decoded = json.loads(payload.decode())
    assert "process_tags" not in decoded[0]
271+
272+
273+
@pytest.mark.subprocess(
    env=dict(
        DD_EXPERIMENTAL_PROPAGATE_PROCESS_TAGS_ENABLED="true",
    )
)
def test_process_tags_are_included():
    """With the feature flag set in the environment, the payload carries ``process_tags``."""
    # Imports live inside the function because the body runs in a subprocess.
    import inspect
    import json
    import threading

    from ddtrace.debugging._encoding import LogSignalJsonEncoder
    from ddtrace.debugging._encoding import SignalQueue
    from ddtrace.debugging._signal.snapshot import Snapshot
    from tests.debugging.utils import create_snapshot_line_probe

    s = Snapshot(
        probe=create_snapshot_line_probe(probe_id="batch-test", source_file="foo.py", line=42),
        frame=inspect.currentframe(),
        thread=threading.current_thread(),
    )
    s.line({})

    # A single queue is enough: the previous version built an extra queue
    # (and a buffer_size) that was overwritten before ever being used.
    queue = SignalQueue(encoder=LogSignalJsonEncoder("test-service"))
    queue.put(s)
    data = queue.flush()
    assert data is not None
    payload, _ = data
    decoded = json.loads(payload.decode())

    assert "process_tags" in decoded[0]
306+
307+
253308
def test_batch_flush_reencode():
254309
s = Snapshot(
255310
probe=create_snapshot_line_probe(probe_id="batch-test", source_file="foo.py", line=42),
Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
from unittest.mock import patch
2+
3+
import pytest
4+
5+
from ddtrace.internal import process_tags
6+
from ddtrace.internal.constants import PROCESS_TAGS
7+
from ddtrace.internal.process_tags import ENTRYPOINT_BASEDIR_TAG
8+
from ddtrace.internal.process_tags import ENTRYPOINT_NAME_TAG
9+
from ddtrace.internal.process_tags import ENTRYPOINT_TYPE_TAG
10+
from ddtrace.internal.process_tags import ENTRYPOINT_WORKDIR_TAG
11+
from ddtrace.internal.process_tags import normalize_tag_value
12+
from ddtrace.internal.settings._config import config
13+
from tests.subprocesstest import run_in_subprocess
14+
from tests.utils import TracerTestCase
15+
from tests.utils import process_tag_reload
16+
17+
18+
TEST_SCRIPT_PATH = "/path/to/test_script.py"
19+
TEST_WORKDIR_PATH = "/path/to/workdir"
20+
21+
22+
@pytest.mark.parametrize(
23+
"input_tag,expected",
24+
[
25+
("#test_starting_hash", "test_starting_hash"),
26+
("TestCAPSandSuch", "testcapsandsuch"),
27+
("Test Conversion Of Weird !@#$%^&**() Characters", "test_conversion_of_weird_characters"),
28+
("$#weird_starting", "weird_starting"),
29+
("allowed:c0l0ns", "allowed:c0l0ns"),
30+
("1love", "1love"),
31+
("/love2", "/love2"),
32+
("ünicöde", "ünicöde"),
33+
("ünicöde:metäl", "ünicöde:metäl"),
34+
("Data🐨dog🐶 繋がっ⛰てて", "data_dog_繋がっ_てて"),
35+
(" spaces ", "spaces"),
36+
(" #hashtag!@#spaces #__<># ", "hashtag_spaces"),
37+
(":testing", ":testing"),
38+
("_foo", "foo"),
39+
(":::test", ":::test"),
40+
("contiguous_____underscores", "contiguous_underscores"),
41+
("foo_", "foo"),
42+
("\u017fodd_\u017fcase\u017f", "\u017fodd_\u017fcase\u017f"),
43+
("", ""),
44+
(" ", ""),
45+
("ok", "ok"),
46+
("™Ö™Ö™™Ö™", "ö_ö_ö"),
47+
("AlsO:ök", "also:ök"),
48+
(":still_ok", ":still_ok"),
49+
("___trim", "trim"),
50+
("12.:trim@", "12.:trim"),
51+
("12.:trim@@", "12.:trim"),
52+
("fun:ky__tag/1", "fun:ky_tag/1"),
53+
("fun:ky@tag/2", "fun:ky_tag/2"),
54+
("fun:ky@@@tag/3", "fun:ky_tag/3"),
55+
("tag:1/2.3", "tag:1/2.3"),
56+
("---fun:k####y_ta@#g/1_@@#", "---fun:k_y_ta_g/1"),
57+
("AlsO:œ#@ö))œk", "also:œ_ö_œk"),
58+
("test\x99\x8faaa", "test_aaa"),
59+
("test\x99\x8f", "test"),
60+
("a" * 888, "a" * 200),
61+
("a" + "🐶" * 799 + "b", "a"),
62+
("a" + "\ufffd", "a"),
63+
("a" + "\ufffd" + "\ufffd", "a"),
64+
("a" + "\ufffd" + "\ufffd" + "b", "a_b"),
65+
(
66+
"A00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"
67+
"000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"
68+
" 000000000000",
69+
"a00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"
70+
"000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"
71+
"_0",
72+
),
73+
],
74+
)
75+
def test_normalize_tag(input_tag, expected):
    """Check ``normalize_tag_value`` against one (input, expected) pair."""
    normalized = normalize_tag_value(input_tag)
    assert normalized == expected
77+
78+
79+
class TestProcessTags(TracerTestCase):
    """Tracer-level tests for the process tags attached to spans."""

    def setUp(self):
        super().setUp()
        # Remember the global state the tests mutate so tearDown can restore it.
        self._original_process_tags_enabled = config._process_tags_enabled
        self._original_process_tags = process_tags.process_tags

    def tearDown(self):
        config._process_tags_enabled = self._original_process_tags_enabled
        process_tags.process_tags = self._original_process_tags
        super().tearDown()

    @pytest.mark.snapshot
    def test_process_tags_deactivated(self):
        """Trace a span after regenerating the tags with the feature disabled."""
        config._process_tags_enabled = False
        process_tag_reload()

        with self.tracer.trace("test"):
            pass

    @pytest.mark.snapshot
    def test_process_tags_activated(self):
        """Trace a parent/child pair with the feature enabled and a patched entrypoint."""
        with patch("sys.argv", [TEST_SCRIPT_PATH]), patch("os.getcwd", return_value=TEST_WORKDIR_PATH):
            config._process_tags_enabled = True
            process_tag_reload()

            with self.tracer.trace("parent"):
                with self.tracer.trace("child"):
                    pass

    @pytest.mark.snapshot
    def test_process_tags_edge_case(self):
        """Trace a span when the entrypoint script has no extension and sits at the root."""
        with patch("sys.argv", ["/test_script"]), patch("os.getcwd", return_value=TEST_WORKDIR_PATH):
            config._process_tags_enabled = True
            process_tag_reload()

            with self.tracer.trace("span"):
                pass

    @pytest.mark.snapshot
    def test_process_tags_error(self):
        """An empty ``sys.argv`` makes tag generation fail; the failure is logged at debug level."""
        with patch("sys.argv", []), patch("os.getcwd", return_value=TEST_WORKDIR_PATH):
            config._process_tags_enabled = True

            with self.override_global_config(dict(_telemetry_enabled=False)):
                with patch("ddtrace.internal.process_tags.log") as mock_log:
                    process_tag_reload()

                with self.tracer.trace("span"):
                    pass

            # Check if debug log was called
            mock_log.debug.assert_called_once()
            call_args = mock_log.debug.call_args[0]
            assert "failed to get process_tags" in call_args[0], (
                f"Expected error message not found. Got: {call_args[0]}"
            )

    @pytest.mark.snapshot
    @run_in_subprocess(env_overrides=dict(DD_TRACE_PARTIAL_FLUSH_ENABLED="true", DD_TRACE_PARTIAL_FLUSH_MIN_SPANS="2"))
    def test_process_tags_partial_flush(self):
        """Trace a parent with two children while partial flushing is enabled (min 2 spans)."""
        with patch("sys.argv", [TEST_SCRIPT_PATH]), patch("os.getcwd", return_value=TEST_WORKDIR_PATH):
            config._process_tags_enabled = True
            process_tag_reload()

            with self.override_global_config(dict(_partial_flush_enabled=True, _partial_flush_min_spans=2)):
                with self.tracer.trace("parent"):
                    with self.tracer.trace("child1"):
                        pass
                    with self.tracer.trace("child2"):
                        pass

    @run_in_subprocess(env_overrides=dict(DD_EXPERIMENTAL_PROPAGATE_PROCESS_TAGS_ENABLED="True"))
    def test_process_tags_activated_with_env(self):
        """Enabling the feature through the environment variable tags the span."""
        with self.tracer.trace("test"):
            pass

        span = self.get_spans()[0]

        assert span is not None
        assert PROCESS_TAGS in span._meta

        # Named so that it does not shadow the imported ``process_tags`` module.
        process_tags_value = span._meta[PROCESS_TAGS]
        assert ENTRYPOINT_BASEDIR_TAG in process_tags_value
        assert ENTRYPOINT_NAME_TAG in process_tags_value
        assert ENTRYPOINT_TYPE_TAG in process_tags_value
        assert ENTRYPOINT_WORKDIR_TAG in process_tags_value
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
[[
2+
{
3+
"name": "span",
4+
"service": "tests.internal",
5+
"resource": "span",
6+
"trace_id": 0,
7+
"span_id": 1,
8+
"parent_id": 0,
9+
"type": "",
10+
"error": 0,
11+
"meta": {
12+
"_dd.p.dm": "-0",
13+
"_dd.p.tid": "6911da3a00000000",
14+
"_dd.tags.process": "entrypoint.basedir:,entrypoint.name:test_script,entrypoint.type:script,entrypoint.workdir:workdir",
15+
"language": "python",
16+
"runtime-id": "c9342b8003de45feb0bf56d32ece46a1"
17+
},
18+
"metrics": {
19+
"_dd.top_level": 1,
20+
"_dd.tracer_kr": 1.0,
21+
"_sampling_priority_v1": 1,
22+
"process_id": 605
23+
},
24+
"duration": 105292,
25+
"start": 1762777658431833668
26+
}]]

0 commit comments

Comments
 (0)