Commits (23)
920ebfc
initial asyncio implemenation after durable task
filintod Aug 20, 2025
e2808c3
more tests
filintod Aug 20, 2025
35b2a72
remove redundant
filintod Aug 20, 2025
b0fb82c
add serializer
filintod Aug 20, 2025
23abd36
more changes to async work
filintod Aug 29, 2025
5df77ea
add is_replaying to async context
filintod Aug 31, 2025
998e4bb
add middleware hooks to handle inbound/outbound handoff from/to activ…
filintod Sep 2, 2025
ea66e27
better handling of asyncio gather statements
filintod Sep 2, 2025
f43b940
rename middleware to follow the regular methods on_call_activity and …
filintod Sep 3, 2025
cb5ffa3
updates, keep parity on asyncio ctx and regular one. Added tests to …
filintod Sep 6, 2025
567d266
modify middleware with interceptors
filintod Sep 8, 2025
f7f5b41
interceptors update/cleanup
filintod Sep 9, 2025
53585b2
Implement metadata handling in workflow client and runtime
filintod Sep 9, 2025
81f3ab3
updates to interceptors
filintod Sep 11, 2025
51756c4
add gprc helpers
filintod Sep 11, 2025
262d962
update docs
filintod Sep 11, 2025
bb34d03
Merge branch 'main' into filinto/asyncio
filintod Sep 11, 2025
9709023
fixes after merge, linting
filintod Sep 11, 2025
4a39bf4
fix bug in interceptor, add comments about generator returns yield
filintod Sep 12, 2025
3d38ddc
make async in parity with regular workflow
filintod Sep 12, 2025
c06d83f
add deterministic mixin class, proper metadata carry in awaitable
filintod Sep 16, 2025
e52ce1b
trace context update from durable task
filintod Sep 17, 2025
d42eb41
lint + updates to interceptor
filintod Oct 7, 2025
1 change: 1 addition & 0 deletions dev-requirements.txt
@@ -4,6 +4,7 @@ mypy-protobuf>=2.9
flake8>=3.7.9
tox>=4.3.0
coverage>=5.3
pytest
wheel
# used in unit test only
opentelemetry-sdk
15 changes: 15 additions & 0 deletions examples/workflow-async/README.md
@@ -0,0 +1,15 @@
# Dapr Workflow Async Examples (Python)

These examples mirror `examples/workflow/` but author orchestrators with `async def` using the
async workflow APIs. Activities remain regular functions unless noted.

How to run:
- Ensure a Dapr sidecar is running locally. If needed, set `DURABLETASK_GRPC_ENDPOINT`, or
`DURABLETASK_GRPC_HOST/PORT`.
- Install requirements: `pip install -r requirements.txt`
- Run any example: `python simple.py`

Notes:
- Orchestrators use `await ctx.call_activity(...)`, `await ctx.call_child_workflow(...)`, `await ctx.create_timer(...)`, `await ctx.when_all(...)`/`ctx.when_any(...)`, and related context methods, as the examples in this folder show.
- No event loop is started manually; the Durable Task worker drives the async orchestrators.
- You can also launch instances using `DaprWorkflowClient` as in the non-async examples.
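
For orientation, a minimal async workflow in the style of these examples might look like the sketch below (the decorator and context method names follow `simple.py` and `task_chaining.py` in this folder):

```python
from dapr.ext.workflow import (
    AsyncWorkflowContext,
    WorkflowActivityContext,
    WorkflowRuntime,
)

wfr = WorkflowRuntime()


@wfr.activity(name='greet')
def greet(ctx: WorkflowActivityContext, name: str) -> str:
    return f'Hello, {name}!'


@wfr.async_workflow(name='hello_async')
async def hello(ctx: AsyncWorkflowContext, name: str) -> str:
    # Awaiting the context method hands control to the Durable Task worker,
    # which schedules the activity and resumes the coroutine with its result.
    return await ctx.call_activity(greet, input=name)
```

Start the runtime with `wfr.start()` and schedule an instance with `DaprWorkflowClient`, exactly as in the non-async examples.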
46 changes: 46 additions & 0 deletions examples/workflow-async/child_workflow.py
@@ -0,0 +1,46 @@
# -*- coding: utf-8 -*-

"""
Copyright 2025 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from dapr.ext.workflow import (
AsyncWorkflowContext,
DaprWorkflowClient,
WorkflowRuntime,
)

wfr = WorkflowRuntime()


@wfr.async_workflow(name='child_async')
async def child(ctx: AsyncWorkflowContext, n: int) -> int:
return n * 2


@wfr.async_workflow(name='parent_async')
async def parent(ctx: AsyncWorkflowContext, n: int) -> int:
r = await ctx.call_child_workflow(child, input=n)
print(f'Child workflow returned {r}')
return r + 1


def main():
wfr.start()
client = DaprWorkflowClient()
instance_id = 'parent_async_instance'
client.schedule_new_workflow(workflow=parent, input=5, instance_id=instance_id)
client.wait_for_workflow_completion(instance_id, timeout_in_seconds=60)
wfr.shutdown()


if __name__ == '__main__':
main()
48 changes: 48 additions & 0 deletions examples/workflow-async/fan_out_fan_in.py
@@ -0,0 +1,48 @@
# -*- coding: utf-8 -*-
"""
Copyright 2025 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from dapr.ext.workflow import (
AsyncWorkflowContext,
DaprWorkflowClient,
WorkflowActivityContext,
WorkflowRuntime,
)

wfr = WorkflowRuntime()


@wfr.activity(name='square')
def square(ctx: WorkflowActivityContext, x: int) -> int:
return x * x


@wfr.async_workflow(name='fan_out_fan_in_async')
async def orchestrator(ctx: AsyncWorkflowContext):
tasks = [ctx.call_activity(square, input=i) for i in range(1, 6)]
results = await ctx.when_all(tasks)
total = sum(results)
return total


def main():
wfr.start()
client = DaprWorkflowClient()
instance_id = 'fofi_async'
client.schedule_new_workflow(workflow=orchestrator, instance_id=instance_id)
wf_state = client.wait_for_workflow_completion(instance_id, timeout_in_seconds=60)
print(f'Workflow state: {wf_state}')
wfr.shutdown()


if __name__ == '__main__':
main()
43 changes: 43 additions & 0 deletions examples/workflow-async/human_approval.py
@@ -0,0 +1,43 @@
# -*- coding: utf-8 -*-
"""
Copyright 2025 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from dapr.ext.workflow import AsyncWorkflowContext, DaprWorkflowClient, WorkflowRuntime

wfr = WorkflowRuntime()


@wfr.async_workflow(name='human_approval_async')
async def orchestrator(ctx: AsyncWorkflowContext, request_id: str):
decision = await ctx.when_any([
ctx.wait_for_external_event(f'approve:{request_id}'),
ctx.wait_for_external_event(f'reject:{request_id}'),
ctx.create_timer(300.0),
])
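    # This example assumes the raised event carries a payload such as
    # {'approved': True} or {'rejected': True}; a timer win matches neither check.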
if isinstance(decision, dict) and decision.get('approved'):
return 'APPROVED'
if isinstance(decision, dict) and decision.get('rejected'):
return 'REJECTED'
return 'TIMEOUT'


def main():
wfr.start()
client = DaprWorkflowClient()
instance_id = 'human_approval_async_1'
client.schedule_new_workflow(workflow=orchestrator, input='REQ-1', instance_id=instance_id)
# In a real scenario, raise approve/reject event from another service.
wfr.shutdown()


if __name__ == '__main__':
main()
2 changes: 2 additions & 0 deletions examples/workflow-async/requirements.txt
@@ -0,0 +1,2 @@
dapr-ext-workflow-dev>=1.15.0.dev
dapr-dev>=1.15.0.dev
131 changes: 131 additions & 0 deletions examples/workflow-async/simple.py
@@ -0,0 +1,131 @@
# -*- coding: utf-8 -*-
"""
Copyright 2025 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from datetime import timedelta
from time import sleep

from dapr.ext.workflow import (
AsyncWorkflowContext,
DaprWorkflowClient,
RetryPolicy,
WorkflowActivityContext,
WorkflowRuntime,
)

counter = 0
retry_count = 0
child_orchestrator_string = ''
instance_id = 'asyncExampleInstanceID'
child_instance_id = 'asyncChildInstanceID'
workflow_name = 'async_hello_world_wf'
child_workflow_name = 'async_child_wf'
input_data = 'Hi Async Counter!'
event_name = 'event1'
event_data = 'eventData'

retry_policy = RetryPolicy(
first_retry_interval=timedelta(seconds=1),
max_number_of_attempts=3,
backoff_coefficient=2,
max_retry_interval=timedelta(seconds=10),
retry_timeout=timedelta(seconds=100),
)

wfr = WorkflowRuntime()


@wfr.async_workflow(name=workflow_name)
async def hello_world_wf(ctx: AsyncWorkflowContext, wf_input):
global counter
# activities
result_1 = await ctx.call_activity(hello_act, input=1)
print(f'Activity 1 returned {result_1}')
result_2 = await ctx.call_activity(hello_act, input=10)
print(f'Activity 2 returned {result_2}')
result_3 = await ctx.call_activity(hello_retryable_act, retry_policy=retry_policy)
print(f'Activity 3 returned {result_3}')
result_4 = await ctx.call_child_workflow(child_retryable_wf, retry_policy=retry_policy)
print(f'Child workflow returned {result_4}')

# Event vs timeout using when_any
first = await ctx.when_any([
ctx.wait_for_external_event(event_name),
ctx.create_timer(timedelta(seconds=30)),
])

# Proceed only if event won
if isinstance(first, dict) and 'event' in first:
await ctx.call_activity(hello_act, input=100)
await ctx.call_activity(hello_act, input=1000)
return 'Completed'
return 'Timeout'


@wfr.activity(name='async_hello_act')
def hello_act(ctx: WorkflowActivityContext, wf_input):
global counter
counter += wf_input
return f'Activity returned {wf_input}'


@wfr.activity(name='async_hello_retryable_act')
def hello_retryable_act(ctx: WorkflowActivityContext):
global retry_count
if (retry_count % 2) == 0:
retry_count += 1
raise ValueError('Retryable Error')
retry_count += 1
return f'Activity returned {retry_count}'


@wfr.async_workflow(name=child_workflow_name)
async def child_retryable_wf(ctx: AsyncWorkflowContext):
global child_orchestrator_string
# Call activity with retry and simulate retryable workflow failure until certain state
child_activity_result = await ctx.call_activity(act_for_child_wf, input='x', retry_policy=retry_policy)
print(f'Child activity returned {child_activity_result}')
# In a real sample, you might check state and raise to trigger retry
return 'ok'


@wfr.activity(name='async_act_for_child_wf')
def act_for_child_wf(ctx: WorkflowActivityContext, inp):
global child_orchestrator_string
child_orchestrator_string += inp


def main():
wfr.start()
wf_client = DaprWorkflowClient()

wf_client.schedule_new_workflow(
workflow=hello_world_wf, input=input_data, instance_id=instance_id
)

wf_client.wait_for_workflow_start(instance_id)

# Let initial activities run
sleep(5)

# Raise event to continue
wf_client.raise_workflow_event(instance_id=instance_id, event_name=event_name, data={'ok': True})

# Wait for completion
state = wf_client.wait_for_workflow_completion(instance_id, timeout_in_seconds=60)
print(f'Workflow status: {state.runtime_status.name}')

wfr.shutdown()


if __name__ == '__main__':
main()
47 changes: 47 additions & 0 deletions examples/workflow-async/task_chaining.py
@@ -0,0 +1,47 @@
# -*- coding: utf-8 -*-
"""
Copyright 2025 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from dapr.ext.workflow import (
AsyncWorkflowContext,
DaprWorkflowClient,
WorkflowActivityContext,
WorkflowRuntime,
)

wfr = WorkflowRuntime()


@wfr.activity(name='sum')
def sum_act(ctx: WorkflowActivityContext, nums):
return sum(nums)


@wfr.async_workflow(name='task_chaining_async')
async def orchestrator(ctx: AsyncWorkflowContext):
a = await ctx.call_activity(sum_act, input=[1, 2])
b = await ctx.call_activity(sum_act, input=[a, 3])
c = await ctx.call_activity(sum_act, input=[b, 4])
return c


def main():
wfr.start()
client = DaprWorkflowClient()
instance_id = 'task_chain_async'
client.schedule_new_workflow(workflow=orchestrator, instance_id=instance_id)
client.wait_for_workflow_completion(instance_id, timeout_in_seconds=60)
wfr.shutdown()


if __name__ == '__main__':
main()
35 changes: 35 additions & 0 deletions ext/dapr-ext-workflow/README.rst
@@ -16,6 +16,41 @@ Installation

pip install dapr-ext-workflow

Async authoring (experimental)
------------------------------

This package supports authoring workflows with ``async def`` in addition to the existing generator-based orchestrators.

- Register async workflows using ``WorkflowRuntime.async_workflow`` or ``register_async_workflow``.
- Use ``AsyncWorkflowContext`` for deterministic operations:

- Activities: ``await ctx.activity(activity_fn, input=...)``
- Sub-orchestrators: ``await ctx.sub_orchestrator(workflow_fn, input=...)``
- Timers: ``await ctx.sleep(seconds|timedelta)``
- External events: ``await ctx.wait_for_external_event(name)``
- Concurrency: ``await ctx.when_all([...])``, ``await ctx.when_any([...])``
- Deterministic utils: ``ctx.now()``, ``ctx.random()``, ``ctx.uuid4()``
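
A minimal sketch using the operations listed above (method names as listed; see the examples below for complete, runnable programs):

.. code-block:: python

    from dapr.ext.workflow import (
        AsyncWorkflowContext,
        WorkflowActivityContext,
        WorkflowRuntime,
    )

    wfr = WorkflowRuntime()

    @wfr.activity(name='double')
    def double(ctx: WorkflowActivityContext, n: int) -> int:
        return n * 2

    @wfr.async_workflow(name='double_twice_async')
    async def double_twice(ctx: AsyncWorkflowContext, n: int) -> int:
        # Each await records the scheduled work in the instance history and
        # resumes the coroutine deterministically on replay.
        a = await ctx.activity(double, input=n)
        b = await ctx.activity(double, input=a)
        return b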

Best-effort sandbox
~~~~~~~~~~~~~~~~~~~

An opt-in, scoped compatibility mode maps ``asyncio.sleep``, ``random``, ``uuid.uuid4``, and ``time.time`` to deterministic equivalents during workflow execution. Enable it by passing ``sandbox_mode="best_effort"`` or ``sandbox_mode="strict"`` when registering async workflows; strict mode additionally blocks ``asyncio.create_task`` inside orchestrators.
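
For example, a best-effort registration might look like the sketch below (passing ``sandbox_mode`` as a decorator keyword is an assumption here; the mapped calls are those listed above):

.. code-block:: python

    import asyncio

    from dapr.ext.workflow import AsyncWorkflowContext, WorkflowRuntime

    wfr = WorkflowRuntime()

    @wfr.async_workflow(name='sandboxed_async', sandbox_mode='best_effort')
    async def sandboxed(ctx: AsyncWorkflowContext):
        # Inside the sandbox, asyncio.sleep maps to a durable timer and
        # uuid4/random/time map to replay-safe equivalents.
        await asyncio.sleep(1)
        return ctx.uuid4().hex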

Examples
~~~~~~~~

See ``ext/dapr-ext-workflow/examples`` for:

- ``async_activity_sequence.py``
- ``async_external_event.py``
- ``async_sub_orchestrator.py``

Determinism and semantics
~~~~~~~~~~~~~~~~~~~~~~~~~

- ``when_any`` losers: the result of the first awaitable to complete is returned, and the non-winning awaitables are ignored deterministically; the orchestrator emits no additional cancellation commands, which keeps replay stable (see the sketch below). Whether losers are eventually cancelled on the sidecar side is up to the Durable Task scheduler; the orchestrator does not actively cancel them.
- Suspension and termination: while an instance is suspended, only new external events are buffered; replay still runs to reconstruct state, and async orchestrators can inspect ``ctx.is_suspended`` where the runtime exposes it. Termination completes the orchestrator with a TERMINATED status rather than raising into the coroutine. End-to-end behavior can only be confirmed against a sidecar; the unit tests in this repo do not start one.
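
A sketch of the loser semantics, with imports and runtime setup as in the earlier sketch (the timer here is simply never awaited if the event wins):

.. code-block:: python

    @wfr.async_workflow(name='approval_or_timeout_async')
    async def approval_or_timeout(ctx: AsyncWorkflowContext):
        winner = await ctx.when_any([
            ctx.wait_for_external_event('approval'),
            ctx.sleep(300),
        ])
        # Only the winner's result is observed; no cancellation command is
        # recorded in history for the losing awaitable.
        return winner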

References
----------
