Skip to content

Commit 233c132

Browse files
feat(closes OPEN-7550): add tracing for openai chat completions parse method
1 parent 9530729 commit 233c132

File tree

3 files changed

+501
-2
lines changed

3 files changed

+501
-2
lines changed

examples/tracing/openai/openai_tracing.ipynb

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,38 @@
106106
")"
107107
]
108108
},
109+
{
110+
"cell_type": "markdown",
111+
"id": "09d39983",
112+
"metadata": {},
113+
"source": [
114+
"#### Parse method (Structured Outputs)"
115+
]
116+
},
117+
{
118+
"cell_type": "code",
119+
"execution_count": null,
120+
"id": "9a86642c",
121+
"metadata": {},
122+
"outputs": [],
123+
"source": [
124+
"from pydantic import BaseModel\n",
125+
"\n",
126+
"class CalendarEvent(BaseModel):\n",
127+
" name: str\n",
128+
" date: str\n",
129+
" participants: list[str]\n",
130+
"\n",
131+
"completion = openai_client.chat.completions.parse(\n",
132+
" model=\"gpt-4o-mini\",\n",
133+
" messages=[\n",
134+
" {\"role\": \"system\", \"content\": \"Extract the event information.\"},\n",
135+
" {\"role\": \"user\", \"content\": \"Alice and Bob are going to a science fair on Friday.\"},\n",
136+
" ],\n",
137+
" response_format=CalendarEvent,\n",
138+
")\n"
139+
]
140+
},
109141
{
110142
"cell_type": "markdown",
111143
"id": "4e6fb396",
@@ -131,7 +163,7 @@
131163
],
132164
"metadata": {
133165
"kernelspec": {
134-
"display_name": "bedrock-test",
166+
"display_name": "base",
135167
"language": "python",
136168
"name": "python3"
137169
},
@@ -145,7 +177,7 @@
145177
"name": "python",
146178
"nbconvert_exporter": "python",
147179
"pygments_lexer": "ipython3",
148-
"version": "3.12.3"
180+
"version": "3.12.7"
149181
}
150182
},
151183
"nbformat": 4,

src/openlayer/lib/integrations/async_openai_tracer.py

Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
create_trace_args,
2222
add_to_trace,
2323
parse_non_streaming_output_data,
24+
parse_structured_output_data,
2425
# Import Responses API helper functions
2526
extract_responses_chunk_data,
2627
extract_responses_inputs,
@@ -98,6 +99,33 @@ async def traced_chat_create_func(*args, **kwargs):
9899

99100
client.chat.completions.create = traced_chat_create_func
100101

102+
# Patch parse method if it exists
103+
if hasattr(client.chat.completions, 'parse'):
104+
parse_func = client.chat.completions.parse
105+
106+
@wraps(parse_func)
107+
async def traced_parse_func(*args, **kwargs):
108+
inference_id = kwargs.pop("inference_id", None)
109+
stream = kwargs.get("stream", False)
110+
111+
if stream:
112+
return handle_async_streaming_parse(
113+
*args,
114+
**kwargs,
115+
parse_func=parse_func,
116+
inference_id=inference_id,
117+
is_azure_openai=is_azure_openai,
118+
)
119+
return await handle_async_non_streaming_parse(
120+
*args,
121+
**kwargs,
122+
parse_func=parse_func,
123+
inference_id=inference_id,
124+
is_azure_openai=is_azure_openai,
125+
)
126+
127+
client.chat.completions.parse = traced_parse_func
128+
101129
# Patch Responses API (if available)
102130
if hasattr(client, "responses"):
103131
responses_create_func = client.responses.create
@@ -466,3 +494,186 @@ async def handle_async_responses_non_streaming_create(
466494
logger.error("Failed to trace the Responses API request with Openlayer. %s", e)
467495

468496
return response
497+
498+
499+
async def handle_async_streaming_parse(
    parse_func: callable,
    *args,
    is_azure_openai: bool = False,
    inference_id: Optional[str] = None,
    **kwargs,
) -> AsyncIterator[Any]:
    """Handles the parse method when streaming is enabled.

    Wraps the streamed completion: every chunk is forwarded to the caller
    unchanged while content / function-call fragments are accumulated, and a
    trace step is emitted once the stream is exhausted.

    Parameters
    ----------
    parse_func : callable
        The parse method to handle.
    is_azure_openai : bool, optional
        Whether the client is an Azure OpenAI client, by default False
    inference_id : Optional[str], optional
        A user-generated inference id, by default None

    Returns
    -------
    AsyncIterator[Any]
        A generator that yields the chunks of the completion.
    """
    chunks = await parse_func(*args, **kwargs)

    # Accumulators for the streamed completion: text content, a (possibly
    # chunked) function/tool call, and the raw chunk payloads for tracing.
    collected_output_data = []
    collected_function_call = {
        "name": "",
        "arguments": "",
    }
    raw_outputs = []
    start_time = time.time()
    end_time = None
    first_token_time = None
    num_of_completion_tokens = None
    latency = None
    try:
        i = 0
        async for chunk in chunks:
            raw_outputs.append(chunk.model_dump())
            if i == 0:
                first_token_time = time.time()
            if i > 0:
                num_of_completion_tokens = i + 1
            i += 1

            # Some chunks carry no choices (e.g. the final usage-only chunk
            # when `stream_options={"include_usage": True}` is set). Guard the
            # delta access so such chunks are forwarded instead of raising an
            # IndexError that would abort the caller's stream.
            if chunk.choices:
                delta = chunk.choices[0].delta

                if delta.content:
                    collected_output_data.append(delta.content)
                elif delta.function_call:
                    if delta.function_call.name:
                        collected_function_call["name"] += delta.function_call.name
                    if delta.function_call.arguments:
                        collected_function_call[
                            "arguments"
                        ] += delta.function_call.arguments
                elif delta.tool_calls:
                    if delta.tool_calls[0].function.name:
                        collected_function_call["name"] += delta.tool_calls[
                            0
                        ].function.name
                    if delta.tool_calls[0].function.arguments:
                        collected_function_call["arguments"] += delta.tool_calls[
                            0
                        ].function.arguments

            yield chunk

        end_time = time.time()
        latency = (end_time - start_time) * 1000
    # pylint: disable=broad-except
    except Exception as e:
        logger.error("Failed to yield chunk. %s", e)
    finally:
        # Try to add step to the trace
        try:
            collected_output_data = [
                message for message in collected_output_data if message is not None
            ]
            if collected_output_data:
                output_data = "".join(collected_output_data)
            elif collected_function_call["arguments"]:
                # Only decode when a function/tool call was actually streamed;
                # json.loads("") would raise and silently drop the trace step.
                collected_function_call["arguments"] = json.loads(
                    collected_function_call["arguments"]
                )
                output_data = collected_function_call
            else:
                # Nothing was collected (empty or aborted stream).
                output_data = None

            trace_args = create_trace_args(
                end_time=end_time,
                inputs={"prompt": kwargs["messages"]},
                output=output_data,
                latency=latency,
                tokens=num_of_completion_tokens,
                prompt_tokens=0,
                completion_tokens=num_of_completion_tokens,
                model=kwargs.get("model"),
                model_parameters=get_model_parameters(kwargs),
                raw_output=raw_outputs,
                id=inference_id,
                metadata={
                    "timeToFirstToken": (
                        (first_token_time - start_time) * 1000
                        if first_token_time
                        else None
                    ),
                    "method": "parse",
                    "response_format": kwargs.get("response_format"),
                },
            )
            add_to_trace(
                **trace_args,
                is_azure_openai=is_azure_openai,
            )

        # pylint: disable=broad-except
        except Exception as e:
            logger.error(
                "Failed to trace the parse chat completion request with Openlayer. %s",
                e,
            )
619+
620+
621+
async def handle_async_non_streaming_parse(
    parse_func: callable,
    *args,
    is_azure_openai: bool = False,
    inference_id: Optional[str] = None,
    **kwargs,
) -> Any:
    """Handles the parse method when streaming is disabled.

    Awaits the parsed completion, then best-effort records a trace step —
    tracing failures are logged and never propagate to the caller.

    Parameters
    ----------
    parse_func : callable
        The parse method to handle.
    is_azure_openai : bool, optional
        Whether the client is an Azure OpenAI client, by default False
    inference_id : Optional[str], optional
        A user-generated inference id, by default None

    Returns
    -------
    Any
        The parsed completion response.
    """
    start_time = time.time()
    response = await parse_func(*args, **kwargs)
    end_time = time.time()

    # Best-effort tracing of the completed request.
    # pylint: disable=broad-except
    try:
        usage = response.usage
        metadata = {
            "method": "parse",
            "response_format": kwargs.get("response_format"),
        }
        trace_args = create_trace_args(
            end_time=end_time,
            inputs={"prompt": kwargs["messages"]},
            output=parse_structured_output_data(response),
            latency=(end_time - start_time) * 1000,
            tokens=usage.total_tokens,
            prompt_tokens=usage.prompt_tokens,
            completion_tokens=usage.completion_tokens,
            model=response.model,
            model_parameters=get_model_parameters(kwargs),
            raw_output=response.model_dump(),
            id=inference_id,
            metadata=metadata,
        )
        add_to_trace(is_azure_openai=is_azure_openai, **trace_args)
    except Exception as e:
        logger.error(
            "Failed to trace the parse chat completion request with Openlayer. %s", e
        )

    return response

0 commit comments

Comments
 (0)