70 changes: 70 additions & 0 deletions cadence/client.py
@@ -17,6 +17,8 @@
from cadence.api.v1.service_workflow_pb2 import (
StartWorkflowExecutionRequest,
StartWorkflowExecutionResponse,
SignalWithStartWorkflowExecutionRequest,
SignalWithStartWorkflowExecutionResponse,
)
from cadence.api.v1.common_pb2 import WorkflowType, WorkflowExecution
from cadence.api.v1.tasklist_pb2 import TaskList
@@ -229,6 +231,74 @@ async def start_workflow(
except Exception:
raise

async def signal_with_start_workflow(
self,
workflow: Union[str, Callable],
Member

maybe we require workflow to be WorkflowDefinition?

Suggested change
workflow: Union[str, Callable],
workflow: Union[str, WorkflowDefinition],

Member Author

Thank you

signal_name: str,
signal_input: Any = None,
*args,
**options_kwargs: Unpack[StartWorkflowOptions],
) -> WorkflowExecution:
"""
Signal a workflow execution, starting it if it is not already running.

Args:
workflow: Workflow function or workflow type name string
signal_name: Name of the signal
signal_input: Input data for the signal
*args: Arguments to pass to the workflow if it needs to be started
**options_kwargs: StartWorkflowOptions as keyword arguments

Returns:
WorkflowExecution with workflow_id and run_id

Raises:
ValueError: If required parameters are missing or invalid
Exception: If the gRPC call fails
"""
# Convert kwargs to StartWorkflowOptions and validate
options = _validate_and_apply_defaults(StartWorkflowOptions(**options_kwargs))

# Build the start workflow request
start_request = await self._build_start_workflow_request(
workflow, args, options
)

# Encode signal input
signal_payload = None
if signal_input is not None:
try:
signal_payload = await self.data_converter.to_data(signal_input)
except Exception as e:
raise ValueError(f"Failed to encode signal input: {e}")

# Build the SignalWithStartWorkflowExecution request
request = SignalWithStartWorkflowExecutionRequest(
start_request=start_request,
signal_name=signal_name,
)

if signal_payload:
request.signal_input.CopyFrom(signal_payload)

# Execute the gRPC call
try:
response: SignalWithStartWorkflowExecutionResponse = (
await self.workflow_stub.SignalWithStartWorkflowExecution(request)
)

# Emit metrics if available
if self.metrics_emitter:
# TODO: Add metrics similar to Go client
pass

execution = WorkflowExecution()
execution.workflow_id = start_request.workflow_id
execution.run_id = response.run_id
return execution
except Exception:
raise


def _validate_and_copy_defaults(options: ClientOptions) -> ClientOptions:
if "target" not in options:
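For context, a minimal usage sketch of the new method, assuming an already-connected client instance; the workflow type, signal name, and option values below are illustrative, and only the signal_with_start_workflow call itself mirrors this diff:

from datetime import timedelta

# Hypothetical caller -- how the client is constructed is outside this diff;
# only the signal_with_start_workflow call reflects the new API.
async def notify_or_start(client) -> None:
    execution = await client.signal_with_start_workflow(
        "OrderWorkflow",           # workflow type name (or a workflow callable)
        "order-updated",           # signal_name
        {"order_id": 42},          # signal_input, serialized via the data converter
        task_list="orders-task-list",
        execution_start_to_close_timeout=timedelta(minutes=5),
        workflow_id="order-42",
    )
    # The returned WorkflowExecution identifies the signaled
    # (or newly started) run.
    print(execution.workflow_id, execution.run_id)
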
84 changes: 84 additions & 0 deletions tests/integration_tests/test_client.py
@@ -135,3 +135,87 @@ async def test_workflow_stub_start_and_describe(helper: CadenceHelper):
assert task_timeout_seconds == task_timeout.total_seconds(), (
f"task_start_to_close_timeout mismatch: expected {task_timeout.total_seconds()}s, got {task_timeout_seconds}s"
)


# Table-driven test via pytest parametrization: new vs. already-running workflow
@pytest.mark.parametrize(
"test_case,workflow_id,start_first,expected_same_run",
[
(
"new_workflow",
"test-workflow-signal-with-start-123",
False,
False,
),
(
"existing_workflow",
"test-workflow-signal-existing-456",
True,
True,
),
],
)
@pytest.mark.usefixtures("helper")
async def test_signal_with_start_workflow(
helper: CadenceHelper,
test_case: str,
workflow_id: str,
start_first: bool,
expected_same_run: bool,
):
"""Test signal_with_start_workflow method.

Test cases:
1. new_workflow: SignalWithStartWorkflow starts a new workflow if it doesn't exist
2. existing_workflow: SignalWithStartWorkflow signals existing workflow without restart
"""
async with helper.client() as client:
workflow_type = f"test-workflow-signal-{test_case}"
task_list_name = f"test-task-list-signal-{test_case}"
execution_timeout = timedelta(minutes=5)
signal_name = "test-signal"
signal_input = {"data": "test-signal-data"}

first_run_id = None
if start_first:
first_execution = await client.start_workflow(
workflow_type,
task_list=task_list_name,
execution_start_to_close_timeout=execution_timeout,
workflow_id=workflow_id,
)
first_run_id = first_execution.run_id

execution = await client.signal_with_start_workflow(
workflow_type,
signal_name,
signal_input,
"arg1",
"arg2",
task_list=task_list_name,
execution_start_to_close_timeout=execution_timeout,
workflow_id=workflow_id,
)

assert execution is not None
assert execution.workflow_id == workflow_id
assert execution.run_id is not None
assert execution.run_id != ""

if expected_same_run:
assert execution.run_id == first_run_id

describe_request = DescribeWorkflowExecutionRequest(
domain=DOMAIN_NAME,
workflow_execution=WorkflowExecution(
workflow_id=execution.workflow_id,
run_id=execution.run_id,
),
)

response = await client.workflow_stub.DescribeWorkflowExecution(
describe_request
)

assert response.workflow_execution_info.type.name == workflow_type
assert response.workflow_execution_info.task_list == task_list_name