Commits (64)
64f11d2
initial commit for asyncio
v1r3n Nov 9, 2025
b2de890
token refresh logic
v1r3n Nov 9, 2025
999e4e7
batch polling and batching support
v1r3n Nov 9, 2025
87883e3
more updates
v1r3n Nov 10, 2025
8c0cedc
Update requirements.txt
v1r3n Nov 10, 2025
c9c5172
Update pyproject.toml
v1r3n Nov 10, 2025
4174b52
Update poetry.lock
v1r3n Nov 10, 2025
a3f2efb
more
v1r3n Nov 10, 2025
5c77310
logging
v1r3n Nov 10, 2025
3033e1e
more tests
v1r3n Nov 10, 2025
b312836
Update test_metrics_collector_events.py
v1r3n Nov 10, 2025
3d78c38
fix tests
v1r3n Nov 10, 2025
da3daeb
remove deprecation warnings
v1r3n Nov 10, 2025
340a2fd
test fixes
v1r3n Nov 10, 2025
8b2fb1b
Create test_task_runner_asyncio_coverage.py
v1r3n Nov 11, 2025
1c6bd6b
tests
v1r3n Nov 11, 2025
a2ba557
Update test_task_handler_coverage.py
v1r3n Nov 11, 2025
ca946f2
Update test_task_handler_coverage.py
v1r3n Nov 11, 2025
ec4b411
Update test_api_metrics.py
v1r3n Nov 11, 2025
0a19e31
Update test_task_runner.py
v1r3n Nov 11, 2025
9338435
Update test_task_handler_asyncio.py
v1r3n Nov 11, 2025
874a46e
Update pull_request.yml
v1r3n Nov 11, 2025
93567b1
Update test_api_metrics.py
v1r3n Nov 11, 2025
a27b1d2
Update test_task_runner_asyncio_concurrency.py
v1r3n Nov 11, 2025
173aa16
fix
v1r3n Nov 12, 2025
6ed3f82
tests
v1r3n Nov 12, 2025
2d3c7be
Update test_task_handler_coverage.py
v1r3n Nov 12, 2025
74cb99f
Update test_task_handler_coverage.py
v1r3n Nov 12, 2025
f914402
tests
v1r3n Nov 12, 2025
26564fc
asyncio loop
v1r3n Nov 20, 2025
23b6cc9
docs and tests
v1r3n Nov 20, 2025
5af2250
Delete ASYNCIO_TEST_COVERAGE.md
v1r3n Nov 20, 2025
9f0ba20
asyncio clean up
v1r3n Nov 21, 2025
ffbeb98
Create LEASE_EXTENSION.md
v1r3n Nov 21, 2025
d2f8b69
retries
v1r3n Nov 21, 2025
c674629
clean up
v1r3n Nov 21, 2025
1ef6929
more
v1r3n Nov 21, 2025
a800e2c
tests
v1r3n Nov 21, 2025
bd49baa
Update test_worker_async_performance.py
v1r3n Nov 21, 2025
af26e7e
docs
v1r3n Nov 22, 2025
db87528
fixes
v1r3n Nov 22, 2025
f2f2918
Delete test_worker_async_performance.py
v1r3n Nov 22, 2025
455137b
listeners and fixes
v1r3n Nov 22, 2025
baf78dc
fixes
v1r3n Nov 23, 2025
7d35dc5
Update task_runner.py
v1r3n Nov 23, 2025
7b01246
more
v1r3n Nov 23, 2025
a293352
fixes
v1r3n Nov 23, 2025
14c6b7c
remove deprecation notices
v1r3n Nov 23, 2025
cd88e9f
Update test_workflows.py
v1r3n Nov 23, 2025
611797f
Update test_worker_coverage.py
v1r3n Nov 23, 2025
108651b
Update task_runner.py
v1r3n Nov 23, 2025
bd6321a
Update test_task_handler_coverage.py
v1r3n Nov 23, 2025
f78ef94
Update test_task_handler_coverage.py
v1r3n Nov 23, 2025
a18afef
Delete test_task_runner_async.py
v1r3n Nov 23, 2025
e71d824
fixes
v1r3n Nov 23, 2025
d18f162
fixes
v1r3n Nov 23, 2025
e6ed229
Update test_metrics_collector.py
v1r3n Nov 23, 2025
7a56f98
docs
v1r3n Nov 23, 2025
9bec5d4
documentation
v1r3n Nov 24, 2025
5f6f6ee
updates to the documentation
v1r3n Nov 24, 2025
78100b2
Automate build python sdk with version from the github release (#367)…
v1r3n Nov 24, 2025
9b93f7d
updates
v1r3n Nov 29, 2025
a7e48f5
docs
v1r3n Nov 29, 2025
866a76c
tests
v1r3n Nov 29, 2025
416 changes: 416 additions & 0 deletions ASYNCIO_TEST_COVERAGE.md


331 changes: 331 additions & 0 deletions METRICS.md
@@ -0,0 +1,331 @@
# Metrics Documentation

The Conductor Python SDK includes built-in metrics collection using Prometheus to monitor worker performance, API requests, and task execution.

## Table of Contents

- [Quick Reference](#quick-reference)
- [Configuration](#configuration)
- [Metric Types](#metric-types)
- [Examples](#examples)

## Quick Reference

| Metric Name | Type | Labels | Description |
|------------|------|--------|-------------|
| `api_request_time_seconds` | Timer (quantile gauge) | `method`, `uri`, `status`, `quantile` | API request latency to Conductor server |
| `api_request_time_seconds_count` | Gauge | `method`, `uri`, `status` | Total number of API requests |
| `api_request_time_seconds_sum` | Gauge | `method`, `uri`, `status` | Total time spent in API requests |
| `task_poll_total` | Counter | `taskType` | Number of task poll attempts |
| `task_poll_time` | Gauge | `taskType` | Most recent poll duration (legacy) |
| `task_poll_time_seconds` | Timer (quantile gauge) | `taskType`, `status`, `quantile` | Task poll latency distribution |
| `task_poll_time_seconds_count` | Gauge | `taskType`, `status` | Total number of poll attempts by status |
| `task_poll_time_seconds_sum` | Gauge | `taskType`, `status` | Total time spent polling |
| `task_execute_time` | Gauge | `taskType` | Most recent execution duration (legacy) |
| `task_execute_time_seconds` | Timer (quantile gauge) | `taskType`, `status`, `quantile` | Task execution latency distribution |
| `task_execute_time_seconds_count` | Gauge | `taskType`, `status` | Total number of task executions by status |
| `task_execute_time_seconds_sum` | Gauge | `taskType`, `status` | Total time spent executing tasks |
| `task_execute_error_total` | Counter | `taskType`, `exception` | Number of task execution errors |
| `task_update_time_seconds` | Timer (quantile gauge) | `taskType`, `status`, `quantile` | Task update latency distribution |
| `task_update_time_seconds_count` | Gauge | `taskType`, `status` | Total number of task updates by status |
| `task_update_time_seconds_sum` | Gauge | `taskType`, `status` | Total time spent updating tasks |
| `task_update_error_total` | Counter | `taskType`, `exception` | Number of task update errors |
| `task_result_size` | Gauge | `taskType` | Size of task result payload (bytes) |
| `task_execution_queue_full_total` | Counter | `taskType` | Number of times execution queue was full |
| `task_paused_total` | Counter | `taskType` | Number of polls while worker paused |
| `external_payload_used_total` | Counter | `taskType`, `payloadType` | External payload storage usage count |
| `workflow_input_size` | Gauge | `workflowType`, `version` | Workflow input payload size (bytes) |
| `workflow_start_error_total` | Counter | `workflowType`, `exception` | Workflow start error count |

### Label Values

- **`status`** (task): `SUCCESS`, `FAILURE`
- **`status`** (HTTP): response code (`200`, `401`, `404`, `500`) or `error`
- **`method`**: `GET`, `POST`, `PUT`, `DELETE`
- **`uri`**: API endpoint path (e.g., `/tasks/poll/batch/{taskType}`, `/tasks/update-v2`)
- **`quantile`**: `0.5` (p50), `0.75` (p75), `0.9` (p90), `0.95` (p95), `0.99` (p99)
- **`payloadType`**: `input`, `output`
- **`exception`**: exception type or error message

### Example Metrics Output

```prometheus
# API Request Metrics
api_request_time_seconds{method="GET",uri="/tasks/poll/batch/myTask",status="200",quantile="0.5"} 0.112
api_request_time_seconds{method="GET",uri="/tasks/poll/batch/myTask",status="200",quantile="0.99"} 0.245
api_request_time_seconds_count{method="GET",uri="/tasks/poll/batch/myTask",status="200"} 1000.0
api_request_time_seconds_sum{method="GET",uri="/tasks/poll/batch/myTask",status="200"} 114.5

# Task Poll Metrics
task_poll_total{taskType="myTask"} 10264.0
task_poll_time_seconds{taskType="myTask",status="SUCCESS",quantile="0.95"} 0.025
task_poll_time_seconds_count{taskType="myTask",status="SUCCESS"} 1000.0
task_poll_time_seconds_count{taskType="myTask",status="FAILURE"} 95.0

# Task Execution Metrics
task_execute_time_seconds{taskType="myTask",status="SUCCESS",quantile="0.99"} 0.017
task_execute_time_seconds_count{taskType="myTask",status="SUCCESS"} 120.0
task_execute_error_total{taskType="myTask",exception="TimeoutError"} 3.0

# Task Update Metrics
task_update_time_seconds{taskType="myTask",status="SUCCESS",quantile="0.95"} 0.096
task_update_time_seconds_count{taskType="myTask",status="SUCCESS"} 15.0
```
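Because the file uses the Prometheus text exposition format, it can be inspected with a few lines of standard-library Python. A minimal sketch of a line parser (the `prometheus_client` package ships a full parser; this regex only handles the simple `name{labels} value` shape shown above):

```python
import re

# Matches the simple exposition-format shape: name{labels} value
METRIC_LINE = re.compile(r'^(?P<name>[a-zA-Z_:][a-zA-Z0-9_:]*)'
                         r'(?:\{(?P<labels>[^}]*)\})?\s+(?P<value>\S+)$')

def parse_metric_line(line: str):
    """Parse one exposition-format line into (name, labels_dict, value)."""
    match = METRIC_LINE.match(line.strip())
    if match is None:
        return None  # comment, blank line, or unsupported shape
    labels = {}
    if match.group('labels'):
        for key, value in re.findall(r'(\w+)="([^"]*)"', match.group('labels')):
            labels[key] = value
    return match.group('name'), labels, float(match.group('value'))

print(parse_metric_line('task_poll_total{taskType="myTask"} 10264.0'))
```

This is handy for quick sanity checks on the metrics file before wiring up a scraper.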

## Configuration

### Enabling Metrics

Metrics are enabled by providing a `MetricsSettings` object when creating a `TaskHandler`:

```python
from conductor.client.configuration.configuration import Configuration
from conductor.client.configuration.settings.metrics_settings import MetricsSettings
from conductor.client.automator.task_handler import TaskHandler

# Configure metrics
metrics_settings = MetricsSettings(
    directory='/path/to/metrics',        # Directory where the metrics file is written
    file_name='conductor_metrics.prom',  # Metrics file name (default: 'conductor_metrics.prom')
    update_interval=10                   # Update interval in seconds (default: 10)
)

# Configure Conductor connection
api_config = Configuration(
    server_api_url='http://localhost:8080/api',
    debug=False
)

# Create task handler with metrics
with TaskHandler(
    configuration=api_config,
    metrics_settings=metrics_settings,
    workers=[...]
) as task_handler:
    task_handler.start_processes()
```

### AsyncIO Workers

For AsyncIO-based workers:

```python
from conductor.client.automator.task_handler_asyncio import TaskHandlerAsyncIO

# Run inside an async function, e.g. via asyncio.run(main())
async with TaskHandlerAsyncIO(
    configuration=api_config,
    metrics_settings=metrics_settings,
    scan_for_annotated_workers=True,
    import_modules=['your_module']
) as task_handler:
    await task_handler.start()
```
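`async with` needs a running event loop, so the handler is typically driven from `asyncio.run`. A minimal sketch of that driver pattern, using a stand-in class since the real `TaskHandlerAsyncIO` requires a reachable Conductor server:

```python
import asyncio

class FakeTaskHandler:
    """Stand-in for TaskHandlerAsyncIO: any async context manager is driven the same way."""

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        return False  # do not suppress exceptions raised inside the block

    async def start(self):
        await asyncio.sleep(0)  # the real handler would poll until stopped
        return 'started'

async def main():
    # With the real SDK this would be:
    #   async with TaskHandlerAsyncIO(configuration=..., metrics_settings=...) as handler:
    async with FakeTaskHandler() as handler:
        return await handler.start()

print(asyncio.run(main()))  # → started
```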

### Metrics File Cleanup

For multiprocess workers using Prometheus multiprocess mode, clean the metrics directory on startup to avoid stale data:

```python
import os
import shutil

metrics_dir = '/path/to/metrics'
if os.path.exists(metrics_dir):
    shutil.rmtree(metrics_dir)
os.makedirs(metrics_dir, exist_ok=True)

metrics_settings = MetricsSettings(
directory=metrics_dir,
file_name='conductor_metrics.prom',
update_interval=10
)
```


## Metric Types

### Quantile Gauges (Timers)

All timing metrics use quantile gauges to track latency distribution:

- **Quantile labels**: Each metric includes 5 quantiles (p50, p75, p90, p95, p99)
- **Count suffix**: `{metric_name}_count` tracks total number of observations
- **Sum suffix**: `{metric_name}_sum` tracks total time spent

**Example calculation (average):**
```
average = task_poll_time_seconds_sum / task_poll_time_seconds_count
average = 18.75 / 1000.0 = 0.01875 seconds
```

**Why quantiles instead of histograms?**
- More accurate percentile tracking with sliding window (last 1000 observations)
- No need to pre-configure bucket boundaries
- Lower memory footprint
- Direct percentile values without interpolation

### Sliding Window

Quantile metrics use a sliding window of the last 1000 observations to calculate percentiles. This provides:
- Recent performance data (not cumulative)
- Accurate percentile estimation
- Bounded memory usage
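The behaviour described above can be sketched with a bounded `deque`. This is an illustration of the sliding-window idea only, not the SDK's actual implementation:

```python
from collections import deque

class SlidingQuantiles:
    """Keep the last `window` observations and report percentiles over them."""

    def __init__(self, window: int = 1000):
        self.samples = deque(maxlen=window)  # oldest samples fall off automatically

    def observe(self, value: float) -> None:
        self.samples.append(value)

    def quantile(self, q: float) -> float:
        ordered = sorted(self.samples)
        # nearest-rank style index into the sorted window
        index = min(len(ordered) - 1, int(q * len(ordered)))
        return ordered[index]

timer = SlidingQuantiles(window=1000)
for ms in range(2000):           # 2000 observations; only the last 1000 survive
    timer.observe(ms / 1000.0)
print(timer.quantile(0.5))       # median of the last 1000 samples → 1.5
```

Because the window is bounded, memory stays constant no matter how long the worker runs, and the reported percentiles always reflect recent behaviour.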

## Examples

### Querying Metrics with PromQL

**Average API request latency:**
```promql
rate(api_request_time_seconds_sum[5m]) / rate(api_request_time_seconds_count[5m])
```

**API error rate:**
```promql
sum(rate(api_request_time_seconds_count{status=~"4..|5.."}[5m]))
/
sum(rate(api_request_time_seconds_count[5m]))
```

**Task poll success rate:**
```promql
sum(rate(task_poll_time_seconds_count{status="SUCCESS"}[5m]))
/
sum(rate(task_poll_time_seconds_count[5m]))
```

**p95 task execution time:**
```promql
task_execute_time_seconds{quantile="0.95"}
```

**Slowest API endpoints (p99):**
```promql
topk(10, api_request_time_seconds{quantile="0.99"})
```
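These queries translate directly into Prometheus alerting rules. A sketch with illustrative rule names and thresholds (tune `expr` and `for` for your workload):

```yaml
# alert_rules.yml (illustrative thresholds)
groups:
  - name: conductor-python-sdk
    rules:
      - alert: HighTaskExecutionErrorRate
        expr: sum(rate(task_execute_error_total[5m])) by (taskType) > 0.1
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Task {{ $labels.taskType }} error rate above 0.1/s"
      - alert: TaskExecutionQueueSaturated
        expr: rate(task_execution_queue_full_total[5m]) > 0
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Worker queue full for {{ $labels.taskType }}"
```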

### Complete Example

```python
import os
import shutil
from conductor.client.configuration.configuration import Configuration
from conductor.client.configuration.settings.metrics_settings import MetricsSettings
from conductor.client.automator.task_handler import TaskHandler
from conductor.client.worker.worker_interface import WorkerInterface

# Clean metrics directory
metrics_dir = os.path.join(os.path.expanduser('~'), 'conductor_metrics')
if os.path.exists(metrics_dir):
    shutil.rmtree(metrics_dir)
os.makedirs(metrics_dir, exist_ok=True)

# Configure metrics
metrics_settings = MetricsSettings(
    directory=metrics_dir,
    file_name='conductor_metrics.prom',
    update_interval=10  # Update file every 10 seconds
)

# Configure Conductor
api_config = Configuration(
    server_api_url='http://localhost:8080/api',
    debug=False
)

# Define worker
class MyWorker(WorkerInterface):
    def execute(self, task):
        return {'status': 'completed'}

    def get_task_definition_name(self):
        return 'my_task'

# Start with metrics
with TaskHandler(
    configuration=api_config,
    metrics_settings=metrics_settings,
    workers=[MyWorker()]
) as task_handler:
    task_handler.start_processes()
```

### Scraping with Prometheus

Configure Prometheus to scrape the metrics file:

```yaml
# prometheus.yml
scrape_configs:
  - job_name: 'conductor-python-sdk'
    static_configs:
      - targets: ['localhost:8000']  # Use file_sd or custom exporter
    metric_relabel_configs:
      - source_labels: [taskType]
        target_label: task_type
```

**Note:** Since metrics are written to a file, you'll need one of the following to expose them to Prometheus:
1. Prometheus's `textfile` collector in Node Exporter
2. A simple HTTP server that serves the metrics file
3. A custom exporter that reads and serves the file

### Example HTTP Metrics Server

```python
from http.server import HTTPServer, SimpleHTTPRequestHandler
import os

class MetricsHandler(SimpleHTTPRequestHandler):
    def do_GET(self):
        if self.path == '/metrics':
            metrics_file = '/path/to/conductor_metrics.prom'
            if os.path.exists(metrics_file):
                with open(metrics_file, 'rb') as f:
                    content = f.read()
                self.send_response(200)
                self.send_header('Content-Type', 'text/plain; version=0.0.4')
                self.end_headers()
                self.wfile.write(content)
            else:
                self.send_response(404)
                self.end_headers()
        else:
            self.send_response(404)
            self.end_headers()

# Run server
httpd = HTTPServer(('0.0.0.0', 8000), MetricsHandler)
httpd.serve_forever()
```

## Best Practices

1. **Clean metrics directory on startup** to avoid stale multiprocess metrics
2. **Monitor disk space** as metrics files can grow with many task types
3. **Use appropriate update_interval** (10-60 seconds recommended)
4. **Set up alerts** on error rates and high latencies
5. **Monitor queue saturation** (`task_execution_queue_full_total`) for backpressure
6. **Track API errors** by status code to identify authentication or server issues
7. **Use p95/p99 latencies** for SLO monitoring rather than averages

## Troubleshooting

### Metrics file is empty
- Ensure `MetricsCollector` is registered as an event listener
- Check that workers are actually polling and executing tasks
- Verify the metrics directory has write permissions

### Stale metrics after restart
- Clean the metrics directory on startup (see Configuration section)
- Prometheus's `multiprocess` mode requires cleanup between runs

### High memory usage
- Reduce the sliding window size (default: 1000 observations)
- Increase `update_interval` to write less frequently
- Limit the number of unique label combinations

### Missing metrics
- Verify `metrics_settings` is passed to TaskHandler/TaskHandlerAsyncIO
- Check that the SDK version supports the metric you're looking for
- Ensure workers are properly registered and running
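A quick stdlib check covers the first and last cases: verify the directory is writable and the file is present, non-empty, and being refreshed. The path and the 10-second interval below are illustrative:

```python
import os
import time

def check_metrics_health(metrics_file: str, update_interval: int = 10) -> list:
    """Return a list of human-readable problems with the metrics setup."""
    problems = []
    directory = os.path.dirname(metrics_file) or '.'
    if not os.path.isdir(directory):
        problems.append(f'metrics directory missing: {directory}')
    elif not os.access(directory, os.W_OK):
        problems.append(f'metrics directory not writable: {directory}')
    if not os.path.exists(metrics_file):
        problems.append(f'metrics file not created yet: {metrics_file}')
    elif os.path.getsize(metrics_file) == 0:
        problems.append('metrics file is empty (are workers polling?)')
    elif time.time() - os.path.getmtime(metrics_file) > 3 * update_interval:
        problems.append('metrics file is stale (collector stopped?)')
    return problems

print(check_metrics_health('/tmp/conductor_metrics/conductor_metrics.prom'))
```

An empty list means the file side of the pipeline looks healthy; anything else narrows down which troubleshooting case above applies.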
4 changes: 2 additions & 2 deletions README.md
@@ -264,7 +264,7 @@ export CONDUCTOR_SERVER_URL=https://[cluster-name].orkesconductor.io/api
- If you want to run the workflow on the Orkes Conductor Playground, set the Conductor Server variable as follows:

```shell
-export CONDUCTOR_SERVER_URL=https://play.orkes.io/api
+export CONDUCTOR_SERVER_URL=https://developer.orkescloud.com/api
```

- Orkes Conductor requires authentication. [Obtain the key and secret from the Conductor server](https://orkes.io/content/how-to-videos/access-key-and-secret) and set the following environment variables.
@@ -562,7 +562,7 @@ def send_email(email: str, subject: str, body: str):
def main():

# defaults to reading the configuration using following env variables
-# CONDUCTOR_SERVER_URL : conductor server e.g. https://play.orkes.io/api
+# CONDUCTOR_SERVER_URL : conductor server e.g. https://developer.orkescloud.com/api
# CONDUCTOR_AUTH_KEY : API Authentication Key
# CONDUCTOR_AUTH_SECRET: API Auth Secret
api_config = Configuration()