From 79e7327fd03826fbd8a11585d9f2124d2ea3482b Mon Sep 17 00:00:00 2001 From: Tim Li Date: Thu, 30 Oct 2025 13:08:17 -0700 Subject: [PATCH 1/4] feat: Implement signal_with_start_workflow Signed-off-by: Tim Li --- cadence/client.py | 68 +++++++++++++++++++++ tests/integration_tests/test_client.py | 83 ++++++++++++++++++++++++++ 2 files changed, 151 insertions(+) diff --git a/cadence/client.py b/cadence/client.py index dcad7f3..75481ba 100644 --- a/cadence/client.py +++ b/cadence/client.py @@ -17,6 +17,8 @@ from cadence.api.v1.service_workflow_pb2 import ( StartWorkflowExecutionRequest, StartWorkflowExecutionResponse, + SignalWithStartWorkflowExecutionRequest, + SignalWithStartWorkflowExecutionResponse, ) from cadence.api.v1.common_pb2 import WorkflowType, WorkflowExecution from cadence.api.v1.tasklist_pb2 import TaskList @@ -229,6 +231,72 @@ async def start_workflow( except Exception: raise + async def signal_with_start_workflow( + self, + workflow: Union[str, Callable], + signal_name: str, + signal_input: Any = None, + *args, + **options_kwargs: Unpack[StartWorkflowOptions], + ) -> WorkflowExecution: + """ + Signal a workflow execution, starting it if it is not already running. + + Args: + workflow: Workflow function or workflow type name string + signal_name: Name of the signal + signal_input: Input data for the signal + *args: Arguments to pass to the workflow if it needs to be started + **options_kwargs: StartWorkflowOptions as keyword arguments + + Returns: + WorkflowExecution with workflow_id and run_id + + Raises: + ValueError: If required parameters are missing or invalid + Exception: If the gRPC call fails + """ + # Convert kwargs to StartWorkflowOptions and validate + options = _validate_and_apply_defaults(StartWorkflowOptions(**options_kwargs)) + + # Build the start workflow request + start_request = await self._build_start_workflow_request(workflow, args, options) + + # Encode signal input + signal_payload = None + if signal_input is not None: + try: + signal_payload = await self.data_converter.to_data(signal_input) + except Exception as e: + raise ValueError(f"Failed to encode signal input: {e}") + + # Build the SignalWithStartWorkflowExecution request + request = SignalWithStartWorkflowExecutionRequest( + start_request=start_request, + signal_name=signal_name, + ) + + if signal_payload: + request.signal_input.CopyFrom(signal_payload) + + # Execute the gRPC call + try: + response: SignalWithStartWorkflowExecutionResponse = ( + await self.workflow_stub.SignalWithStartWorkflowExecution(request) + ) + + # Emit metrics if available + if self.metrics_emitter: + # TODO: Add metrics similar to Go client + pass + + execution = WorkflowExecution() + execution.workflow_id = start_request.workflow_id + execution.run_id = response.run_id + return execution + except Exception: + raise + def _validate_and_copy_defaults(options: ClientOptions) -> ClientOptions: if "target" not in options: diff --git a/tests/integration_tests/test_client.py b/tests/integration_tests/test_client.py index 3f96b9b..ec4c35d 100644 --- a/tests/integration_tests/test_client.py +++ b/tests/integration_tests/test_client.py @@ -136,3 +136,86 @@ async def test_workflow_stub_start_and_describe(helper: CadenceHelper): assert task_timeout_seconds == task_timeout.total_seconds(), ( f"task_start_to_close_timeout mismatch: expected {task_timeout.total_seconds()}s, got {task_timeout_seconds}s" ) + +# trying parametrized test for table test +@pytest.mark.parametrize( + "test_case,workflow_id,start_first,expected_same_run", + [ + ( + "new_workflow", + "test-workflow-signal-with-start-123", + False, + False, + ), + ( + "existing_workflow", + "test-workflow-signal-existing-456", + True, + True, + ), + ], +) +@pytest.mark.usefixtures("helper") +async def test_signal_with_start_workflow( + helper: CadenceHelper, + test_case: str, + workflow_id: str, + start_first: bool, + expected_same_run: bool, +): + """Test signal_with_start_workflow method. + + Test cases: + 1. new_workflow: SignalWithStartWorkflow starts a new workflow if it doesn't exist + 2. existing_workflow: SignalWithStartWorkflow signals existing workflow without restart + """ + async with helper.client() as client: + workflow_type = f"test-workflow-signal-{test_case}" + task_list_name = f"test-task-list-signal-{test_case}" + execution_timeout = timedelta(minutes=5) + signal_name = "test-signal" + signal_input = {"data": "test-signal-data"} + + first_run_id = None + if start_first: + first_execution = await client.start_workflow( + workflow_type, + task_list=task_list_name, + execution_start_to_close_timeout=execution_timeout, + workflow_id=workflow_id, + ) + first_run_id = first_execution.run_id + + execution = await client.signal_with_start_workflow( + workflow_type, + signal_name, + signal_input, + "arg1", + "arg2", + task_list=task_list_name, + execution_start_to_close_timeout=execution_timeout, + workflow_id=workflow_id, + ) + + assert execution is not None + assert execution.workflow_id == workflow_id + assert execution.run_id is not None + assert execution.run_id != "" + + if expected_same_run: + assert execution.run_id == first_run_id + + describe_request = DescribeWorkflowExecutionRequest( + domain=DOMAIN_NAME, + workflow_execution=WorkflowExecution( + workflow_id=execution.workflow_id, + run_id=execution.run_id, + ), + ) + + response = await client.workflow_stub.DescribeWorkflowExecution( + describe_request + ) + + assert response.workflow_execution_info.type.name == workflow_type + assert response.workflow_execution_info.task_list == task_list_name From f39f4032e7fa7fd0efa091fddba02f32196d838f Mon Sep 17 00:00:00 2001 From: Tim Li Date: Thu, 30 Oct 2025 13:24:07 -0700 Subject: [PATCH 2/4] format Signed-off-by: Tim Li --- cadence/client.py | 4 +++- tests/integration_tests/test_client.py | 1 + 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/cadence/client.py b/cadence/client.py index 75481ba..ee751f4 100644 --- a/cadence/client.py +++ b/cadence/client.py @@ -260,7 +260,9 @@ async def signal_with_start_workflow( options = _validate_and_apply_defaults(StartWorkflowOptions(**options_kwargs)) # Build the start workflow request - start_request = await self._build_start_workflow_request(workflow, args, options) + start_request = await self._build_start_workflow_request( + workflow, args, options + ) # Encode signal input signal_payload = None diff --git a/tests/integration_tests/test_client.py b/tests/integration_tests/test_client.py index ec4c35d..cc2701e 100644 --- a/tests/integration_tests/test_client.py +++ b/tests/integration_tests/test_client.py @@ -137,6 +137,7 @@ async def test_workflow_stub_start_and_describe(helper: CadenceHelper): f"task_start_to_close_timeout mismatch: expected {task_timeout.total_seconds()}s, got {task_timeout_seconds}s" ) + # trying parametrized test for table test @pytest.mark.parametrize( "test_case,workflow_id,start_first,expected_same_run", From e50edde464e91a1c13b7ca1449770e0036ecfb8e Mon Sep 17 00:00:00 2001 From: Tim Li Date: Wed, 5 Nov 2025 13:02:07 -0800 Subject: [PATCH 3/4] respond to comments Signed-off-by: Tim Li --- cadence/client.py | 17 +++++++++-------- cadence/workflow.py | 6 ++++-- tests/cadence/test_client_workflow.py | 19 ++++++++++++++----- 3 files changed, 27 insertions(+), 15 deletions(-) diff --git a/cadence/client.py b/cadence/client.py index ee751f4..0e63eb4 100644 --- a/cadence/client.py +++ b/cadence/client.py @@ -2,7 +2,7 @@ import socket import uuid from datetime import timedelta -from typing import TypedDict, Unpack, Any, cast, Union, Callable +from typing import TypedDict, Unpack, Any, cast, Union from grpc import ChannelCredentials, Compression from google.protobuf.duration_pb2 import Duration @@ -24,6 +24,7 @@ from cadence.api.v1.tasklist_pb2 import TaskList from cadence.data_converter import DataConverter, DefaultDataConverter from cadence.metrics import MetricsEmitter, NoOpMetricsEmitter +from cadence.workflow import WorkflowDefinition class StartWorkflowOptions(TypedDict, total=False): @@ -134,7 +135,7 @@ async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: async def _build_start_workflow_request( self, - workflow: Union[str, Callable], + workflow: Union[str, WorkflowDefinition], args: tuple[Any, ...], options: StartWorkflowOptions, ) -> StartWorkflowExecutionRequest: @@ -146,8 +147,8 @@ async def _build_start_workflow_request( if isinstance(workflow, str): workflow_type_name = workflow else: - # For callable, use function name or __name__ attribute - workflow_type_name = getattr(workflow, "__name__", str(workflow)) + # For WorkflowDefinition, use the name property + workflow_type_name = workflow.name # Encode input arguments input_payload = None @@ -188,7 +189,7 @@ async def _build_start_workflow_request( async def start_workflow( self, - workflow: Union[str, Callable], + workflow: Union[str, WorkflowDefinition], *args, **options_kwargs: Unpack[StartWorkflowOptions], ) -> WorkflowExecution: @@ -196,7 +197,7 @@ async def start_workflow( Start a workflow execution asynchronously. Args: - workflow: Workflow function or workflow type name string + workflow: WorkflowDefinition or workflow type name string *args: Arguments to pass to the workflow **options_kwargs: StartWorkflowOptions as keyword arguments @@ -233,7 +234,7 @@ async def start_workflow( async def signal_with_start_workflow( self, - workflow: Union[str, Callable], + workflow: Union[str, WorkflowDefinition], signal_name: str, signal_input: Any = None, *args, @@ -243,7 +244,7 @@ async def signal_with_start_workflow( Signal a workflow execution, starting it if it is not already running. Args: - workflow: Workflow function or workflow type name string + workflow: WorkflowDefinition or workflow type name string signal_name: Name of the signal signal_input: Input data for the signal *args: Arguments to pass to the workflow if it needs to be started diff --git a/cadence/workflow.py b/cadence/workflow.py index 0e346ea..2d3963f 100644 --- a/cadence/workflow.py +++ b/cadence/workflow.py @@ -12,10 +12,12 @@ Any, Optional, Union, + TYPE_CHECKING, ) import inspect -from cadence.client import Client +if TYPE_CHECKING: + from cadence.client import Client T = TypeVar("T", bound=Callable[..., Any]) @@ -152,7 +154,7 @@ class WorkflowContext(ABC): def info(self) -> WorkflowInfo: ... @abstractmethod - def client(self) -> Client: ... + def client(self) -> "Client": ... @contextmanager def _activate(self) -> Iterator[None]: diff --git a/tests/cadence/test_client_workflow.py b/tests/cadence/test_client_workflow.py index 9f6a7c3..3e9c766 100644 --- a/tests/cadence/test_client_workflow.py +++ b/tests/cadence/test_client_workflow.py @@ -10,6 +10,7 @@ ) from cadence.client import Client, StartWorkflowOptions, _validate_and_apply_defaults from cadence.data_converter import DefaultDataConverter +from cadence.workflow import WorkflowDefinition, WorkflowDefinitionOptions @pytest.fixture @@ -96,11 +97,17 @@ async def test_build_request_with_string_workflow(self, mock_client): uuid.UUID(request.request_id) # This will raise if not valid UUID @pytest.mark.asyncio - async def test_build_request_with_callable_workflow(self, mock_client): - """Test building request with callable workflow.""" + async def test_build_request_with_workflow_definition(self, mock_client): + """Test building request with WorkflowDefinition.""" + from cadence import workflow - def test_workflow(): - pass + class TestWorkflow: + @workflow.run + async def run(self): + pass + + workflow_opts = WorkflowDefinitionOptions(name="test_workflow") + workflow_definition = WorkflowDefinition.wrap(TestWorkflow, workflow_opts) client = Client(domain="test-domain", target="localhost:7933") @@ -110,7 +117,9 @@ def test_workflow(): task_start_to_close_timeout=timedelta(seconds=30), ) - request = await client._build_start_workflow_request(test_workflow, (), options) + request = await client._build_start_workflow_request( + workflow_definition, (), options + ) assert request.workflow_type.name == "test_workflow" From d42e22e32e320c497eb707720912f3a2f7329e8f Mon Sep 17 00:00:00 2001 From: Tim Li Date: Wed, 5 Nov 2025 13:18:55 -0800 Subject: [PATCH 4/4] fix linter and integration tests Signed-off-by: Tim Li --- cadence/client.py | 6 ++---- cadence/workflow.py | 1 + .../_internal/workflow/test_workflow_engine_integration.py | 4 ++-- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/cadence/client.py b/cadence/client.py index 052bdc2..ef140fd 100644 --- a/cadence/client.py +++ b/cadence/client.py @@ -261,15 +261,13 @@ async def signal_with_start_workflow( options = _validate_and_apply_defaults(StartWorkflowOptions(**options_kwargs)) # Build the start workflow request - start_request = await self._build_start_workflow_request( - workflow, args, options - ) + start_request = self._build_start_workflow_request(workflow, args, options) # Encode signal input signal_payload = None if signal_input is not None: try: - signal_payload = await self.data_converter.to_data(signal_input) + signal_payload = self.data_converter.to_data([signal_input]) except Exception as e: raise ValueError(f"Failed to encode signal input: {e}") diff --git a/cadence/workflow.py b/cadence/workflow.py index a32b694..de8791e 100644 --- a/cadence/workflow.py +++ b/cadence/workflow.py @@ -44,6 +44,7 @@ async def execute_activity( activity, result_type, *args, **kwargs ) + T = TypeVar("T", bound=Callable[..., Any]) diff --git a/tests/cadence/_internal/workflow/test_workflow_engine_integration.py b/tests/cadence/_internal/workflow/test_workflow_engine_integration.py index 3aa5d44..c40a1e6 100644 --- a/tests/cadence/_internal/workflow/test_workflow_engine_integration.py +++ b/tests/cadence/_internal/workflow/test_workflow_engine_integration.py @@ -4,7 +4,7 @@ """ import pytest -from unittest.mock import Mock, AsyncMock, patch +from unittest.mock import Mock, patch from cadence.api.v1.service_worker_pb2 import PollForDecisionTaskResponse from cadence.api.v1.common_pb2 import Payload, WorkflowExecution, WorkflowType from cadence.api.v1.history_pb2 import ( @@ -244,7 +244,7 @@ async def test_extract_workflow_input_deserialization_error( decision_task = self.create_mock_decision_task() # Mock data converter to raise an exception - mock_client.data_converter.from_data = AsyncMock( + mock_client.data_converter.from_data = Mock( side_effect=Exception("Deserialization error") )