Commit fd9e58b

fix(profiling): track running asyncio loop if it exists (#15120)
## Description

https://datadoghq.atlassian.net/browse/PROF-12842

This PR updates the wrapping and thread-registering logic for the Profiler in order to track the running loop when it exists. This is needed because otherwise, importing/starting the Profiler after starting a Task (or a loop more generally) will make us blind to the existing running loop.

Currently, we `wrap` the `asyncio.set_event_loop` function to capture when the Event Loop is first set (or is swapped). However, if the `_asyncio` module that sets up wrapping is imported/executed _after_ the loop has been set, we will miss that first call to `set_event_loop` and be blind to `asyncio` Tasks until the Event Loop is changed (which in many cases never happens).

Note that we also need to execute the "find loop and track it" logic when we start the Profiler generally speaking, as in this case we may have tried (earlier) to call `track_event_loop` but that would have failed as no thread was registered in the Profiler.

I added four tests that account for various edge cases. Unfortunately, two of them currently fail (I marked them as `xfail`) and there is no way to correctly fix them. The issue is that we can only get _the current running loop_ and not _the current (non-running) event loop_. In other words, if an event loop is created and set in `asyncio`, and the Profiler is started immediately afterwards without a Task having first been started, we will not be able to see that loop from the initialisation code and we will thus not be able to observe it from the Profiler thread.

In short, what works is the most common case:

* ✅ Import Profiler, start Profiler, import asyncio, start Tasks
* ✅ Import asyncio, Import Profiler, start Profiler, start Tasks
* ✅ Import asyncio, Import Profiler, start Tasks (from within the Tasks)
* 🚫 Import asyncio, Import Profiler, create (non running) event loop, start Profiler, start Task
* 🚫 Import asyncio, Import Profiler, create (non running) event loop, create Task, start Profiler

It is OK to start with that as I really consider the latter two to be edge cases.

**Example: today we miss all `asyncio` data with the following code**

```py
# 0. Profiler is NOT imported here, no watching is set up
import os
import asyncio


async def my_coroutine(n):
    await asyncio.sleep(n)


# 0. Function is defined, not run, Profiler is still not imported
async def main():
    # 3. We get here, import the Profiler module (and _asyncio as well)
    # We also start watching for set_event_loop calls, but we don't see the existing loop
    from ddtrace.profiling import Profiler

    prof = Profiler()
    prof.start()  # Should be as early as possible, eg before other imports, to ensure everything is profiled

    EXECUTION_TIME_SEC = int(os.environ.get("EXECUTION_TIME_SEC", "2"))

    t = asyncio.create_task(my_coroutine(EXECUTION_TIME_SEC / 2))
    await asyncio.gather(t, my_coroutine(EXECUTION_TIME_SEC))
    # 4. Interestingly, we detect a set_event_loop call here, but it's
    # being set to None before exiting


# 1. This is executed first
if __name__ == "__main__":
    # 2. This implicitly creates and sets the Event Loop
    asyncio.run(main())
```

## Testing

I have tested this in `prof-correctness` (initially just replicated that it _did not_ work) and it now works as expected. I will be adding more correctness tests, one with a "top of file" import and Profiler start, one with a "top of file import" and "in-code Profiler start", and one with both an "in-code file import" and "in-code Profiler start".

I also added four new tests to make sure we catch different edge cases with order of imports and order of task/profiler starts. Currently, two of them are marked as `XFAILED` because there is no way to reliably make them pass.
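
The limitation behind the two `xfail` cases can be reproduced with the standard library alone. The following is a minimal sketch, not code from this PR: `running_loop_or_none` is a hypothetical helper, and the sketch relies only on `asyncio.get_running_loop()` raising `RuntimeError` when no loop is running in the current thread.

```python
import asyncio
from typing import Optional


def running_loop_or_none() -> Optional[asyncio.AbstractEventLoop]:
    """Hypothetical helper: return the running loop, or None if there is none."""
    try:
        return asyncio.get_running_loop()
    except RuntimeError:
        return None


# A loop that has been created and set, but is not running yet.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

# Initialisation code running at this point cannot discover the loop.
seen_before_run = running_loop_or_none()


async def probe() -> Optional[asyncio.AbstractEventLoop]:
    # Once a coroutine is executing, the same helper does find the loop.
    return running_loop_or_none()


seen_while_running = loop.run_until_complete(probe())

# Clean up so the sketch leaves no loop behind.
loop.close()
asyncio.set_event_loop(None)
```

Here `seen_before_run` is `None` even though a loop has been set, while `seen_while_running` is the loop itself. This mirrors the difference between the passing and failing scenarios listed in the description.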
1 parent b4bd468 commit fd9e58b

5 files changed: +815 -12 lines changed
ddtrace/internal/datadog/profiling/stack_v2/__init__.pyi

Lines changed: 1 addition & 1 deletion
@@ -4,7 +4,7 @@ from typing import Optional, Sequence, Union

 def register_thread(id: int, native_id: int, name: str) -> None: ...  # noqa: A002
 def unregister_thread(name: str) -> None: ...
-def track_asyncio_loop(thread_id: int, loop: asyncio.AbstractEventLoop) -> None: ...
+def track_asyncio_loop(thread_id: int, loop: Optional[asyncio.AbstractEventLoop]) -> None: ...
 def link_tasks(parent: asyncio.AbstractEventLoop, child: asyncio.Task) -> None: ...
 def init_asyncio(
     current_tasks: Sequence[asyncio.Task],

ddtrace/profiling/_asyncio.py

Lines changed: 47 additions & 11 deletions
@@ -20,6 +20,8 @@

 THREAD_LINK = None  # type: typing.Optional[_threading._ThreadLink]

+ASYNCIO_IMPORTED = False
+

 def current_task(loop: typing.Union["asyncio.AbstractEventLoop", None] = None) -> typing.Union["asyncio.Task", None]:
     return None
@@ -35,10 +37,51 @@ def _task_get_name(task: "asyncio.Task") -> str:
     return "Task-%d" % id(task)


+def _call_init_asyncio(asyncio: ModuleType) -> None:
+    from asyncio import tasks as asyncio_tasks
+
+    if sys.hexversion >= 0x030C0000:
+        scheduled_tasks = asyncio_tasks._scheduled_tasks.data  # type: ignore[attr-defined]
+        eager_tasks = asyncio_tasks._eager_tasks  # type: ignore[attr-defined]
+    else:
+        scheduled_tasks = asyncio_tasks._all_tasks.data  # type: ignore[attr-defined]
+        eager_tasks = None
+
+    stack_v2.init_asyncio(asyncio_tasks._current_tasks, scheduled_tasks, eager_tasks)  # type: ignore[attr-defined]
+
+
+def link_existing_loop_to_current_thread() -> None:
+    global ASYNCIO_IMPORTED
+
+    # Only proceed if asyncio is actually imported and available
+    # Don't rely solely on ASYNCIO_IMPORTED global since it persists across forks
+    if not ASYNCIO_IMPORTED or "asyncio" not in sys.modules:
+        return
+
+    import asyncio
+
+    # Only track if there's actually a running loop
+    running_loop: typing.Union["asyncio.AbstractEventLoop", None] = None
+    try:
+        running_loop = asyncio.get_running_loop()
+    except RuntimeError:
+        # No existing loop to track, nothing to do
+        return
+
+    # We have a running loop, track it
+    assert THREAD_LINK is not None  # nosec: assert is used for typing
+    THREAD_LINK.clear_threads(set(sys._current_frames().keys()))
+    THREAD_LINK.link_object(running_loop)
+    stack_v2.track_asyncio_loop(typing.cast(int, ddtrace_threading.current_thread().ident), running_loop)
+    _call_init_asyncio(asyncio)
+
+
 @ModuleWatchdog.after_module_imported("asyncio")
-def _(asyncio):
-    # type: (ModuleType) -> None
+def _(asyncio: ModuleType) -> None:
     global THREAD_LINK
+    global ASYNCIO_IMPORTED
+
+    ASYNCIO_IMPORTED = True

     if hasattr(asyncio, "current_task"):
         globals()["current_task"] = asyncio.current_task
@@ -57,7 +100,7 @@ def _(asyncio):
     if THREAD_LINK is None:
         THREAD_LINK = _threading._ThreadLink()

-    init_stack_v2 = config.stack.v2_enabled and stack_v2.is_available
+    init_stack_v2: bool = config.stack.v2_enabled and stack_v2.is_available

     @partial(wrap, sys.modules["asyncio.events"].BaseDefaultEventLoopPolicy.set_event_loop)
     def _(f, args, kwargs):
@@ -91,14 +134,7 @@ def _(f, args, kwargs):
                 for child in children:
                     stack_v2.link_tasks(parent, child)

-        if sys.hexversion >= 0x030C0000:
-            scheduled_tasks = asyncio.tasks._scheduled_tasks.data
-            eager_tasks = asyncio.tasks._eager_tasks
-        else:
-            scheduled_tasks = asyncio.tasks._all_tasks.data
-            eager_tasks = None
-
-        stack_v2.init_asyncio(asyncio.tasks._current_tasks, scheduled_tasks, eager_tasks)
+        _call_init_asyncio(asyncio)


 def get_event_loop_for_thread(thread_id: int) -> typing.Union["asyncio.AbstractEventLoop", None]:
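
The "find the running loop and track it" step added in `link_existing_loop_to_current_thread` can be illustrated without ddtrace. This is a hedged sketch rather than the PR's implementation: `TRACKED_LOOPS` and `link_running_loop_to_current_thread` are hypothetical stand-ins for `THREAD_LINK` and the `stack_v2` calls, and a plain dictionary replaces the real bookkeeping.

```python
import asyncio
import threading
from typing import Dict, Optional

# Hypothetical stand-in for the profiler's thread-to-loop bookkeeping.
TRACKED_LOOPS: Dict[int, asyncio.AbstractEventLoop] = {}


def link_running_loop_to_current_thread() -> Optional[asyncio.AbstractEventLoop]:
    """Record the loop running in the calling thread, if there is one."""
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # No running loop in this thread, nothing to track.
        return None
    TRACKED_LOOPS[threading.get_ident()] = loop
    return loop


# Called outside any loop: nothing is tracked.
outside = link_running_loop_to_current_thread()


async def main():
    # Called late, from inside an already running loop: the loop is found
    # even though no set_event_loop call was observed beforehand.
    return link_running_loop_to_current_thread(), asyncio.get_running_loop()


linked, actual = asyncio.run(main())
```

The early return on `RuntimeError` mirrors the diff above: when no loop is running there is nothing to link, so the call is a no-op rather than an error.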

ddtrace/profiling/collector/threading.py

Lines changed: 5 additions & 0 deletions
@@ -68,3 +68,8 @@ def thread_bootstrap_inner(self, *args, **kwargs):
         # Instrument any living threads
         for thread_id, thread in ddtrace_threading._active.items():  # type: ignore[attr-defined]
             stack_v2.register_thread(thread_id, thread.native_id, thread.name)
+
+        # Import _asyncio to ensure asyncio post-import wrappers are initialised
+        from ddtrace.profiling import _asyncio  # noqa: F401
+
+        _asyncio.link_existing_loop_to_current_thread()
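
The ordering in this hunk matters: threads are registered first, and only then is the existing loop linked. The sketch below illustrates that dependency under stated assumptions. `ToyStackSampler` is a hypothetical model, not the `stack_v2` API, and it assumes that tracking a loop for an unregistered thread has no effect, which is how the description characterises the earlier failed attempt.

```python
import asyncio
import threading
from typing import Dict, Set


class ToyStackSampler:
    """Hypothetical model of a sampler that only tracks loops for known threads."""

    def __init__(self) -> None:
        self.threads: Set[int] = set()
        self.loops: Dict[int, asyncio.AbstractEventLoop] = {}

    def register_thread(self, thread_id: int) -> None:
        self.threads.add(thread_id)

    def track_asyncio_loop(self, thread_id: int, loop: asyncio.AbstractEventLoop) -> bool:
        # Assumption: tracking is ignored for threads that are not registered yet.
        if thread_id not in self.threads:
            return False
        self.loops[thread_id] = loop
        return True


async def main():
    sampler = ToyStackSampler()
    thread_id = threading.get_ident()
    loop = asyncio.get_running_loop()

    # Attempt made before any thread is registered: it has no effect.
    too_early = sampler.track_asyncio_loop(thread_id, loop)

    # Register all living threads, then retry, as the collector start-up does.
    for thread in threading.enumerate():
        if thread.ident is not None:
            sampler.register_thread(thread.ident)
    after_registration = sampler.track_asyncio_loop(thread_id, loop)

    return too_early, after_registration


too_early, after_registration = asyncio.run(main())
```

In this model the first attempt is dropped and the second succeeds, which is why the call to `link_existing_loop_to_current_thread()` is placed after the thread registration loop.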
Lines changed: 4 additions & 0 deletions
@@ -0,0 +1,4 @@
+fixes:
+  - |
+    profiling: this fix resolves an issue where importing the profiler module after an asyncio Event Loop had been
+    started would make the Profiler blind to the existing Event Loop and its Tasks.
