From e492db258d8c80db413368f430ddf9f42f391461 Mon Sep 17 00:00:00 2001 From: Tim Li Date: Thu, 30 Oct 2025 15:46:01 -0700 Subject: [PATCH] feat: Implement signal_workflow method Signed-off-by: Tim Li --- cadence/client.py | 46 ++++++++++++++++++++++++ tests/integration_tests/test_client.py | 49 ++++++++++++++++++++++++++ 2 files changed, 95 insertions(+) diff --git a/cadence/client.py b/cadence/client.py index dcad7f3..323dcdb 100644 --- a/cadence/client.py +++ b/cadence/client.py @@ -15,6 +15,7 @@ from grpc.aio import Channel, ClientInterceptor, secure_channel, insecure_channel from cadence.api.v1.service_workflow_pb2_grpc import WorkflowAPIStub from cadence.api.v1.service_workflow_pb2 import ( + SignalWorkflowExecutionRequest, StartWorkflowExecutionRequest, StartWorkflowExecutionResponse, ) @@ -229,6 +230,51 @@ async def start_workflow( except Exception: raise + async def signal_workflow( + self, + workflow_id: str, + run_id: str, + signal_name: str, + signal_input: Any = None, + ) -> None: + """ + Send a signal to a running workflow execution. + + Args: + workflow_id: The workflow ID + run_id: The run ID (can be empty string to signal current run) + signal_name: Name of the signal + signal_input: Input data for the signal + + Raises: + ValueError: If signal encoding fails + Exception: If the gRPC call fails + """ + signal_payload = None + if signal_input is not None: + try: + signal_payload = await self.data_converter.to_data(signal_input) + except Exception as e: + raise ValueError(f"Failed to encode signal input: {e}") + + workflow_execution = WorkflowExecution() + workflow_execution.workflow_id = workflow_id + if run_id: + workflow_execution.run_id = run_id + + signal_request = SignalWorkflowExecutionRequest( + domain=self.domain, + workflow_execution=workflow_execution, + identity=self.identity, + request_id=str(uuid.uuid4()), + signal_name=signal_name, + ) + + if signal_payload: + signal_request.signal_input.CopyFrom(signal_payload) + + await self.workflow_stub.SignalWorkflowExecution(signal_request) + def _validate_and_copy_defaults(options: ClientOptions) -> ClientOptions: if "target" not in options: diff --git a/tests/integration_tests/test_client.py b/tests/integration_tests/test_client.py index 3f96b9b..8bb0e02 100644 --- a/tests/integration_tests/test_client.py +++ b/tests/integration_tests/test_client.py @@ -136,3 +136,52 @@ async def test_workflow_stub_start_and_describe(helper: CadenceHelper): assert task_timeout_seconds == task_timeout.total_seconds(), ( f"task_start_to_close_timeout mismatch: expected {task_timeout.total_seconds()}s, got {task_timeout_seconds}s" ) + + +@pytest.mark.usefixtures("helper") +async def test_signal_workflow(helper: CadenceHelper): + """Test signal_workflow method. + + This integration test verifies: + 1. Starting a workflow execution + 2. Sending a signal to the running workflow + 3. Signal is accepted (no errors thrown) + """ + async with helper.client() as client: + workflow_type = "test-workflow-signal" + task_list_name = "test-task-list-signal" + workflow_id = "test-workflow-signal-789" + execution_timeout = timedelta(minutes=5) + signal_name = "test-signal" + signal_input = {"action": "update", "value": 42} + + execution = await client.start_workflow( + workflow_type, + task_list=task_list_name, + execution_start_to_close_timeout=execution_timeout, + workflow_id=workflow_id, + ) + + await client.signal_workflow( + workflow_id=execution.workflow_id, + run_id=execution.run_id, + signal_name=signal_name, + signal_input=signal_input, + ) + + describe_request = DescribeWorkflowExecutionRequest( + domain=DOMAIN_NAME, + workflow_execution=WorkflowExecution( + workflow_id=execution.workflow_id, + run_id=execution.run_id, + ), + ) + + response = await client.workflow_stub.DescribeWorkflowExecution( + describe_request + ) + + assert ( + response.workflow_execution_info.workflow_execution.workflow_id + == workflow_id + )