59 changes: 59 additions & 0 deletions README.md
@@ -101,6 +101,65 @@ asyncio.run(main())

Functionality between the synchronous and asynchronous clients is otherwise identical.

## LLM Tracing

Openlayer provides automatic tracing for popular LLM providers, enabling you to monitor model performance, token usage, and response quality.

### OpenAI Tracing

Trace OpenAI chat completions (including the new structured output `parse` method) with automatic monitoring:

```python
import openai
from openlayer.lib import trace_openai

# Trace your OpenAI client
client = trace_openai(openai.OpenAI())

# Use normally - both create and parse methods are automatically traced
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": "Hello!"}]
)

# NEW: Parse method support for structured outputs
from pydantic import BaseModel

class Person(BaseModel):
name: str
age: int

structured_response = client.chat.completions.parse(
model="gpt-4o-mini",
messages=[{"role": "user", "content": "Extract: John Doe, 30 years old"}],
response_format=Person
)
```
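
The async client can be traced the same way. A short sketch, assuming `trace_async_openai` is exported from `openlayer.lib` alongside its synchronous counterpart:

```python
import asyncio

import openai
from openlayer.lib import trace_async_openai


async def main() -> None:
    # Wrap the async client; create (and parse, when available) are patched
    client = trace_async_openai(openai.AsyncOpenAI())
    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": "Hello!"}],
    )
    print(response.choices[0].message.content)


asyncio.run(main())
```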

**What gets traced:**
- Input messages and model parameters
- Response content (structured data for parse method)
- Token usage and latency metrics
- Raw API responses for debugging
- Custom inference IDs for request tracking (see the sketch after this list)
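
Every traced call also accepts an extra `inference_id` keyword argument, which the wrapper pops off before forwarding the request to OpenAI and attaches to the trace. A minimal sketch (the ID value is illustrative):

```python
import uuid

# `inference_id` is consumed by the Openlayer wrapper, not sent to OpenAI
response = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "Hello!"}],
    inference_id=str(uuid.uuid4()),  # custom ID used to look up this request later
)
```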

### Other LLM Providers

```python
import anthropic
import groq
import mistralai

from openlayer.lib import trace_anthropic, trace_mistral, trace_groq

# Anthropic
anthropic_client = trace_anthropic(anthropic.Anthropic())

# Mistral
mistral_client = trace_mistral(mistralai.Mistral())

# Groq
groq_client = trace_groq(groq.Groq())
```

See the [examples directory](examples/tracing/) for comprehensive tracing examples with all supported providers.

### With aiohttp

By default, the async client uses `httpx` for HTTP requests. However, for improved concurrency performance you may also use `aiohttp` as the HTTP backend.
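
A sketch of the switch, assuming the SDK follows the common pattern of an optional `aiohttp` extra and a `DefaultAioHttpClient` export (both names are assumptions here):

```python
# Assumed install step: pip install openlayer[aiohttp]
import asyncio

from openlayer import AsyncOpenlayer, DefaultAioHttpClient  # assumed exports


async def main() -> None:
    async with AsyncOpenlayer(
        http_client=DefaultAioHttpClient(),  # use aiohttp instead of the default httpx
    ) as client:
        ...  # make requests as usual


asyncio.run(main())
```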
25 changes: 22 additions & 3 deletions examples/tracing/openai/openai_tracing.ipynb
@@ -104,12 +104,31 @@
"id": "abaf6987-c257-4f0d-96e7-3739b24c7206",
"metadata": {},
"outputs": [],
"source": []
"source": [
"from pydantic import BaseModel\n",
"\n",
"\n",
"class Person(BaseModel):\n",
" name: str\n",
" age: int\n",
" occupation: str\n",
"\n",
"# Parse method automatically returns structured Pydantic object\n",
"completion = openai_client.chat.completions.parse(\n",
" model=\"gpt-4o\",\n",
" messages=[\n",
" {\"role\": \"user\", \"content\": \"Extract: John Doe is 30 years old and works as a software engineer\"}\n",
" ],\n",
" response_format=Person,\n",
")\n",
"\n",
"completion.choices[0].message.parsed"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"display_name": "base",
"language": "python",
"name": "python3"
},
@@ -123,7 +142,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.18"
"version": "3.12.7"
}
},
"nbformat": 4,
214 changes: 214 additions & 0 deletions src/openlayer/lib/integrations/async_openai_tracer.py
@@ -20,6 +20,7 @@
    create_trace_args,
    add_to_trace,
    parse_non_streaming_output_data,
    parse_structured_output_data,
)

logger = logging.getLogger(__name__)
@@ -60,6 +61,8 @@ def trace_async_openai(
    )

    is_azure_openai = isinstance(client, openai.AsyncAzureOpenAI)

    # Patch create method
    create_func = client.chat.completions.create

    @wraps(create_func)
@@ -84,6 +87,34 @@ async def traced_create_func(*args, **kwargs):
        )

    client.chat.completions.create = traced_create_func

    # Patch parse method if it exists
    if hasattr(client.chat.completions, "parse"):
        parse_func = client.chat.completions.parse

        @wraps(parse_func)
        async def traced_parse_func(*args, **kwargs):
            inference_id = kwargs.pop("inference_id", None)
            stream = kwargs.get("stream", False)

            if stream:
                return handle_async_streaming_parse(
                    *args,
                    **kwargs,
                    parse_func=parse_func,
                    inference_id=inference_id,
                    is_azure_openai=is_azure_openai,
                )
            return await handle_async_non_streaming_parse(
                *args,
                **kwargs,
                parse_func=parse_func,
                inference_id=inference_id,
                is_azure_openai=is_azure_openai,
            )

        client.chat.completions.parse = traced_parse_func

    return client


@@ -259,6 +290,189 @@ async def handle_async_non_streaming_create(
    except Exception as e:
        logger.error(
            "Failed to trace the create chat completion request with Openlayer. %s", e
        )

    return response


async def handle_async_streaming_parse(
    parse_func: callable,
    *args,
    is_azure_openai: bool = False,
    inference_id: Optional[str] = None,
    **kwargs,
) -> AsyncIterator[Any]:
    """Handles the parse method when streaming is enabled.

    Parameters
    ----------
    parse_func : callable
        The parse method to handle.
    is_azure_openai : bool, optional
        Whether the client is an Azure OpenAI client, by default False
    inference_id : Optional[str], optional
        A user-generated inference id, by default None

    Returns
    -------
    AsyncIterator[Any]
        A generator that yields the chunks of the completion.
    """
    chunks = await parse_func(*args, **kwargs)

    # Create and return a new async generator that processes chunks
    collected_output_data = []
    collected_function_call = {
        "name": "",
        "arguments": "",
    }
    raw_outputs = []
    start_time = time.time()
    end_time = None
    first_token_time = None
    num_of_completion_tokens = None
    latency = None
    try:
        i = 0
        async for chunk in chunks:
            raw_outputs.append(chunk.model_dump())
            # Record time to first token and approximate the completion
            # token count from the number of streamed chunks
            if i == 0:
                first_token_time = time.time()
            if i > 0:
                num_of_completion_tokens = i + 1
            i += 1

            delta = chunk.choices[0].delta

            if delta.content:
                collected_output_data.append(delta.content)
            elif delta.function_call:
                if delta.function_call.name:
                    collected_function_call["name"] += delta.function_call.name
                if delta.function_call.arguments:
                    collected_function_call["arguments"] += delta.function_call.arguments
            elif delta.tool_calls:
                if delta.tool_calls[0].function.name:
                    collected_function_call["name"] += delta.tool_calls[0].function.name
                if delta.tool_calls[0].function.arguments:
                    collected_function_call["arguments"] += delta.tool_calls[0].function.arguments

            yield chunk

        end_time = time.time()
        latency = (end_time - start_time) * 1000
    # pylint: disable=broad-except
    except Exception as e:
        logger.error("Failed to yield chunk. %s", e)
    finally:
        # Try to add step to the trace
        try:
            collected_output_data = [
                message for message in collected_output_data if message is not None
            ]
            if collected_output_data:
                output_data = "".join(collected_output_data)
            else:
                collected_function_call["arguments"] = json.loads(
                    collected_function_call["arguments"]
                )
                output_data = collected_function_call

            trace_args = create_trace_args(
                end_time=end_time,
                inputs={"prompt": kwargs["messages"]},
                output=output_data,
                latency=latency,
                tokens=num_of_completion_tokens,
                prompt_tokens=0,
                completion_tokens=num_of_completion_tokens,
                model=kwargs.get("model"),
                model_parameters=get_model_parameters(kwargs),
                raw_output=raw_outputs,
                id=inference_id,
                metadata={
                    "timeToFirstToken": (
                        (first_token_time - start_time) * 1000
                        if first_token_time
                        else None
                    ),
                    "method": "parse",
                    "response_format": kwargs.get("response_format"),
                },
            )
            add_to_trace(
                **trace_args,
                is_azure_openai=is_azure_openai,
            )

        # pylint: disable=broad-except
        except Exception as e:
            logger.error(
                "Failed to trace the parse chat completion request with Openlayer. %s",
                e,
            )


async def handle_async_non_streaming_parse(
    parse_func: callable,
    *args,
    is_azure_openai: bool = False,
    inference_id: Optional[str] = None,
    **kwargs,
) -> Any:
    """Handles the parse method when streaming is disabled.

    Parameters
    ----------
    parse_func : callable
        The parse method to handle.
    is_azure_openai : bool, optional
        Whether the client is an Azure OpenAI client, by default False
    inference_id : Optional[str], optional
        A user-generated inference id, by default None

    Returns
    -------
    Any
        The parsed completion response.
    """
    start_time = time.time()
    response = await parse_func(*args, **kwargs)
    end_time = time.time()

    # Try to add step to the trace
    try:
        output_data = parse_structured_output_data(response)
        trace_args = create_trace_args(
            end_time=end_time,
            inputs={"prompt": kwargs["messages"]},
            output=output_data,
            latency=(end_time - start_time) * 1000,
            tokens=response.usage.total_tokens,
            prompt_tokens=response.usage.prompt_tokens,
            completion_tokens=response.usage.completion_tokens,
            model=response.model,
            model_parameters=get_model_parameters(kwargs),
            raw_output=response.model_dump(),
            id=inference_id,
            metadata={
                "method": "parse",
                "response_format": kwargs.get("response_format"),
            },
        )

        add_to_trace(
            is_azure_openai=is_azure_openai,
            **trace_args,
        )
    # pylint: disable=broad-except
    except Exception as e:
        logger.error(
            "Failed to trace the parse chat completion request with Openlayer. %s", e
        )

    return response