Skip to content

Commit d32c9aa

Browse files
committed
Added async workflow client implementation, leveraging new durabletask.aio.client implementation
1 parent c44c28d commit d32c9aa

File tree

4 files changed

+467
-1
lines changed

4 files changed

+467
-1
lines changed
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
# -*- coding: utf-8 -*-
2+
3+
from .dapr_workflow_client import DaprWorkflowClientAsync
4+
5+
# Public alias to mirror sync naming under aio namespace
6+
DaprWorkflowClient = DaprWorkflowClientAsync
7+
8+
__all__ = [
9+
'DaprWorkflowClientAsync',
10+
'DaprWorkflowClient',
11+
]
Lines changed: 280 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,280 @@
1+
# -*- coding: utf-8 -*-
2+
3+
"""
4+
Copyright 2025 The Dapr Authors
5+
Licensed under the Apache License, Version 2.0 (the "License");
6+
you may not use this file except in compliance with the License.
7+
You may obtain a copy of the License at
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
Unless required by applicable law or agreed to in writing, software
10+
distributed under the License is distributed on an "AS IS" BASIS,
11+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
See the License for the specific language governing permissions and
13+
limitations under the License.
14+
"""
15+
16+
from __future__ import annotations
17+
18+
from datetime import datetime
19+
from typing import Any, Optional, TypeVar
20+
21+
import durabletask.internal.orchestrator_service_pb2 as pb
22+
from dapr.ext.workflow.logger import Logger, LoggerOptions
23+
from dapr.ext.workflow.util import getAddress
24+
from dapr.ext.workflow.workflow_context import Workflow
25+
from dapr.ext.workflow.workflow_state import WorkflowState
26+
from durabletask.aio import client as aioclient
27+
from grpc.aio import AioRpcError
28+
29+
from dapr.clients import DaprInternalError
30+
from dapr.clients.http.client import DAPR_API_TOKEN_HEADER
31+
from dapr.conf import settings
32+
from dapr.conf.helpers import GrpcEndpoint
33+
34+
T = TypeVar('T')
35+
TInput = TypeVar('TInput')
36+
TOutput = TypeVar('TOutput')
37+
38+
39+
class DaprWorkflowClientAsync:
40+
"""Async client for managing Dapr Workflow instances.
41+
42+
This uses a gRPC async connection to send commands directly to the workflow engine,
43+
bypassing the Dapr API layer. Intended to be used by workflow applications.
44+
"""
45+
46+
def __init__(
47+
self,
48+
host: Optional[str] = None,
49+
port: Optional[str] = None,
50+
logger_options: Optional[LoggerOptions] = None,
51+
):
52+
address = getAddress(host, port)
53+
54+
try:
55+
uri = GrpcEndpoint(address)
56+
except ValueError as error:
57+
raise DaprInternalError(f'{error}') from error
58+
59+
self._logger = Logger('DaprWorkflowClientAsync', logger_options)
60+
61+
metadata = tuple()
62+
if settings.DAPR_API_TOKEN:
63+
metadata = ((DAPR_API_TOKEN_HEADER, settings.DAPR_API_TOKEN),)
64+
options = self._logger.get_options()
65+
self.__obj = aioclient.AsyncTaskHubGrpcClient(
66+
host_address=uri.endpoint,
67+
metadata=metadata,
68+
secure_channel=uri.tls,
69+
log_handler=options.log_handler,
70+
log_formatter=options.log_formatter,
71+
)
72+
73+
async def schedule_new_workflow(
74+
self,
75+
workflow: Workflow,
76+
*,
77+
input: Optional[TInput] = None,
78+
instance_id: Optional[str] = None,
79+
start_at: Optional[datetime] = None,
80+
reuse_id_policy: Optional[pb.OrchestrationIdReusePolicy] = None,
81+
) -> str:
82+
"""Schedules a new workflow instance for execution.
83+
84+
Args:
85+
workflow: The workflow to schedule.
86+
input: The optional input to pass to the scheduled workflow instance. This must be a
87+
serializable value.
88+
instance_id: The unique ID of the workflow instance to schedule. If not specified, a
89+
new GUID value is used.
90+
start_at: The time when the workflow instance should start executing.
91+
If not specified or if a date-time in the past is specified, the workflow instance will
92+
be scheduled immediately.
93+
reuse_id_policy: Optional policy to reuse the workflow id when there is a conflict with
94+
an existing workflow instance.
95+
96+
Returns:
97+
The ID of the scheduled workflow instance.
98+
"""
99+
if hasattr(workflow, '_dapr_alternate_name'):
100+
return await self.__obj.schedule_new_orchestration(
101+
workflow.__dict__['_dapr_alternate_name'],
102+
input=input,
103+
instance_id=instance_id,
104+
start_at=start_at,
105+
reuse_id_policy=reuse_id_policy,
106+
)
107+
return await self.__obj.schedule_new_orchestration(
108+
workflow.__name__,
109+
input=input,
110+
instance_id=instance_id,
111+
start_at=start_at,
112+
reuse_id_policy=reuse_id_policy,
113+
)
114+
115+
async def get_workflow_state(
116+
self, instance_id: str, *, fetch_payloads: bool = True
117+
) -> Optional[WorkflowState]:
118+
"""Fetches runtime state for the specified workflow instance.
119+
120+
Args:
121+
instance_id: The unique ID of the workflow instance to fetch.
122+
fetch_payloads: If true, fetches the input, output payloads and custom status
123+
for the workflow instance. Defaults to true.
124+
125+
Returns:
126+
The current state of the workflow instance, or None if the workflow instance does not
127+
exist.
128+
129+
"""
130+
try:
131+
state = await self.__obj.get_orchestration_state(
132+
instance_id, fetch_payloads=fetch_payloads
133+
)
134+
return WorkflowState(state) if state else None
135+
except AioRpcError as error:
136+
if error.details() and 'no such instance exists' in error.details():
137+
self._logger.warning(f'Workflow instance not found: {instance_id}')
138+
return None
139+
self._logger.error(
140+
f'Unhandled RPC error while fetching workflow state: {error.code()} - {error.details()}'
141+
)
142+
raise
143+
144+
async def wait_for_workflow_start(
145+
self, instance_id: str, *, fetch_payloads: bool = False, timeout_in_seconds: int = 0
146+
) -> Optional[WorkflowState]:
147+
"""Waits for a workflow to start running and returns a WorkflowState object that contains
148+
metadata about the started workflow.
149+
150+
A "started" workflow instance is any instance not in the WorkflowRuntimeStatus.Pending
151+
state. This method will return a completed task if the workflow has already started
152+
running or has already completed.
153+
154+
Args:
155+
instance_id: The unique ID of the workflow instance to wait for.
156+
fetch_payloads: If true, fetches the input, output payloads and custom status for
157+
the workflow instance. Defaults to false.
158+
timeout_in_seconds: The maximum time to wait for the workflow instance to start running.
159+
Defaults to meaning no timeout.
160+
161+
Returns:
162+
WorkflowState record that describes the workflow instance and its execution status.
163+
If the specified workflow isn't found, the WorkflowState.Exists value will be false.
164+
"""
165+
state = await self.__obj.wait_for_orchestration_start(
166+
instance_id, fetch_payloads=fetch_payloads, timeout=timeout_in_seconds
167+
)
168+
return WorkflowState(state) if state else None
169+
170+
async def wait_for_workflow_completion(
171+
self, instance_id: str, *, fetch_payloads: bool = True, timeout_in_seconds: int = 0
172+
) -> Optional[WorkflowState]:
173+
"""Waits for a workflow to complete and returns a WorkflowState object that contains
174+
metadata about the started instance.
175+
176+
A "completed" workflow instance is any instance in one of the terminal states. For
177+
example, the WorkflowRuntimeStatus.Completed, WorkflowRuntimeStatus.Failed or
178+
WorkflowRuntimeStatus.Terminated states.
179+
180+
Workflows are long-running and could take hours, days, or months before completing.
181+
Workflows can also be eternal, in which case they'll never complete unless terminated.
182+
In such cases, this call may block indefinitely, so care must be taken to ensure
183+
appropriate timeouts are enforced using timeout parameter.
184+
185+
If a workflow instance is already complete when this method is called, the method
186+
will return immediately.
187+
188+
Args:
189+
instance_id: The unique ID of the workflow instance to wait for.
190+
fetch_payloads: If true, fetches the input, output payloads and custom status
191+
for the workflow instance. Defaults to true.
192+
timeout_in_seconds: The maximum time in seconds to wait for the workflow instance to
193+
complete. Defaults to 0 seconds, meaning no timeout.
194+
195+
Returns:
196+
WorkflowState record that describes the workflow instance and its execution status.
197+
"""
198+
state = await self.__obj.wait_for_orchestration_completion(
199+
instance_id, fetch_payloads=fetch_payloads, timeout=timeout_in_seconds
200+
)
201+
return WorkflowState(state) if state else None
202+
203+
async def raise_workflow_event(
204+
self, instance_id: str, event_name: str, *, data: Optional[Any] = None
205+
) -> None:
206+
"""Sends an event notification message to a waiting workflow instance.
207+
In order to handle the event, the target workflow instance must be waiting for an
208+
event named value of "eventName" param using the wait_for_external_event API.
209+
If the target workflow instance is not yet waiting for an event named param "eventName"
210+
value, then the event will be saved in the workflow instance state and dispatched
211+
immediately when the workflow calls wait_for_external_event.
212+
This event saving occurs even if the workflow has canceled its wait operation before
213+
the event was received.
214+
215+
Workflows can wait for the same event name multiple times, so sending multiple events
216+
with the same name is allowed. Each external event received by a workflow will complete
217+
just one task returned by the wait_for_external_event method.
218+
219+
Raised events for a completed or non-existent workflow instance will be silently
220+
discarded.
221+
222+
Args:
223+
instance_id: The ID of the workflow instance that will handle the event.
224+
event_name: The name of the event. Event names are case-insensitive.
225+
data: The serializable data payload to include with the event.
226+
"""
227+
return await self.__obj.raise_orchestration_event(instance_id, event_name, data=data)
228+
229+
async def terminate_workflow(
230+
self, instance_id: str, *, output: Optional[Any] = None, recursive: bool = True
231+
) -> None:
232+
"""Terminates a running workflow instance and updates its runtime status to
233+
WorkflowRuntimeStatus.Terminated This method internally enqueues a "terminate" message in
234+
the task hub. When the task hub worker processes this message, it will update the runtime
235+
status of the target instance to WorkflowRuntimeStatus.Terminated. You can use
236+
wait_for_workflow_completion to wait for the instance to reach the terminated state.
237+
238+
Terminating a workflow will terminate all child workflows that were started by
239+
the workflow instance.
240+
241+
However, terminating a workflow has no effect on any in-flight activity function
242+
executions that were started by the terminated workflow instance.
243+
244+
At the time of writing, there is no way to terminate an in-flight activity execution.
245+
246+
Args:
247+
instance_id: The ID of the workflow instance to terminate.
248+
output: The optional output to set for the terminated workflow instance.
249+
recursive: The optional flag to terminate all child workflows.
250+
251+
"""
252+
return await self.__obj.terminate_orchestration(
253+
instance_id, output=output, recursive=recursive
254+
)
255+
256+
async def pause_workflow(self, instance_id: str) -> None:
257+
"""Suspends a workflow instance, halting processing of it until resume_workflow is used to
258+
resume the workflow.
259+
260+
Args:
261+
instance_id: The instance ID of the workflow to suspend.
262+
"""
263+
return await self.__obj.suspend_orchestration(instance_id)
264+
265+
async def resume_workflow(self, instance_id: str) -> None:
266+
"""Resumes a workflow instance that was suspended via pause_workflow.
267+
268+
Args:
269+
instance_id: The instance ID of the workflow to resume.
270+
"""
271+
return await self.__obj.resume_orchestration(instance_id)
272+
273+
async def purge_workflow(self, instance_id: str, recursive: bool = True) -> None:
274+
"""Purge data from a workflow instance.
275+
276+
Args:
277+
instance_id: The instance ID of the workflow to purge.
278+
recursive: The optional flag to also purge data from all child workflows.
279+
"""
280+
return await self.__obj.purge_orchestration(instance_id, recursive)

ext/dapr-ext-workflow/setup.cfg

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ packages = find_namespace:
2525
include_package_data = True
2626
install_requires =
2727
dapr >= 1.16.1rc1
28-
durabletask-dapr >= 0.2.0a9
28+
durabletask-dapr >= 0.2.0a11
2929

3030
[options.packages.find]
3131
include =

0 commit comments

Comments
 (0)