Skip to content
Open
67 changes: 67 additions & 0 deletions examples/workflow/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,73 @@ The output of this example should look like this:
- "== APP == Workflow completed! Result: Completed"
```

### Simple Workflow Async
This example represents a workflow with async activities and using the async WorkflowClient that manages counters through a series of activities and child workflows.
It shows several Dapr Workflow features including:
- Basic activity execution with counter increments
- Retryable activities with configurable retry policies
- Child workflow orchestration with retry logic
- External event handling with timeouts
- Workflow state management (pause/resume)
- Activity error handling and retry backoff
- Global state tracking across workflow components
- Workflow lifecycle management (start, pause, resume, purge)


<!--STEP
name: Run the simple async workflow example
expected_stdout_lines:
- "== APP == Hi Counter!"
- "== APP == New counter value is: 1!"
- "== APP == New counter value is: 11!"
- "== APP == Retry count value is: 0!"
- "== APP == Retry count value is: 1! This print statement verifies retry"
- "== APP == Appending 1 to child_orchestrator_string!"
- "== APP == Appending a to child_orchestrator_string!"
- "== APP == Appending a to child_orchestrator_string!"
- "== APP == Appending 2 to child_orchestrator_string!"
- "== APP == Appending b to child_orchestrator_string!"
- "== APP == Appending b to child_orchestrator_string!"
- "== APP == Appending 3 to child_orchestrator_string!"
- "== APP == Appending c to child_orchestrator_string!"
- "== APP == Appending c to child_orchestrator_string!"
- "== APP == Get response from hello_world_wf after pause call: SUSPENDED"
- "== APP == Get response from hello_world_wf after resume call: RUNNING"
- "== APP == New counter value is: 111!"
- "== APP == New counter value is: 1111!"
- "== APP == Workflow completed! Result: Completed"
timeout_seconds: 30
-->

```sh
dapr run --app-id wf-simple-aio-example -- python3 simple_aio.py
```
<!--END_STEP-->

The output of this example should look like this:

```
- "== APP == Hi Counter!"
- "== APP == New counter value is: 1!"
- "== APP == New counter value is: 11!"
- "== APP == Retry count value is: 0!"
- "== APP == Retry count value is: 1! This print statement verifies retry"
- "== APP == Appending 1 to child_orchestrator_string!"
- "== APP == Appending a to child_orchestrator_string!"
- "== APP == Appending a to child_orchestrator_string!"
- "== APP == Appending 2 to child_orchestrator_string!"
- "== APP == Appending b to child_orchestrator_string!"
- "== APP == Appending b to child_orchestrator_string!"
- "== APP == Appending 3 to child_orchestrator_string!"
- "== APP == Appending c to child_orchestrator_string!"
- "== APP == Appending c to child_orchestrator_string!"
- "== APP == Get response from hello_world_wf after pause call: SUSPENDED"
- "== APP == Get response from hello_world_wf after resume call: RUNNING"
- "== APP == New counter value is: 111!"
- "== APP == New counter value is: 1111!"
- "== APP == Workflow completed! Result: Completed"
```

### Task Chaining

This example demonstrates how to chain "activity" tasks together in a workflow. You can run this sample using the following command:
Expand Down
175 changes: 175 additions & 0 deletions examples/workflow/simple_aio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
# -*- coding: utf-8 -*-
# Copyright 2023 The Dapr Authors
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
from datetime import timedelta

import dapr.ext.workflow as wf
from dapr.ext.workflow.aio import DaprWorkflowClient, WorkflowRuntime

from dapr.clients.exceptions import DaprInternalError
from dapr.conf import Settings

settings = Settings()

counter = 0
retry_count = 0
child_orchestrator_count = 0
child_orchestrator_string = ''
child_act_retry_count = 0
instance_id = 'exampleInstanceID'
child_instance_id = 'childInstanceID'
workflow_name = 'hello_world_wf'
child_workflow_name = 'child_wf'
input_data = 'Hi Counter!'
event_name = 'event1'
event_data = 'eventData'
non_existent_id_error = 'no such instance exists'


retry_policy = wf.RetryPolicy(
first_retry_interval=timedelta(seconds=1),
max_number_of_attempts=3,
backoff_coefficient=2,
max_retry_interval=timedelta(seconds=10),
retry_timeout=timedelta(seconds=100),
)

asyncio.set_event_loop(asyncio.new_event_loop())
loop = asyncio.get_event_loop()
wfr = WorkflowRuntime(main_event_loop=loop)


# workflow still synchronous. Not expected to run any async code.
@wfr.workflow(name='hello_world_wf')
def hello_world_wf(ctx: wf.DaprWorkflowContext, wf_input):
print(f'{wf_input}')
yield ctx.call_activity(hello_act, input=1)
yield ctx.call_activity(hello_act, input=10)
yield ctx.call_activity(hello_retryable_act, retry_policy=retry_policy)
yield ctx.call_child_workflow(child_retryable_wf, retry_policy=retry_policy)

# Change in event handling: Use when_any to handle both event and timeout
event = ctx.wait_for_external_event(event_name)
timeout = ctx.create_timer(timedelta(seconds=30))
winner = yield wf.when_any([event, timeout])

if winner == timeout:
print('Workflow timed out waiting for event')
return 'Timeout'

yield ctx.call_activity(hello_act, input=100)
yield ctx.call_activity(hello_act, input=1000)
return 'Completed'


# activity is async and will be executed on the main event loop.
@wfr.activity(name='hello_act')
async def hello_act(ctx: wf.WorkflowActivityContext, wf_input):
global counter
counter += wf_input
print(f'New counter value is: {counter}!', flush=True)


@wfr.activity(name='hello_retryable_act')
async def hello_retryable_act(ctx: wf.WorkflowActivityContext):
global retry_count
if (retry_count % 2) == 0:
print(f'Retry count value is: {retry_count}!', flush=True)
retry_count += 1
raise ValueError('Retryable Error')
print(f'Retry count value is: {retry_count}! This print statement verifies retry', flush=True)
retry_count += 1


# workflow still synchronous. Not expected to run any async code.
@wfr.workflow(name='child_retryable_wf')
def child_retryable_wf(ctx: wf.DaprWorkflowContext):
global child_orchestrator_string, child_orchestrator_count
if not ctx.is_replaying:
child_orchestrator_count += 1
print(f'Appending {child_orchestrator_count} to child_orchestrator_string!', flush=True)
child_orchestrator_string += str(child_orchestrator_count)
yield ctx.call_activity(
act_for_child_wf, input=child_orchestrator_count, retry_policy=retry_policy
)
if child_orchestrator_count < 3:
raise ValueError('Retryable Error')


@wfr.activity(name='act_for_child_wf')
async def act_for_child_wf(ctx: wf.WorkflowActivityContext, inp):
global child_orchestrator_string, child_act_retry_count
inp_char = chr(96 + inp)
print(f'Appending {inp_char} to child_orchestrator_string!', flush=True)
child_orchestrator_string += inp_char
if child_act_retry_count % 2 == 0:
child_act_retry_count += 1
raise ValueError('Retryable Error')
child_act_retry_count += 1


async def main():
wfr.start()
wf_client = DaprWorkflowClient()

print('==========Start Counter Increase as per Input:==========')
await wf_client.schedule_new_workflow(
workflow=hello_world_wf, input=input_data, instance_id=instance_id
)

await wf_client.wait_for_workflow_start(instance_id)

# Sleep to let the workflow run initial activities
await asyncio.sleep(12)

assert counter == 11
assert retry_count == 2
assert child_orchestrator_string == '1aa2bb3cc'

# Pause Test
await wf_client.pause_workflow(instance_id=instance_id)
metadata = await wf_client.get_workflow_state(instance_id=instance_id)
print(f'Get response from {workflow_name} after pause call: {metadata.runtime_status.name}')

# Resume Test
await wf_client.resume_workflow(instance_id=instance_id)
metadata = await wf_client.get_workflow_state(instance_id=instance_id)
print(f'Get response from {workflow_name} after resume call: {metadata.runtime_status.name}')

await asyncio.sleep(2) # Give the workflow time to reach the event wait state
await wf_client.raise_workflow_event(
instance_id=instance_id, event_name=event_name, data=event_data
)

print('========= Waiting for Workflow completion', flush=True)
try:
state = await wf_client.wait_for_workflow_completion(instance_id, timeout_in_seconds=30)
if state.runtime_status.name == 'COMPLETED':
print('Workflow completed! Result: {}'.format(state.serialized_output.strip('"')))
else:
print(f'Workflow failed! Status: {state.runtime_status.name}')
except TimeoutError:
print('*** Workflow timed out!')

await wf_client.purge_workflow(instance_id=instance_id)
try:
await wf_client.get_workflow_state(instance_id=instance_id)
except DaprInternalError as err:
if non_existent_id_error in err._message:
print('Instance Successfully Purged')

wfr.shutdown()


if __name__ == '__main__':
loop.run_until_complete(main())
9 changes: 9 additions & 0 deletions ext/dapr-ext-workflow/dapr/ext/workflow/aio/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# -*- coding: utf-8 -*-

from .dapr_workflow_client import DaprWorkflowClient
from .workflow_runtime import WorkflowRuntime

__all__ = [
'DaprWorkflowClient',
'WorkflowRuntime',
]
Loading
Loading