Skip to content

Conversation

@v1r3n
Copy link
Contributor

@v1r3n v1r3n commented Nov 9, 2025

Worker Configuration, Event-Driven Observability & Metrics

Overview

Introduces event-driven observability with Prometheus metrics, hierarchical worker configuration, runtime pausing, and startup logging.

Key Features

1. Event-Driven Observability

Zero-coupling architecture for metrics and monitoring:

class CustomListener:
    def on_task_execution_completed(self, event: TaskExecutionCompleted):
        statsd.timing(f'task.{event.task_type}', event.duration_ms)

TaskHandler(event_listeners=[CustomListener(), MetricsCollector()])

Benefits: Multiple backends (Prometheus, DataDog, custom), protocol-based, non-blocking

2. Built-in Prometheus Metrics

HTTP server with automatic multiprocess aggregation:

metrics_settings = MetricsSettings(http_port=8000)
# Access: curl http://localhost:8000/metrics

Metrics: API latency (p50-p99), task execution time, error rates, queue saturation

3. Worker Configuration

Single-line startup logging + hierarchical env overrides:

# Log output
INFO - Conductor Worker[name=task, status=active, poll_interval=500ms, domain=prod]

# Environment config (overrides code)
export conductor.worker.all.domain=production
export conductor.worker.critical_task.thread_count=50

4. Runtime Worker Pausing

Environment-only control (no code changes):

export conductor.worker.all.paused=true  # Maintenance mode
export conductor.worker.task_name.paused=true  # Specific worker

5. Async functions can now be used with @worker_task annotations

When a function is marked as async, they are executed using background asyncio event loop.

Related Documentation

@codecov
Copy link

codecov bot commented Nov 9, 2025

Codecov Report

❌ Patch coverage is 80.09479% with 294 lines in your changes missing coverage. Please review.

Files with missing lines Patch % Lines
.../conductor/client/automator/task_runner_asyncio.py 80.90% 105 Missing ⚠️
src/conductor/client/worker/worker_loader.py 0.00% 94 Missing ⚠️
...conductor/client/automator/task_handler_asyncio.py 78.45% 39 Missing ⚠️
...rc/conductor/client/telemetry/metrics_collector.py 82.50% 21 Missing ⚠️
src/conductor/client/event/listeners.py 67.64% 11 Missing ⚠️
src/conductor/client/event/listener_register.py 66.66% 10 Missing ⚠️
src/conductor/client/worker/worker_config.py 89.55% 7 Missing ⚠️
src/conductor/client/context/task_context.py 95.00% 3 Missing ⚠️
src/conductor/client/event/event_dispatcher.py 95.55% 2 Missing ⚠️
src/conductor/client/workflow/task/task.py 0.00% 2 Missing ⚠️
Files with missing lines Coverage Δ
src/conductor/client/automator/task_handler.py 97.56% <100.00%> (+32.22%) ⬆️
src/conductor/client/automator/task_runner.py 100.00% <100.00%> (+20.27%) ⬆️
src/conductor/client/event/conductor_event.py 100.00% <100.00%> (ø)
src/conductor/client/event/task_events.py 100.00% <100.00%> (ø)
src/conductor/client/event/task_runner_events.py 100.00% <100.00%> (ø)
src/conductor/client/event/workflow_events.py 100.00% <100.00%> (ø)
src/conductor/client/http/api_client.py 98.97% <100.00%> (+44.17%) ⬆️
...rc/conductor/client/http/models/integration_api.py 97.79% <ø> (-0.14%) ⬇️
src/conductor/client/http/models/schema_def.py 91.81% <ø> (-0.15%) ⬇️
src/conductor/client/http/models/workflow_def.py 85.19% <ø> (-0.38%) ⬇️
... and 16 more

... and 4 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@v1r3n v1r3n marked this pull request as draft November 9, 2025 08:54
@v1r3n v1r3n changed the title Asyncio workers, fixes and event listeners Worker improvements Nov 24, 2025
@v1r3n v1r3n marked this pull request as ready for review November 24, 2025 03:21
@v1r3n v1r3n requested review from am-orkes and nthmost-orkes and removed request for am-orkes November 24, 2025 05:59


class BackgroundEventLoop:
"""Manages a persistent asyncio event loop running in a background thread.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a SDK user, given I choose to use the async worker, I should have created my own event loop in my applicaiton instead of expecting the SDK to maintain a loop on behalf of me internally.

Is this a fair assumption?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current design creates one thread per worker task (so if you have 5 workers, there will be 5 threads polling/executing). This is consistent with other SDKs like Java. When creating the loop the loop belongs to the thread. This is internal implementation for the SDK. However I can see a future version that supports bringing your own loop.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants