Skip to content
Open
67 changes: 67 additions & 0 deletions examples/workflow/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,73 @@ The output of this example should look like this:
- "== APP == Workflow completed! Result: Completed"
```

### Simple Workflow Async
This example represents a workflow with async activities and using the async WorkflowClient that manages counters through a series of activities and child workflows.
It shows several Dapr Workflow features including:
- Basic activity execution with counter increments
- Retryable activities with configurable retry policies
- Child workflow orchestration with retry logic
- External event handling with timeouts
- Workflow state management (pause/resume)
- Activity error handling and retry backoff
- Global state tracking across workflow components
- Workflow lifecycle management (start, pause, resume, purge)


<!--STEP
name: Run the simple async workflow example
expected_stdout_lines:
- "== APP == Hi Counter!"
- "== APP == New counter value is: 1!"
- "== APP == New counter value is: 11!"
- "== APP == Retry count value is: 0!"
- "== APP == Retry count value is: 1! This print statement verifies retry"
- "== APP == Appending 1 to child_orchestrator_string!"
- "== APP == Appending a to child_orchestrator_string!"
- "== APP == Appending a to child_orchestrator_string!"
- "== APP == Appending 2 to child_orchestrator_string!"
- "== APP == Appending b to child_orchestrator_string!"
- "== APP == Appending b to child_orchestrator_string!"
- "== APP == Appending 3 to child_orchestrator_string!"
- "== APP == Appending c to child_orchestrator_string!"
- "== APP == Appending c to child_orchestrator_string!"
- "== APP == Get response from hello_world_wf after pause call: SUSPENDED"
- "== APP == Get response from hello_world_wf after resume call: RUNNING"
- "== APP == New counter value is: 111!"
- "== APP == New counter value is: 1111!"
- "== APP == Workflow completed! Result: Completed"
timeout_seconds: 30
-->

```sh
dapr run --app-id wf-simple-aio-example -- python3 simple_aio.py
```
<!--END_STEP-->

The output of this example should look like this:

```
- "== APP == Hi Counter!"
- "== APP == New counter value is: 1!"
- "== APP == New counter value is: 11!"
- "== APP == Retry count value is: 0!"
- "== APP == Retry count value is: 1! This print statement verifies retry"
- "== APP == Appending 1 to child_orchestrator_string!"
- "== APP == Appending a to child_orchestrator_string!"
- "== APP == Appending a to child_orchestrator_string!"
- "== APP == Appending 2 to child_orchestrator_string!"
- "== APP == Appending b to child_orchestrator_string!"
- "== APP == Appending b to child_orchestrator_string!"
- "== APP == Appending 3 to child_orchestrator_string!"
- "== APP == Appending c to child_orchestrator_string!"
- "== APP == Appending c to child_orchestrator_string!"
- "== APP == Get response from hello_world_wf after pause call: SUSPENDED"
- "== APP == Get response from hello_world_wf after resume call: RUNNING"
- "== APP == New counter value is: 111!"
- "== APP == New counter value is: 1111!"
- "== APP == Workflow completed! Result: Completed"
```

### Task Chaining

This example demonstrates how to chain "activity" tasks together in a workflow. You can run this sample using the following command:
Expand Down
177 changes: 177 additions & 0 deletions examples/workflow/simple_aio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
# -*- coding: utf-8 -*-
# Copyright 2025 The Dapr Authors
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
from datetime import timedelta

import dapr.ext.workflow as wf
from dapr.ext.workflow.aio import DaprWorkflowClient, WorkflowRuntime

from dapr.clients.exceptions import DaprInternalError
from dapr.conf import Settings

settings = Settings()

counter = 0
retry_count = 0
child_orchestrator_count = 0
child_orchestrator_string = ''
child_act_retry_count = 0
instance_id = 'exampleInstanceID'
child_instance_id = 'childInstanceID'
workflow_name = 'hello_world_wf'
child_workflow_name = 'child_wf'
input_data = 'Hi Counter!'
event_name = 'event1'
event_data = 'eventData'
non_existent_id_error = 'no such instance exists'


retry_policy = wf.RetryPolicy(
first_retry_interval=timedelta(seconds=1),
max_number_of_attempts=3,
backoff_coefficient=2,
max_retry_interval=timedelta(seconds=10),
retry_timeout=timedelta(seconds=100),
)

asyncio.set_event_loop(asyncio.new_event_loop())
loop = asyncio.get_event_loop()
wfr = WorkflowRuntime(main_event_loop=loop)


# workflow still synchronous. Not expected to run any async code.
@wfr.workflow(name='hello_world_wf')
def hello_world_wf(ctx: wf.DaprWorkflowContext, wf_input):
print(f'{wf_input}')
yield ctx.call_activity(hello_act, input=1)
yield ctx.call_activity(hello_act, input=10)
yield ctx.call_activity(hello_retryable_act, retry_policy=retry_policy)
yield ctx.call_child_workflow(child_retryable_wf, retry_policy=retry_policy)

# Change in event handling: Use when_any to handle both event and timeout
event = ctx.wait_for_external_event(event_name)
timeout = ctx.create_timer(timedelta(seconds=30))
winner = yield wf.when_any([event, timeout])

if winner == timeout:
print('Workflow timed out waiting for event')
return 'Timeout'

yield ctx.call_activity(hello_act, input=100)
yield ctx.call_activity(hello_act, input=1000)
return 'Completed'


# activity is async and will be executed on the main event loop.
@wfr.activity(name='hello_act')
async def hello_act(ctx: wf.WorkflowActivityContext, wf_input):
global counter
counter += wf_input
await asyncio.sleep(0.1) # simulates async work.
print(f'New counter value is: {counter}!', flush=True)


# an activity can also be left synchronous.
@wfr.activity(name='hello_retryable_act')
def hello_retryable_act(ctx: wf.WorkflowActivityContext):
global retry_count
if (retry_count % 2) == 0:
print(f'Retry count value is: {retry_count}!', flush=True)
retry_count += 1
raise ValueError('Retryable Error')
print(f'Retry count value is: {retry_count}! This print statement verifies retry', flush=True)
retry_count += 1


# workflow still synchronous. Not expected to run any async code.
@wfr.workflow(name='child_retryable_wf')
def child_retryable_wf(ctx: wf.DaprWorkflowContext):
global child_orchestrator_string, child_orchestrator_count
if not ctx.is_replaying:
child_orchestrator_count += 1
print(f'Appending {child_orchestrator_count} to child_orchestrator_string!', flush=True)
child_orchestrator_string += str(child_orchestrator_count)
yield ctx.call_activity(
act_for_child_wf, input=child_orchestrator_count, retry_policy=retry_policy
)
if child_orchestrator_count < 3:
raise ValueError('Retryable Error')


@wfr.activity(name='act_for_child_wf')
async def act_for_child_wf(ctx: wf.WorkflowActivityContext, inp):
global child_orchestrator_string, child_act_retry_count
inp_char = chr(96 + inp)
print(f'Appending {inp_char} to child_orchestrator_string!', flush=True)
child_orchestrator_string += inp_char
if child_act_retry_count % 2 == 0:
child_act_retry_count += 1
raise ValueError('Retryable Error')
child_act_retry_count += 1


async def main():
wfr.start()
wf_client = DaprWorkflowClient()

print('==========Start Counter Increase as per Input:==========')
await wf_client.schedule_new_workflow(
workflow=hello_world_wf, input=input_data, instance_id=instance_id
)

await wf_client.wait_for_workflow_start(instance_id)

# Sleep to let the workflow run initial activities
await asyncio.sleep(12)

assert counter == 11
assert retry_count == 2
assert child_orchestrator_string == '1aa2bb3cc'

# Pause Test
await wf_client.pause_workflow(instance_id=instance_id)
metadata = await wf_client.get_workflow_state(instance_id=instance_id)
print(f'Get response from {workflow_name} after pause call: {metadata.runtime_status.name}')

# Resume Test
await wf_client.resume_workflow(instance_id=instance_id)
metadata = await wf_client.get_workflow_state(instance_id=instance_id)
print(f'Get response from {workflow_name} after resume call: {metadata.runtime_status.name}')

await asyncio.sleep(2) # Give the workflow time to reach the event wait state
await wf_client.raise_workflow_event(
instance_id=instance_id, event_name=event_name, data=event_data
)

print('========= Waiting for Workflow completion', flush=True)
try:
state = await wf_client.wait_for_workflow_completion(instance_id, timeout_in_seconds=30)
if state.runtime_status.name == 'COMPLETED':
print('Workflow completed! Result: {}'.format(state.serialized_output.strip('"')))
else:
print(f'Workflow failed! Status: {state.runtime_status.name}')
except TimeoutError:
print('*** Workflow timed out!')

await wf_client.purge_workflow(instance_id=instance_id)
try:
await wf_client.get_workflow_state(instance_id=instance_id)
except DaprInternalError as err:
if non_existent_id_error in err._message:
print('Instance Successfully Purged')

wfr.shutdown()


if __name__ == '__main__':
loop.run_until_complete(main())
22 changes: 22 additions & 0 deletions ext/dapr-ext-workflow/dapr/ext/workflow/aio/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# -*- coding: utf-8 -*-

"""
Copyright 2025 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

from .dapr_workflow_client import DaprWorkflowClient
from .workflow_runtime import WorkflowRuntime

__all__ = [
'DaprWorkflowClient',
'WorkflowRuntime',
]
Loading
Loading