From 7b0c6f4b820aa998fa4a4dd02b62d04868d7a05e Mon Sep 17 00:00:00 2001
From: Vinicius Mello
Date: Mon, 10 Nov 2025 10:33:58 -0300
Subject: [PATCH] feat(closes OPEN-7550): add tracing for openai chat completions parse method

---
 examples/tracing/openai/openai_tracing.ipynb  |  25 +-
 .../lib/integrations/async_openai_tracer.py   | 214 +++++++++++++++
 .../lib/integrations/openai_tracer.py         | 259 ++++++++++++++++++
 3 files changed, 495 insertions(+), 3 deletions(-)

diff --git a/examples/tracing/openai/openai_tracing.ipynb b/examples/tracing/openai/openai_tracing.ipynb
index a79bae1f..938d52cb 100644
--- a/examples/tracing/openai/openai_tracing.ipynb
+++ b/examples/tracing/openai/openai_tracing.ipynb
@@ -104,12 +104,31 @@
    "id": "abaf6987-c257-4f0d-96e7-3739b24c7206",
    "metadata": {},
    "outputs": [],
-   "source": []
+   "source": [
+    "from pydantic import BaseModel\n",
+    "\n",
+    "\n",
+    "class Person(BaseModel):\n",
+    "    name: str\n",
+    "    age: int\n",
+    "    occupation: str\n",
+    "\n",
+    "# The parse method automatically returns a structured Pydantic object\n",
+    "completion = openai_client.chat.completions.parse(\n",
+    "    model=\"gpt-4o\",\n",
+    "    messages=[\n",
+    "        {\"role\": \"user\", \"content\": \"Extract: John Doe is 30 years old and works as a software engineer\"}\n",
+    "    ],\n",
+    "    response_format=Person,\n",
+    ")\n",
+    "\n",
+    "completion.choices[0].message.parsed"
+   ]
   }
  ],
  "metadata": {
   "kernelspec": {
-   "display_name": "Python 3 (ipykernel)",
+   "display_name": "base",
    "language": "python",
    "name": "python3"
   },
@@ -123,7 +142,7 @@
    "name": "python",
    "nbconvert_exporter": "python",
    "pygments_lexer": "ipython3",
-   "version": "3.9.18"
+   "version": "3.12.7"
   }
  },
  "nbformat": 4,
diff --git a/src/openlayer/lib/integrations/async_openai_tracer.py b/src/openlayer/lib/integrations/async_openai_tracer.py
index f670fa16..3f36701e 100644
--- a/src/openlayer/lib/integrations/async_openai_tracer.py
+++ b/src/openlayer/lib/integrations/async_openai_tracer.py
@@ -20,6 +20,7 @@
     create_trace_args,
     add_to_trace,
     parse_non_streaming_output_data,
+    parse_structured_output_data,
 )
 
 logger = logging.getLogger(__name__)
@@ -60,6 +61,8 @@ def trace_async_openai(
     )
     is_azure_openai = isinstance(client, openai.AsyncAzureOpenAI)
+
+    # Patch create method
     create_func = client.chat.completions.create
 
     @wraps(create_func)
@@ -84,6 +87,34 @@ async def traced_create_func(*args, **kwargs):
         )
 
     client.chat.completions.create = traced_create_func
+
+    # Patch parse method if it exists
+    if hasattr(client.chat.completions, 'parse'):
+        parse_func = client.chat.completions.parse
+
+        @wraps(parse_func)
+        async def traced_parse_func(*args, **kwargs):
+            inference_id = kwargs.pop("inference_id", None)
+            stream = kwargs.get("stream", False)
+
+            if stream:
+                return handle_async_streaming_parse(
+                    *args,
+                    **kwargs,
+                    parse_func=parse_func,
+                    inference_id=inference_id,
+                    is_azure_openai=is_azure_openai,
+                )
+            return await handle_async_non_streaming_parse(
+                *args,
+                **kwargs,
+                parse_func=parse_func,
+                inference_id=inference_id,
+                is_azure_openai=is_azure_openai,
+            )
+
+        client.chat.completions.parse = traced_parse_func
+
     return client
@@ -259,6 +290,189 @@ async def handle_async_non_streaming_create(
     except Exception as e:
         logger.error(
             "Failed to trace the create chat completion request with Openlayer. %s", e
         )
 
     return response
+
+
+async def handle_async_streaming_parse(
+    parse_func: callable,
+    *args,
+    is_azure_openai: bool = False,
+    inference_id: Optional[str] = None,
+    **kwargs,
+) -> AsyncIterator[Any]:
+    """Handles the parse method when streaming is enabled.
+
+    Parameters
+    ----------
+    parse_func : callable
+        The parse method to handle.
+    is_azure_openai : bool, optional
+        Whether the client is an Azure OpenAI client, by default False
+    inference_id : Optional[str], optional
+        A user-generated inference id, by default None
+
+    Returns
+    -------
+    AsyncIterator[Any]
+        A generator that yields the chunks of the completion.
+    """
+    chunks = await parse_func(*args, **kwargs)
+
+    # Create and return a new async generator that processes chunks
+    collected_output_data = []
+    collected_function_call = {
+        "name": "",
+        "arguments": "",
+    }
+    raw_outputs = []
+    start_time = time.time()
+    end_time = None
+    first_token_time = None
+    num_of_completion_tokens = None
+    latency = None
+    try:
+        i = 0
+        async for chunk in chunks:
+            raw_outputs.append(chunk.model_dump())
+            if i == 0:
+                first_token_time = time.time()
+            if i > 0:
+                num_of_completion_tokens = i + 1
+            i += 1
+
+            delta = chunk.choices[0].delta
+
+            if delta.content:
+                collected_output_data.append(delta.content)
+            elif delta.function_call:
+                if delta.function_call.name:
+                    collected_function_call["name"] += delta.function_call.name
+                if delta.function_call.arguments:
+                    collected_function_call[
+                        "arguments"
+                    ] += delta.function_call.arguments
+            elif delta.tool_calls:
+                if delta.tool_calls[0].function.name:
+                    collected_function_call["name"] += delta.tool_calls[0].function.name
+                if delta.tool_calls[0].function.arguments:
+                    collected_function_call["arguments"] += delta.tool_calls[
+                        0
+                    ].function.arguments
+
+            yield chunk
+
+        end_time = time.time()
+        latency = (end_time - start_time) * 1000
+    # pylint: disable=broad-except
+    except Exception as e:
+        logger.error("Failed to yield chunk. %s", e)
+    finally:
+        # Try to add step to the trace
+        try:
+            collected_output_data = [
+                message for message in collected_output_data if message is not None
+            ]
+            if collected_output_data:
+                output_data = "".join(collected_output_data)
+            else:
+                collected_function_call["arguments"] = json.loads(
+                    collected_function_call["arguments"]
+                )
+                output_data = collected_function_call
+
+            trace_args = create_trace_args(
+                end_time=end_time,
+                inputs={"prompt": kwargs["messages"]},
+                output=output_data,
+                latency=latency,
+                tokens=num_of_completion_tokens,
+                prompt_tokens=0,
+                completion_tokens=num_of_completion_tokens,
+                model=kwargs.get("model"),
+                model_parameters=get_model_parameters(kwargs),
+                raw_output=raw_outputs,
+                id=inference_id,
+                metadata={
+                    "timeToFirstToken": (
+                        (first_token_time - start_time) * 1000
+                        if first_token_time
+                        else None
+                    ),
+                    "method": "parse",
+                    "response_format": kwargs.get("response_format"),
+                },
+            )
+            add_to_trace(
+                **trace_args,
+                is_azure_openai=is_azure_openai,
+            )
+
+        # pylint: disable=broad-except
+        except Exception as e:
+            logger.error(
+                "Failed to trace the parse chat completion request with Openlayer. %s",
+                e,
+            )
+
+
+async def handle_async_non_streaming_parse(
+    parse_func: callable,
+    *args,
+    is_azure_openai: bool = False,
+    inference_id: Optional[str] = None,
+    **kwargs,
+) -> Any:
+    """Handles the parse method when streaming is disabled.
+
+    Parameters
+    ----------
+    parse_func : callable
+        The parse method to handle.
+    is_azure_openai : bool, optional
+        Whether the client is an Azure OpenAI client, by default False
+    inference_id : Optional[str], optional
+        A user-generated inference id, by default None
+
+    Returns
+    -------
+    Any
+        The parsed completion response.
+    """
+    start_time = time.time()
+    response = await parse_func(*args, **kwargs)
+    end_time = time.time()
+
+    # Try to add step to the trace
+    try:
+        output_data = parse_structured_output_data(response)
+        trace_args = create_trace_args(
+            end_time=end_time,
+            inputs={"prompt": kwargs["messages"]},
+            output=output_data,
+            latency=(end_time - start_time) * 1000,
+            tokens=response.usage.total_tokens,
+            prompt_tokens=response.usage.prompt_tokens,
+            completion_tokens=response.usage.completion_tokens,
+            model=response.model,
+            model_parameters=get_model_parameters(kwargs),
+            raw_output=response.model_dump(),
+            id=inference_id,
+            metadata={
+                "method": "parse",
+                "response_format": kwargs.get("response_format"),
+            },
+        )
+
+        add_to_trace(
+            is_azure_openai=is_azure_openai,
+            **trace_args,
+        )
+    # pylint: disable=broad-except
+    except Exception as e:
+        logger.error(
+            "Failed to trace the parse chat completion request with Openlayer. %s", e
+        )
+
+    return response
diff --git a/src/openlayer/lib/integrations/openai_tracer.py b/src/openlayer/lib/integrations/openai_tracer.py
index 0c787aa2..21770e95 100644
--- a/src/openlayer/lib/integrations/openai_tracer.py
+++ b/src/openlayer/lib/integrations/openai_tracer.py
@@ -55,6 +55,8 @@ def trace_openai(
     )
     is_azure_openai = isinstance(client, openai.AzureOpenAI)
+
+    # Patch create method
     create_func = client.chat.completions.create
 
     @wraps(create_func)
@@ -79,6 +81,34 @@ def traced_create_func(*args, **kwargs):
         )
 
     client.chat.completions.create = traced_create_func
+
+    # Patch parse method if it exists
+    if hasattr(client.chat.completions, 'parse'):
+        parse_func = client.chat.completions.parse
+
+        @wraps(parse_func)
+        def traced_parse_func(*args, **kwargs):
+            inference_id = kwargs.pop("inference_id", None)
+            stream = kwargs.get("stream", False)
+
+            if stream:
+                return handle_streaming_parse(
+                    *args,
+                    **kwargs,
+                    parse_func=parse_func,
+                    inference_id=inference_id,
+                    is_azure_openai=is_azure_openai,
+                )
+            return handle_non_streaming_parse(
+                *args,
+                **kwargs,
+                parse_func=parse_func,
+                inference_id=inference_id,
+                is_azure_openai=is_azure_openai,
+            )
+
+        client.chat.completions.parse = traced_parse_func
+
     return client
@@ -368,6 +398,235 @@ def parse_non_streaming_output_data(
     return output_data
 
 
+def handle_streaming_parse(
+    parse_func: callable,
+    *args,
+    is_azure_openai: bool = False,
+    inference_id: Optional[str] = None,
+    **kwargs,
+) -> Iterator[Any]:
+    """Handles the parse method when streaming is enabled.
+
+    Parameters
+    ----------
+    parse_func : callable
+        The parse method to handle.
+    is_azure_openai : bool, optional
+        Whether the client is an Azure OpenAI client, by default False
+    inference_id : Optional[str], optional
+        A user-generated inference id, by default None
+
+    Returns
+    -------
+    Iterator[Any]
+        A generator that yields the chunks of the completion.
+    """
+    chunks = parse_func(*args, **kwargs)
+    return stream_parse_chunks(
+        chunks=chunks,
+        kwargs=kwargs,
+        inference_id=inference_id,
+        is_azure_openai=is_azure_openai,
+    )
+
+
+def stream_parse_chunks(
+    chunks: Iterator[Any],
+    kwargs: Dict[str, Any],
+    is_azure_openai: bool = False,
+    inference_id: Optional[str] = None,
+):
+    """Streams the chunks of the parse completion and traces the completion."""
+    collected_output_data = []
+    collected_function_call = {
+        "name": "",
+        "arguments": "",
+    }
+    raw_outputs = []
+    start_time = time.time()
+    end_time = None
+    first_token_time = None
+    num_of_completion_tokens = None
+    latency = None
+    try:
+        i = 0
+        for i, chunk in enumerate(chunks):
+            raw_outputs.append(chunk.model_dump())
+            if i == 0:
+                first_token_time = time.time()
+            if i > 0:
+                num_of_completion_tokens = i + 1
+
+            delta = chunk.choices[0].delta
+
+            if delta.content:
+                collected_output_data.append(delta.content)
+            elif delta.function_call:
+                if delta.function_call.name:
+                    collected_function_call["name"] += delta.function_call.name
+                if delta.function_call.arguments:
+                    collected_function_call[
+                        "arguments"
+                    ] += delta.function_call.arguments
+            elif delta.tool_calls:
+                if delta.tool_calls[0].function.name:
+                    collected_function_call["name"] += delta.tool_calls[0].function.name
+                if delta.tool_calls[0].function.arguments:
+                    collected_function_call["arguments"] += delta.tool_calls[
+                        0
+                    ].function.arguments
+
+            yield chunk
+        end_time = time.time()
+        latency = (end_time - start_time) * 1000
+    # pylint: disable=broad-except
+    except Exception as e:
+        logger.error("Failed to yield chunk. %s", e)
+    finally:
+        # Try to add step to the trace
+        try:
+            collected_output_data = [
+                message for message in collected_output_data if message is not None
+            ]
+            if collected_output_data:
+                output_data = "".join(collected_output_data)
+            else:
+                collected_function_call["arguments"] = json.loads(
+                    collected_function_call["arguments"]
+                )
+                output_data = collected_function_call
+
+            trace_args = create_trace_args(
+                end_time=end_time,
+                inputs={"prompt": kwargs["messages"]},
+                output=output_data,
+                latency=latency,
+                tokens=num_of_completion_tokens,
+                prompt_tokens=0,
+                completion_tokens=num_of_completion_tokens,
+                model=kwargs.get("model"),
+                model_parameters=get_model_parameters(kwargs),
+                raw_output=raw_outputs,
+                id=inference_id,
+                metadata={
+                    "timeToFirstToken": (
+                        (first_token_time - start_time) * 1000
+                        if first_token_time
+                        else None
+                    ),
+                    "method": "parse",
+                    "response_format": kwargs.get("response_format"),
+                },
+            )
+            add_to_trace(
+                **trace_args,
+                is_azure_openai=is_azure_openai,
+            )
+
+        # pylint: disable=broad-except
+        except Exception as e:
+            logger.error(
+                "Failed to trace the parse chat completion request with Openlayer. %s",
+                e,
+            )
+
+
+def handle_non_streaming_parse(
+    parse_func: callable,
+    *args,
+    is_azure_openai: bool = False,
+    inference_id: Optional[str] = None,
+    **kwargs,
+) -> Any:
+    """Handles the parse method when streaming is disabled.
+
+    Parameters
+    ----------
+    parse_func : callable
+        The parse method to handle.
+    is_azure_openai : bool, optional
+        Whether the client is an Azure OpenAI client, by default False
+    inference_id : Optional[str], optional
+        A user-generated inference id, by default None
+
+    Returns
+    -------
+    Any
+        The parsed completion response.
+    """
+    start_time = time.time()
+    response = parse_func(*args, **kwargs)
+    end_time = time.time()
+
+    # Try to add step to the trace
+    try:
+        output_data = parse_structured_output_data(response)
+        trace_args = create_trace_args(
+            end_time=end_time,
+            inputs={"prompt": kwargs["messages"]},
+            output=output_data,
+            latency=(end_time - start_time) * 1000,
+            tokens=response.usage.total_tokens,
+            prompt_tokens=response.usage.prompt_tokens,
+            completion_tokens=response.usage.completion_tokens,
+            model=response.model,
+            model_parameters=get_model_parameters(kwargs),
+            raw_output=response.model_dump(),
+            id=inference_id,
+            metadata={
+                "method": "parse",
+                "response_format": kwargs.get("response_format"),
+            },
+        )
+
+        add_to_trace(
+            is_azure_openai=is_azure_openai,
+            **trace_args,
+        )
+    # pylint: disable=broad-except
+    except Exception as e:
+        logger.error(
+            "Failed to trace the parse chat completion request with Openlayer. %s", e
+        )
+
+    return response
+
+
+def parse_structured_output_data(response: Any) -> Union[str, Dict[str, Any], None]:
+    """Parses the structured output data from a parse method completion.
+
+    Parameters
+    ----------
+    response : Any
+        The parse method completion response.
+
+    Returns
+    -------
+    Union[str, Dict[str, Any], None]
+        The parsed structured output data.
+    """
+    try:
+        # The parse method returns a ParsedChatCompletion; the structured
+        # object lives on the first choice's message as `.parsed`
+        message = response.choices[0].message
+        if hasattr(message, 'parsed') and message.parsed is not None:
+            parsed = message.parsed
+            # Handle Pydantic models
+            if hasattr(parsed, 'model_dump'):
+                return parsed.model_dump()
+            # Handle dict-like objects
+            elif hasattr(parsed, '__dict__'):
+                return parsed.__dict__
+            # Handle other structured formats
+            else:
+                return parsed
+
+        # Fallback to regular message content parsing
+        return parse_non_streaming_output_data(response)
+
+    except Exception as e:
+        logger.error("Failed to parse structured output data: %s", e)
+        # Final fallback to regular parsing
+        return parse_non_streaming_output_data(response)
+
+
 # --------------------------- OpenAI Assistants API -------------------------- #
 def trace_openai_assistant_thread_run(
     client: "openai.OpenAI", run: "openai.types.beta.threads.run.Run"
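
Usage note: a minimal sketch of how the patched parse method would be exercised end to end, mirroring the notebook cell above. It assumes the module path from this diff (openlayer.lib.integrations.openai_tracer.trace_openai) and an OPENAI_API_KEY in the environment; the inference_id keyword is the one popped by traced_parse_func before the underlying OpenAI call is made. The async client follows the same pattern via trace_async_openai.

import openai
from pydantic import BaseModel

from openlayer.lib.integrations.openai_tracer import trace_openai


class Person(BaseModel):
    name: str
    age: int
    occupation: str


# Wrap the client: both create() and parse() are patched in place
client = trace_openai(openai.OpenAI())

completion = client.chat.completions.parse(
    model="gpt-4o",
    messages=[
        {"role": "user", "content": "Extract: Ada Lovelace, 36, mathematician"}
    ],
    response_format=Person,
    inference_id="example-inference-id",  # consumed by the tracer, not sent to OpenAI
)

# parse_structured_output_data() records Person's fields as the trace output
print(completion.choices[0].message.parsed)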