Skip to content

Commit 0a50af6

Browse files
chore(profiling): dedupe lock profiler code in _acquire and _release (#15088)
https://datadoghq.atlassian.net/browse/PROF-12594 ## Description There's quite a bit of code duplication between the `_acquire` and `_release` methods. It makes reasoning about the two behaviors difficult, and is error-prone when making changes. Code reuse here will help highlight key differences between what these methods are doing. **Changes** * consolidate duplicated code into a helper method and reuse it * group wrapper's acquire/release methods together for readability ## Testing * green signals ## Risks low: tests should catch anything out of place
1 parent 4d0ed61 commit 0a50af6

File tree

1 file changed

+65
-93
lines changed

1 file changed

+65
-93
lines changed

ddtrace/profiling/collector/_lock.py

Lines changed: 65 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -69,12 +69,15 @@ def __init__(
6969
self._self_acquired_at: int = 0
7070
self._self_name: Optional[str] = None
7171

72+
def acquire(self, *args: Any, **kwargs: Any) -> Any:
73+
return self._acquire(self.__wrapped__.acquire, *args, **kwargs)
74+
75+
def __enter__(self, *args: Any, **kwargs: Any) -> Any:
76+
return self._acquire(self.__wrapped__.__enter__, *args, **kwargs)
77+
7278
def __aenter__(self, *args: Any, **kwargs: Any) -> Any:
7379
return self._acquire(self.__wrapped__.__aenter__, *args, **kwargs)
7480

75-
def __aexit__(self, *args: Any, **kwargs: Any) -> Any:
76-
return self._release(self.__wrapped__.__aexit__, *args, **kwargs)
77-
7881
def _acquire(self, inner_func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
7982
if not self._self_capture_sampler.capture():
8083
return inner_func(*args, **kwargs)
@@ -83,55 +86,22 @@ def _acquire(self, inner_func: Callable[..., Any], *args: Any, **kwargs: Any) ->
8386
try:
8487
return inner_func(*args, **kwargs)
8588
finally:
89+
end: int = time.monotonic_ns()
90+
self._self_acquired_at = end
8691
try:
87-
end: int = time.monotonic_ns()
88-
self._self_acquired_at = end
89-
90-
thread_id: int
91-
thread_name: str
92-
thread_id, thread_name = _current_thread()
93-
94-
task_id: Optional[int]
95-
task_name: Optional[str]
96-
task_frame: Optional[FrameType]
97-
task_id, task_name, task_frame = _task.get_task(thread_id)
98-
9992
self._maybe_update_self_name()
100-
lock_name: str = (
101-
"%s:%s" % (self._self_init_loc, self._self_name) if self._self_name else self._self_init_loc
102-
)
103-
104-
frame: FrameType
105-
if task_frame is None:
106-
# If we can't get the task frame, we use the caller frame. We expect acquire/release or
107-
# __enter__/__exit__ to be on the stack, so we go back 2 frames.
108-
frame = sys._getframe(2)
109-
else:
110-
frame = task_frame
111-
112-
frames: List[DDFrame]
113-
frames, _ = _traceback.pyframe_to_frames(frame, self._self_max_nframes)
114-
115-
thread_native_id: int = _threading.get_thread_native_id(thread_id)
116-
117-
handle: ddup.SampleHandle = ddup.SampleHandle()
118-
handle.push_monotonic_ns(end)
119-
handle.push_lock_name(lock_name)
120-
handle.push_acquire(end - start, 1) # AFAICT, capture_pct does not adjust anything here
121-
handle.push_threadinfo(thread_id, thread_native_id, thread_name)
122-
handle.push_task_id(task_id)
123-
handle.push_task_name(task_name)
124-
125-
if self._self_tracer is not None:
126-
handle.push_span(self._self_tracer.current_span())
127-
for ddframe in frames:
128-
handle.push_frame(ddframe.function_name, ddframe.file_name, 0, ddframe.lineno)
129-
handle.flush_sample()
93+
self._flush_sample(start, end, is_acquire=True)
13094
except Exception:
13195
pass # nosec
13296

133-
def acquire(self, *args: Any, **kwargs: Any) -> Any:
134-
return self._acquire(self.__wrapped__.acquire, *args, **kwargs)
97+
def release(self, *args: Any, **kwargs: Any) -> Any:
98+
return self._release(self.__wrapped__.release, *args, **kwargs)
99+
100+
def __exit__(self, *args: Any, **kwargs: Any) -> None:
101+
self._release(self.__wrapped__.__exit__, *args, **kwargs)
102+
103+
def __aexit__(self, *args: Any, **kwargs: Any) -> Any:
104+
return self._release(self.__wrapped__.__aexit__, *args, **kwargs)
135105

136106
def _release(self, inner_func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
137107
# The underlying threading.Lock class is implemented using C code, and
@@ -151,59 +121,61 @@ def _release(self, inner_func: Callable[..., Any], *args: Any, **kwargs: Any) ->
151121
except AttributeError:
152122
# We just ignore the error, if the attribute is not found.
153123
pass
124+
154125
try:
155126
return inner_func(*args, **kwargs)
156127
finally:
157128
if start is not None:
158-
end: int = time.monotonic_ns()
159-
160-
thread_id: int
161-
thread_name: str
162-
thread_id, thread_name = _current_thread()
163-
164-
task_id: Optional[int]
165-
task_name: Optional[str]
166-
task_frame: Optional[FrameType]
167-
task_id, task_name, task_frame = _task.get_task(thread_id)
168-
169-
lock_name: str = (
170-
"%s:%s" % (self._self_init_loc, self._self_name) if self._self_name else self._self_init_loc
171-
)
172-
173-
frame: FrameType
174-
if task_frame is None:
175-
# See the comments in _acquire
176-
frame = sys._getframe(2)
177-
else:
178-
frame = task_frame
179-
180-
frames: List[DDFrame]
181-
frames, _ = _traceback.pyframe_to_frames(frame, self._self_max_nframes)
182-
183-
thread_native_id: int = _threading.get_thread_native_id(thread_id)
184-
185-
handle: ddup.SampleHandle = ddup.SampleHandle()
186-
handle.push_monotonic_ns(end)
187-
handle.push_lock_name(lock_name)
188-
handle.push_release(end - start, 1) # AFAICT, capture_pct does not adjust anything here
189-
handle.push_threadinfo(thread_id, thread_native_id, thread_name)
190-
handle.push_task_id(task_id)
191-
handle.push_task_name(task_name)
192-
193-
if self._self_tracer is not None:
194-
handle.push_span(self._self_tracer.current_span())
195-
for ddframe in frames:
196-
handle.push_frame(ddframe.function_name, ddframe.file_name, 0, ddframe.lineno)
197-
handle.flush_sample()
129+
self._flush_sample(start, end=time.monotonic_ns(), is_acquire=False)
198130

199-
def release(self, *args: Any, **kwargs: Any) -> Any:
200-
return self._release(self.__wrapped__.release, *args, **kwargs)
131+
def _flush_sample(self, start: int, end: int, is_acquire: bool) -> None:
132+
"""Helper method to push lock profiling data to ddup.
201133
202-
def __enter__(self, *args: Any, **kwargs: Any) -> Any:
203-
return self._acquire(self.__wrapped__.__enter__, *args, **kwargs)
134+
Args:
135+
start: Start timestamp in nanoseconds
136+
end: End timestamp in nanoseconds
137+
is_acquire: True for acquire operations, False for release operations
138+
"""
139+
handle: ddup.SampleHandle = ddup.SampleHandle()
204140

205-
def __exit__(self, *args: Any, **kwargs: Any) -> None:
206-
self._release(self.__wrapped__.__exit__, *args, **kwargs)
141+
handle.push_monotonic_ns(end)
142+
143+
lock_name: str = f"{self._self_init_loc}:{self._self_name}" if self._self_name else self._self_init_loc
144+
handle.push_lock_name(lock_name)
145+
146+
duration_ns: int = end - start
147+
if is_acquire:
148+
handle.push_acquire(duration_ns, 1)
149+
else:
150+
handle.push_release(duration_ns, 1)
151+
152+
thread_id: int
153+
thread_name: str
154+
thread_id, thread_name = _current_thread()
155+
156+
task_id: Optional[int]
157+
task_name: Optional[str]
158+
task_frame: Optional[FrameType]
159+
task_id, task_name, task_frame = _task.get_task(thread_id)
160+
161+
handle.push_task_id(task_id)
162+
handle.push_task_name(task_name)
163+
164+
thread_native_id: int = _threading.get_thread_native_id(thread_id)
165+
handle.push_threadinfo(thread_id, thread_native_id, thread_name)
166+
167+
if self._self_tracer is not None:
168+
handle.push_span(self._self_tracer.current_span())
169+
170+
# If we can't get the task frame, we use the caller frame.
171+
# Call stack: 0: _flush_sample, 1: _acquire/_release, 2: acquire/release/__enter__/__exit__, 3: caller
172+
frame: FrameType = task_frame or sys._getframe(3)
173+
frames: List[DDFrame]
174+
frames, _ = _traceback.pyframe_to_frames(frame, self._self_max_nframes)
175+
for ddframe in frames:
176+
handle.push_frame(ddframe.function_name, ddframe.file_name, 0, ddframe.lineno)
177+
178+
handle.flush_sample()
207179

208180
def _find_self_name(self, var_dict: Dict[str, Any]) -> Optional[str]:
209181
for name, value in var_dict.items():

0 commit comments

Comments
 (0)