Skip to content

Commit 2ecaf83

Browse files
committed
lint
Signed-off-by: Shijie Sheng <liouvetren@gmail.com>
1 parent ebb3319 commit 2ecaf83

File tree

3 files changed

+60
-42
lines changed

3 files changed

+60
-42
lines changed

cadence/_internal/workflow/decision_events_iterator.py

Lines changed: 46 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@
66
particularly focusing on decision-related events for replay and execution.
77
"""
88

9-
from dataclasses import dataclass, field
10-
from typing import Iterator, List, Optional, Tuple
9+
from dataclasses import dataclass
10+
from typing import Iterator, List, Optional
1111

1212
from cadence._internal.workflow.history_event_iterator import HistoryEventsIterator
1313
from cadence.api.v1.history_pb2 import HistoryEvent
@@ -43,7 +43,8 @@ class DecisionEventsIterator(Iterator[DecisionEvents]):
4343
"""
4444

4545
def __init__(
46-
self, decision_task: PollForDecisionTaskResponse,
46+
self,
47+
decision_task: PollForDecisionTaskResponse,
4748
events: List[HistoryEvent],
4849
):
4950
self._decision_task = decision_task
@@ -55,7 +56,6 @@ def __iter__(self):
5556
return self
5657

5758
def __next__(self) -> DecisionEvents:
58-
5959
"""
6060
Process the next decision batch.
6161
1. Find the next valid decision task started event during replay or last scheduled decision task events for non-replay
@@ -73,12 +73,15 @@ def __next__(self) -> DecisionEvents:
7373
next_event = self._events.peek()
7474

7575
# latest event, not replay, assign started event as decision event insteaad
76-
if next_event == None:
76+
if next_event is None:
7777
decision_event = event
7878
break
7979

8080
match next_event.WhichOneof("attributes"):
81-
case "decision_task_failed_event_attributes" | "decision_task_timed_out_event_attributes":
81+
case (
82+
"decision_task_failed_event_attributes"
83+
| "decision_task_timed_out_event_attributes"
84+
):
8285
# skip failed / timed out decision tasks and continue searching
8386
next(self._events)
8487
continue
@@ -87,25 +90,31 @@ def __next__(self) -> DecisionEvents:
8790
decision_event = next(self._events)
8891
break
8992
case _:
90-
raise ValueError(f"unexpected event type after decision task started event: {next_event}")
93+
raise ValueError(
94+
f"unexpected event type after decision task started event: {next_event}"
95+
)
9196

9297
case _:
9398
decision_input_events.append(event)
9499

95100
if not decision_event:
96-
raise StopIteration(f"no decision event found")
101+
raise StopIteration("no decision event found")
97102

98103
# collect decision output events
99104
while self._events.has_next():
100-
if not is_decision_event(self._events.peek()):
105+
nxt = self._events.peek() if self._events.has_next() else None
106+
if nxt and not is_decision_event(nxt):
101107
break
102108
decision_output_events.append(next(self._events))
103109

104110
replay_current_time_milliseconds = decision_event.event_time.ToMilliseconds()
105111

106-
replay : bool
107-
next_decision_event_id : int
108-
if decision_event.WhichOneof("attributes") == "decision_task_completed_event_attributes": # completed decision task
112+
replay: bool
113+
next_decision_event_id: int
114+
if (
115+
decision_event.WhichOneof("attributes")
116+
== "decision_task_completed_event_attributes"
117+
): # completed decision task
109118
replay = True
110119
next_decision_event_id = decision_event.event_id + 1
111120
else:
@@ -124,25 +133,32 @@ def __next__(self) -> DecisionEvents:
124133
next_decision_event_id=next_decision_event_id,
125134
)
126135

136+
127137
def is_decision_event(event: HistoryEvent) -> bool:
128138
"""Check if an event is a decision output event."""
129-
return event != None and event.WhichOneof("attributes") in set([
130-
"activity_task_scheduled_event_attributes",
131-
"start_child_workflow_execution_initiated_event_attributes",
132-
"timer_started_event_attributes",
133-
"workflow_execution_completed_event_attributes",
134-
"workflow_execution_failed_event_attributes",
135-
"workflow_execution_canceled_event_attributes",
136-
"workflow_execution_continued_as_new_event_attributes",
137-
"activity_task_cancel_requested_event_attributes",
138-
"request_cancel_activity_task_failed_event_attributes",
139-
"timer_canceled_event_attributes",
140-
"cancel_timer_failed_event_attributes",
141-
"request_cancel_external_workflow_execution_initiated_event_attributes",
142-
"marker_recorded_event_attributes",
143-
"signal_external_workflow_execution_initiated_event_attributes",
144-
"upsert_workflow_search_attributes_event_attributes",
145-
])
139+
return event is not None and event.WhichOneof("attributes") in set(
140+
[
141+
"activity_task_scheduled_event_attributes",
142+
"start_child_workflow_execution_initiated_event_attributes",
143+
"timer_started_event_attributes",
144+
"workflow_execution_completed_event_attributes",
145+
"workflow_execution_failed_event_attributes",
146+
"workflow_execution_canceled_event_attributes",
147+
"workflow_execution_continued_as_new_event_attributes",
148+
"activity_task_cancel_requested_event_attributes",
149+
"request_cancel_activity_task_failed_event_attributes",
150+
"timer_canceled_event_attributes",
151+
"cancel_timer_failed_event_attributes",
152+
"request_cancel_external_workflow_execution_initiated_event_attributes",
153+
"marker_recorded_event_attributes",
154+
"signal_external_workflow_execution_initiated_event_attributes",
155+
"upsert_workflow_search_attributes_event_attributes",
156+
]
157+
)
158+
146159

147160
def is_marker_event(event: HistoryEvent) -> bool:
148-
return event != None and event.WhichOneof("attributes") == "marker_recorded_event_attributes"
161+
return bool(
162+
event is not None
163+
and event.WhichOneof("attributes") == "marker_recorded_event_attributes"
164+
)

cadence/_internal/workflow/history_event_iterator.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ async def iterate_history_events(
3535
current_page = response.history.events
3636
next_page_token = response.next_page_token
3737

38+
3839
class HistoryEventsIterator(Iterator[HistoryEvent]):
3940
def __init__(self, events: List[HistoryEvent]):
4041
self._iter = iter(events)

tests/cadence/_internal/workflow/test_decision_events_iterator.py

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,25 +6,28 @@
66
import pytest
77
from typing import List
88

9-
from cadence.api.v1.history_pb2 import HistoryEvent, History, WorkflowExecutionStartedEventAttributes
9+
from cadence.api.v1.history_pb2 import HistoryEvent, History
1010
from cadence.api.v1.service_worker_pb2 import PollForDecisionTaskResponse
1111
from cadence.api.v1.common_pb2 import WorkflowExecution
12-
from google.protobuf.timestamp_pb2 import Timestamp
1312

1413
from cadence._internal.workflow.decision_events_iterator import (
1514
DecisionEventsIterator,
1615
)
1716

17+
1818
class TestDecisionEventsIterator:
1919
"""Test the DecisionEventsIterator class."""
2020

21-
2221
@pytest.mark.parametrize(
2322
"name, event_types, expected",
2423
[
2524
(
2625
"workflow_started",
27-
["workflow_execution_started", "decision_task_scheduled", "decision_task_started"],
26+
[
27+
"workflow_execution_started",
28+
"decision_task_scheduled",
29+
"decision_task_started",
30+
],
2831
[
2932
{
3033
"input": 2,
@@ -34,7 +37,7 @@ class TestDecisionEventsIterator:
3437
"replay_time": 3000,
3538
"next_decision_event_id": 5,
3639
},
37-
]
40+
],
3841
),
3942
(
4043
"workflow_with_activity_scheduled",
@@ -54,7 +57,7 @@ class TestDecisionEventsIterator:
5457
"replay_time": 4000,
5558
"next_decision_event_id": 5,
5659
},
57-
]
60+
],
5861
),
5962
(
6063
"workflow_with_activity_completed",
@@ -86,8 +89,8 @@ class TestDecisionEventsIterator:
8689
"replay_time": 9000,
8790
"next_decision_event_id": 11,
8891
},
89-
]
90-
)
92+
],
93+
),
9194
],
9295
)
9396
def test_successful_cases(self, name, event_types, expected):
@@ -106,15 +109,13 @@ def test_successful_cases(self, name, event_types, expected):
106109
assert batch.replay_current_time_milliseconds == expect["replay_time"]
107110
assert batch.next_decision_event_id == expect["next_decision_event_id"]
108111

109-
def create_mock_history_event(
110-
event_types: List[str]
111-
) -> List[HistoryEvent]:
112112

113+
def create_mock_history_event(event_types: List[str]) -> List[HistoryEvent]:
113114
events = []
114115
for i, event_type in enumerate(event_types):
115116
event = HistoryEvent()
116117
event.event_id = i + 1
117-
event.event_time.FromMilliseconds((i+1) * 1000)
118+
event.event_time.FromMilliseconds((i + 1) * 1000)
118119

119120
# Set the appropriate attribute based on event type
120121
if event_type == "decision_task_started":

0 commit comments

Comments
 (0)