From 1f0259d8ffe28967d9361dc8809cefb8ae57f3c9 Mon Sep 17 00:00:00 2001 From: Tim Li Date: Mon, 3 Nov 2025 16:41:41 -0800 Subject: [PATCH 1/4] feat: define workflow signal decorator in workflowDefinition Signed-off-by: Tim Li --- cadence/workflow.py | 51 +++++++++++++++- tests/cadence/worker/test_registry.py | 86 +++++++++++++++++++++++++++ 2 files changed, 135 insertions(+), 2 deletions(-) diff --git a/cadence/workflow.py b/cadence/workflow.py index 913ebd1..8da34aa 100644 --- a/cadence/workflow.py +++ b/cadence/workflow.py @@ -59,10 +59,16 @@ class WorkflowDefinition: Provides type safety and metadata for workflow classes. """ - def __init__(self, cls: Type, name: str, run_method_name: str): + def __init__(self, cls: Type, name: str, run_method_name: str, signals: dict[str, Callable[..., Any]]): self._cls = cls self._name = name self._run_method_name = run_method_name + self._signals = signals + + @property + def signals(self) -> dict[str, Callable[..., Any]]: + """Get the signals.""" + return self._signals @property def name(self) -> str: @@ -98,6 +104,9 @@ def wrap(cls: Type, opts: WorkflowDefinitionOptions) -> "WorkflowDefinition": name = opts["name"] # Validate that the class has exactly one run method and find it + # Also validate that class does not have multiple signal methods with the same name + signals: dict[str, Callable[..., Any]] = {} + signal_names: dict[str, str] = {} # Map signal name to method name for duplicate detection run_method_name = None for attr_name in dir(cls): if attr_name.startswith("_"): @@ -114,11 +123,22 @@ def wrap(cls: Type, opts: WorkflowDefinitionOptions) -> "WorkflowDefinition": f"Multiple @workflow.run methods found in class {cls.__name__}" ) run_method_name = attr_name + + if hasattr(attr, "_workflow_signal"): + signal_name = getattr(attr, "_workflow_signal") + if signal_name in signal_names: + raise ValueError( + f"Multiple @workflow.signal methods found in class {cls.__name__} " + f"with signal name '{signal_name}': '{attr_name}' and '{signal_names[signal_name]}'" + ) + signals[attr_name] = attr + signal_names[signal_name] = attr_name if run_method_name is None: raise ValueError(f"No @workflow.run method found in class {cls.__name__}") - return WorkflowDefinition(cls, name, run_method_name) + return WorkflowDefinition(cls, name, run_method_name, signals) + def run(func: Optional[T] = None) -> Union[T, Callable[[T], T]]: @@ -161,6 +181,33 @@ def decorator(f: T) -> T: # Called without parentheses: @workflow.run return decorator(func) +def signal(name: str | None = None) -> Callable[[T], T]: + """ + Decorator to mark a method as a workflow signal handler. + + Example: + @workflow.signal(name="approval_channel") + async def approve(self, approved: bool): + self.approved = approved + + Args: + name: The name of the signal + + Returns: + The decorated method with workflow signal metadata + + Raises: + ValueError: If name is not provided + + """ + if name is None: + raise ValueError("name is required") + + def decorator(f: T) -> T: + f._workflow_signal = name # type: ignore + return f + # Only allow @workflow.signal(name), require name to be explicitly provided + return decorator @dataclass class WorkflowInfo: diff --git a/tests/cadence/worker/test_registry.py b/tests/cadence/worker/test_registry.py index 0a61d8b..99a3b13 100644 --- a/tests/cadence/worker/test_registry.py +++ b/tests/cadence/worker/test_registry.py @@ -212,3 +212,89 @@ async def run(self, input: str) -> str: workflow_def = reg.get_workflow("custom_workflow_name") assert workflow_def.name == "custom_workflow_name" assert workflow_def.cls == CustomWorkflow + + def test_workflow_with_signal(self): + """Test workflow with signal handler.""" + reg = Registry() + + @reg.workflow + class WorkflowWithSignal: + @workflow.run + async def run(self): + return "done" + + @workflow.signal(name="approval") + async def handle_approval(self, approved: bool): + self.approved = approved + + workflow_def = reg.get_workflow("WorkflowWithSignal") + assert isinstance(workflow_def, WorkflowDefinition) + assert len(workflow_def.signals) == 1 + assert "handle_approval" in workflow_def.signals + assert hasattr(workflow_def.signals["handle_approval"], "_workflow_signal") + assert workflow_def.signals["handle_approval"]._workflow_signal == "approval" + + def test_workflow_with_multiple_signals(self): + """Test workflow with multiple signal handlers.""" + reg = Registry() + + @reg.workflow + class WorkflowWithMultipleSignals: + @workflow.run + async def run(self): + return "done" + + @workflow.signal(name="approval") + async def handle_approval(self, approved: bool): + self.approved = approved + + @workflow.signal(name="cancel") + async def handle_cancel(self): + self.cancelled = True + + workflow_def = reg.get_workflow("WorkflowWithMultipleSignals") + assert len(workflow_def.signals) == 2 + assert "handle_approval" in workflow_def.signals + assert "handle_cancel" in workflow_def.signals + assert getattr(workflow_def.signals["handle_approval"], "_workflow_signal") == "approval" + assert getattr(workflow_def.signals["handle_cancel"], "_workflow_signal") == "cancel" + + def test_signal_decorator_requires_name(self): + """Test that signal decorator requires name parameter.""" + with pytest.raises(ValueError, match="name is required"): + @workflow.signal() + async def test_signal(self): + pass + + def test_workflow_without_signals(self): + """Test that workflow without signals has empty signals dict.""" + reg = Registry() + + @reg.workflow + class WorkflowWithoutSignals: + @workflow.run + async def run(self): + return "done" + + workflow_def = reg.get_workflow("WorkflowWithoutSignals") + assert isinstance(workflow_def.signals, dict) + assert len(workflow_def.signals) == 0 + + def test_duplicate_signal_names_error(self): + """Test that duplicate signal names raise ValueError.""" + reg = Registry() + + with pytest.raises(ValueError, match="Multiple.*signal.*found.*with signal name 'approval'"): + @reg.workflow + class WorkflowWithDuplicateSignalNames: + @workflow.run + async def run(self): + return "done" + + @workflow.signal(name="approval") + async def handle_approval(self, approved: bool): + self.approved = approved + + @workflow.signal(name="approval") + async def handle_approval_different(self): + self.also_approved = True From 992534fe6c38586de9a892d7930b033e76de1f19 Mon Sep 17 00:00:00 2001 From: Tim Li Date: Mon, 3 Nov 2025 16:43:50 -0800 Subject: [PATCH 2/4] lint + formatter Signed-off-by: Tim Li --- cadence/workflow.py | 22 ++++++++++++++++------ tests/cadence/worker/test_registry.py | 16 +++++++++++++--- 2 files changed, 29 insertions(+), 9 deletions(-) diff --git a/cadence/workflow.py b/cadence/workflow.py index 8da34aa..12892e8 100644 --- a/cadence/workflow.py +++ b/cadence/workflow.py @@ -59,7 +59,13 @@ class WorkflowDefinition: Provides type safety and metadata for workflow classes. """ - def __init__(self, cls: Type, name: str, run_method_name: str, signals: dict[str, Callable[..., Any]]): + def __init__( + self, + cls: Type, + name: str, + run_method_name: str, + signals: dict[str, Callable[..., Any]], + ): self._cls = cls self._name = name self._run_method_name = run_method_name @@ -106,7 +112,9 @@ def wrap(cls: Type, opts: WorkflowDefinitionOptions) -> "WorkflowDefinition": # Validate that the class has exactly one run method and find it # Also validate that class does not have multiple signal methods with the same name signals: dict[str, Callable[..., Any]] = {} - signal_names: dict[str, str] = {} # Map signal name to method name for duplicate detection + signal_names: dict[ + str, str + ] = {} # Map signal name to method name for duplicate detection run_method_name = None for attr_name in dir(cls): if attr_name.startswith("_"): @@ -123,7 +131,7 @@ def wrap(cls: Type, opts: WorkflowDefinitionOptions) -> "WorkflowDefinition": f"Multiple @workflow.run methods found in class {cls.__name__}" ) run_method_name = attr_name - + if hasattr(attr, "_workflow_signal"): signal_name = getattr(attr, "_workflow_signal") if signal_name in signal_names: @@ -140,7 +148,6 @@ def wrap(cls: Type, opts: WorkflowDefinitionOptions) -> "WorkflowDefinition": return WorkflowDefinition(cls, name, run_method_name, signals) - def run(func: Optional[T] = None) -> Union[T, Callable[[T], T]]: """ Decorator to mark a method as the main workflow run method. @@ -181,12 +188,13 @@ def decorator(f: T) -> T: # Called without parentheses: @workflow.run return decorator(func) + def signal(name: str | None = None) -> Callable[[T], T]: """ Decorator to mark a method as a workflow signal handler. Example: - @workflow.signal(name="approval_channel") + @workflow.signal(name="approval_channel") async def approve(self, approved: bool): self.approved = approved @@ -205,10 +213,12 @@ async def approve(self, approved: bool): def decorator(f: T) -> T: f._workflow_signal = name # type: ignore - return f + return f + # Only allow @workflow.signal(name), require name to be explicitly provided return decorator + @dataclass class WorkflowInfo: workflow_type: str diff --git a/tests/cadence/worker/test_registry.py b/tests/cadence/worker/test_registry.py index 99a3b13..94c22f0 100644 --- a/tests/cadence/worker/test_registry.py +++ b/tests/cadence/worker/test_registry.py @@ -256,12 +256,19 @@ async def handle_cancel(self): assert len(workflow_def.signals) == 2 assert "handle_approval" in workflow_def.signals assert "handle_cancel" in workflow_def.signals - assert getattr(workflow_def.signals["handle_approval"], "_workflow_signal") == "approval" - assert getattr(workflow_def.signals["handle_cancel"], "_workflow_signal") == "cancel" + assert ( + getattr(workflow_def.signals["handle_approval"], "_workflow_signal") + == "approval" + ) + assert ( + getattr(workflow_def.signals["handle_cancel"], "_workflow_signal") + == "cancel" + ) def test_signal_decorator_requires_name(self): """Test that signal decorator requires name parameter.""" with pytest.raises(ValueError, match="name is required"): + @workflow.signal() async def test_signal(self): pass @@ -284,7 +291,10 @@ def test_duplicate_signal_names_error(self): """Test that duplicate signal names raise ValueError.""" reg = Registry() - with pytest.raises(ValueError, match="Multiple.*signal.*found.*with signal name 'approval'"): + with pytest.raises( + ValueError, match="Multiple.*signal.*found.*with signal name 'approval'" + ): + @reg.workflow class WorkflowWithDuplicateSignalNames: @workflow.run From e02576ea8065f7b4b30a6ef01f9444058c2958c1 Mon Sep 17 00:00:00 2001 From: Tim Li Date: Wed, 5 Nov 2025 12:14:15 -0800 Subject: [PATCH 3/4] minor lint Signed-off-by: Tim Li --- .../_internal/workflow/test_workflow_engine_integration.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/cadence/_internal/workflow/test_workflow_engine_integration.py b/tests/cadence/_internal/workflow/test_workflow_engine_integration.py index 3aa5d44..c40a1e6 100644 --- a/tests/cadence/_internal/workflow/test_workflow_engine_integration.py +++ b/tests/cadence/_internal/workflow/test_workflow_engine_integration.py @@ -4,7 +4,7 @@ """ import pytest -from unittest.mock import Mock, AsyncMock, patch +from unittest.mock import Mock, patch from cadence.api.v1.service_worker_pb2 import PollForDecisionTaskResponse from cadence.api.v1.common_pb2 import Payload, WorkflowExecution, WorkflowType from cadence.api.v1.history_pb2 import ( @@ -244,7 +244,7 @@ async def test_extract_workflow_input_deserialization_error( decision_task = self.create_mock_decision_task() # Mock data converter to raise an exception - mock_client.data_converter.from_data = AsyncMock( + mock_client.data_converter.from_data = Mock( side_effect=Exception("Deserialization error") ) From db46c858a337bf02fe2b0905e7457372e5280201 Mon Sep 17 00:00:00 2001 From: Tim Li Date: Thu, 6 Nov 2025 14:49:31 -0800 Subject: [PATCH 4/4] implement signalDefinition Signed-off-by: Tim Li --- cadence/signal.py | 220 ++++++++++++++++++++++++++ cadence/workflow.py | 15 +- tests/cadence/worker/test_registry.py | 27 ++-- 3 files changed, 244 insertions(+), 18 deletions(-) create mode 100644 cadence/signal.py diff --git a/cadence/signal.py b/cadence/signal.py new file mode 100644 index 0000000..1816431 --- /dev/null +++ b/cadence/signal.py @@ -0,0 +1,220 @@ +""" +Signal definition and registration for Cadence workflows. + +This module provides functionality to define and register signal handlers +for workflows, similar to ActivityDefinition but for signals. +""" + +import inspect +from dataclasses import dataclass +from functools import update_wrapper +from inspect import Parameter, signature +from typing import ( + Callable, + Generic, + ParamSpec, + Type, + TypeVar, + TypedDict, + Unpack, + overload, + get_type_hints, + Any, +) + +P = ParamSpec("P") +T = TypeVar("T") + + +@dataclass(frozen=True) +class SignalParameter: + """Parameter metadata for a signal handler.""" + + name: str + type_hint: Type | None + has_default: bool + default_value: Any + + +class SignalDefinitionOptions(TypedDict, total=False): + """Options for defining a signal.""" + + name: str + + +class SignalDefinition(Generic[P, T]): + """ + Definition of a signal handler with metadata. + + Similar to ActivityDefinition but for signal handlers. + Provides type safety and metadata for signal handlers. + """ + + def __init__( + self, + wrapped: Callable[P, T], + name: str, + params: list[SignalParameter], + is_async: bool, + ): + self._wrapped = wrapped + self._name = name + self._params = params + self._is_async = is_async + update_wrapper(self, wrapped) + + def __call__(self, *args: P.args, **kwargs: P.kwargs) -> T: + """Call the wrapped signal handler function.""" + return self._wrapped(*args, **kwargs) + + @property + def name(self) -> str: + """Get the signal name.""" + return self._name + + @property + def params(self) -> list[SignalParameter]: + """Get the signal parameters.""" + return self._params + + @property + def is_async(self) -> bool: + """Check if the signal handler is async.""" + return self._is_async + + @property + def wrapped(self) -> Callable[P, T]: + """Get the wrapped signal handler function.""" + return self._wrapped + + @staticmethod + def wrap( + fn: Callable[P, T], opts: SignalDefinitionOptions + ) -> "SignalDefinition[P, T]": + """ + Wrap a function as a SignalDefinition. + + Args: + fn: The signal handler function to wrap + opts: Options for the signal definition + + Returns: + A SignalDefinition instance + + Raises: + ValueError: If name is not provided in options or return type is not None + """ + name = opts.get("name") or fn.__qualname__ + is_async = inspect.iscoroutinefunction(fn) + params = _get_signal_signature(fn) + _validate_signal_return_type(fn) + + return SignalDefinition(fn, name, params, is_async) + + +SignalDecorator = Callable[[Callable[P, T]], SignalDefinition[P, T]] + + +@overload +def defn(fn: Callable[P, T]) -> SignalDefinition[P, T]: ... + + +@overload +def defn(**kwargs: Unpack[SignalDefinitionOptions]) -> SignalDecorator: ... + + +def defn( + fn: Callable[P, T] | None = None, **kwargs: Unpack[SignalDefinitionOptions] +) -> SignalDecorator | SignalDefinition[P, T]: + """ + Decorator to define a signal handler. + + Can be used with or without parentheses: + @signal.defn(name="approval") + async def handle_approval(self, approved: bool): + ... + + @signal.defn(name="approval") + def handle_approval(self, approved: bool): + ... + + Args: + fn: The signal handler function to decorate + **kwargs: Options for the signal definition (name is required) + + Returns: + The decorated function as a SignalDefinition instance + + Raises: + ValueError: If name is not provided + """ + options = SignalDefinitionOptions(**kwargs) + + def decorator(inner_fn: Callable[P, T]) -> SignalDefinition[P, T]: + return SignalDefinition.wrap(inner_fn, options) + + if fn is not None: + return decorator(fn) + + return decorator + + +def _validate_signal_return_type(fn: Callable) -> None: + """ + Validate that signal handler returns None. + + Args: + fn: The signal handler function + + Raises: + ValueError: If return type is not None + """ + try: + hints = get_type_hints(fn) + ret_type = hints.get("return", inspect.Signature.empty) + + if ret_type is not None and ret_type is not inspect.Signature.empty: + raise ValueError( + f"Signal handler '{fn.__qualname__}' must return None " + f"(signals cannot return values), got {ret_type}" + ) + except NameError: + pass + + +def _get_signal_signature(fn: Callable[P, T]) -> list[SignalParameter]: + """ + Extract parameter information from a signal handler function. + + Args: + fn: The signal handler function + + Returns: + List of SignalParameter objects + + Raises: + ValueError: If parameters are not positional + """ + sig = signature(fn) + args = sig.parameters + hints = get_type_hints(fn) + params = [] + + for name, param in args.items(): + # Filter out the self parameter for instance methods + if param.name == "self": + continue + + has_default = param.default != Parameter.empty + default = param.default if has_default else None + + if param.kind in (Parameter.POSITIONAL_ONLY, Parameter.POSITIONAL_OR_KEYWORD): + type_hint = hints.get(name, None) + params.append(SignalParameter(name, type_hint, has_default, default)) + else: + raise ValueError( + f"Signal handler '{fn.__qualname__}' parameter '{name}' must be positional, " + f"got {param.kind.name}" + ) + + return params diff --git a/cadence/workflow.py b/cadence/workflow.py index 12892e8..60baa58 100644 --- a/cadence/workflow.py +++ b/cadence/workflow.py @@ -19,6 +19,7 @@ from cadence.client import Client from cadence.data_converter import DataConverter +from cadence.signal import SignalDefinition, SignalDefinitionOptions ResultType = TypeVar("ResultType") @@ -64,7 +65,7 @@ def __init__( cls: Type, name: str, run_method_name: str, - signals: dict[str, Callable[..., Any]], + signals: dict[str, SignalDefinition[..., Any]], ): self._cls = cls self._name = name @@ -72,8 +73,8 @@ def __init__( self._signals = signals @property - def signals(self) -> dict[str, Callable[..., Any]]: - """Get the signals.""" + def signals(self) -> dict[str, SignalDefinition[..., Any]]: + """Get the signal definitions.""" return self._signals @property @@ -111,7 +112,7 @@ def wrap(cls: Type, opts: WorkflowDefinitionOptions) -> "WorkflowDefinition": # Validate that the class has exactly one run method and find it # Also validate that class does not have multiple signal methods with the same name - signals: dict[str, Callable[..., Any]] = {} + signals: dict[str, SignalDefinition[..., Any]] = {} signal_names: dict[ str, str ] = {} # Map signal name to method name for duplicate detection @@ -139,7 +140,11 @@ def wrap(cls: Type, opts: WorkflowDefinitionOptions) -> "WorkflowDefinition": f"Multiple @workflow.signal methods found in class {cls.__name__} " f"with signal name '{signal_name}': '{attr_name}' and '{signal_names[signal_name]}'" ) - signals[attr_name] = attr + # Create SignalDefinition from the decorated method + signal_def = SignalDefinition.wrap( + attr, SignalDefinitionOptions(name=signal_name) + ) + signals[signal_name] = signal_def signal_names[signal_name] = attr_name if run_method_name is None: diff --git a/tests/cadence/worker/test_registry.py b/tests/cadence/worker/test_registry.py index 94c22f0..a7040dd 100644 --- a/tests/cadence/worker/test_registry.py +++ b/tests/cadence/worker/test_registry.py @@ -9,6 +9,7 @@ from cadence import workflow from cadence.worker import Registry from cadence.workflow import WorkflowDefinition +from cadence.signal import SignalDefinition from tests.cadence import common_activities @@ -230,9 +231,13 @@ async def handle_approval(self, approved: bool): workflow_def = reg.get_workflow("WorkflowWithSignal") assert isinstance(workflow_def, WorkflowDefinition) assert len(workflow_def.signals) == 1 - assert "handle_approval" in workflow_def.signals - assert hasattr(workflow_def.signals["handle_approval"], "_workflow_signal") - assert workflow_def.signals["handle_approval"]._workflow_signal == "approval" + assert "approval" in workflow_def.signals + signal_def = workflow_def.signals["approval"] + assert isinstance(signal_def, SignalDefinition) + assert signal_def.name == "approval" + assert signal_def.is_async is True + assert len(signal_def.params) == 1 + assert signal_def.params[0].name == "approved" def test_workflow_with_multiple_signals(self): """Test workflow with multiple signal handlers.""" @@ -254,16 +259,12 @@ async def handle_cancel(self): workflow_def = reg.get_workflow("WorkflowWithMultipleSignals") assert len(workflow_def.signals) == 2 - assert "handle_approval" in workflow_def.signals - assert "handle_cancel" in workflow_def.signals - assert ( - getattr(workflow_def.signals["handle_approval"], "_workflow_signal") - == "approval" - ) - assert ( - getattr(workflow_def.signals["handle_cancel"], "_workflow_signal") - == "cancel" - ) + assert "approval" in workflow_def.signals + assert "cancel" in workflow_def.signals + assert isinstance(workflow_def.signals["approval"], SignalDefinition) + assert isinstance(workflow_def.signals["cancel"], SignalDefinition) + assert workflow_def.signals["approval"].name == "approval" + assert workflow_def.signals["cancel"].name == "cancel" def test_signal_decorator_requires_name(self): """Test that signal decorator requires name parameter."""