Skip to content

Commit 7aed606

Browse files
committed
Reorganize ephemeral fields and status output
- Add ConditionGuard and StatusFlagGuard decorators for more precise control flow - Replace boolean flags with Status enum flags for better state tracking - Move sleep/retry logic from composite into UpdateWorkflowStatusUntilCompletion behavior for better control - Improve status display with structured workflow step information - Save and reset ephemeral status flags instead of discarding them, which allows to show last run status in details
1 parent 18b8c5b commit 7aed606

File tree

7 files changed

+429
-175
lines changed

7 files changed

+429
-175
lines changed

src/redis_release/bht/behaviours.py

Lines changed: 71 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,6 @@ def initialise(self) -> None:
264264
return
265265
self.workflow.inputs["release_tag"] = self.release_meta.tag
266266
ref = self.package_meta.ref if self.package_meta.ref is not None else "main"
267-
self.workflow.ephemeral.trigger_attempted = True
268267
if self.log_once(
269268
"workflow_trigger_start", self.workflow.ephemeral.log_once_flags
270269
):
@@ -366,21 +365,40 @@ def update(self) -> Status:
366365
return self.log_exception_and_return_failure(e)
367366

368367

369-
class UpdateWorkflowStatus(ReleaseAction):
368+
class UpdateWorkflowStatusUntilCompletion(ReleaseAction):
370369
def __init__(
371370
self,
372371
name: str,
373372
workflow: Workflow,
374373
github_client: GitHubClientAsync,
375374
package_meta: PackageMeta,
376375
log_prefix: str = "",
376+
timeout_seconds: int = 0,
377+
cutoff: int = 0,
378+
poll_interval: int = 3,
377379
) -> None:
378380
self.github_client = github_client
379381
self.workflow = workflow
380382
self.package_meta = package_meta
383+
self.timeout_seconds = timeout_seconds
384+
self.cutoff = cutoff
385+
self.interval = poll_interval
386+
self.start_time: Optional[float] = None
387+
self.tick_count: int = 0
388+
self.is_sleeping: bool = False
381389
super().__init__(name=name, log_prefix=log_prefix)
382390

383391
def initialise(self) -> None:
392+
self.logger.debug(
393+
f"Initialise: timeout: {self.timeout_seconds}, cutoff: {self.cutoff}, interval: {self.interval}"
394+
)
395+
self.start_time = asyncio.get_event_loop().time()
396+
self.is_sleeping = False
397+
self.tick_count = 0
398+
self.feedback_message = ""
399+
self._initialise_status_task()
400+
401+
def _initialise_status_task(self) -> None:
384402
if self.workflow.run_id is None:
385403
self.logger.error(
386404
"[red]Workflow run_id is None - cannot check completion[/red]"
@@ -398,6 +416,11 @@ def initialise(self) -> None:
398416
self.package_meta.repo, self.workflow.run_id
399417
)
400418
)
419+
self.is_sleeping = False
420+
421+
def _initialise_sleep_task(self) -> None:
422+
self.task = asyncio.create_task(asyncio.sleep(self.interval))
423+
self.is_sleeping = True
401424

402425
def update(self) -> Status:
403426
try:
@@ -406,7 +429,15 @@ def update(self) -> Status:
406429
if not self.task.done():
407430
return Status.RUNNING
408431

432+
# If we just finished sleeping, switch back to status request
433+
if self.is_sleeping:
434+
self._initialise_status_task()
435+
return Status.RUNNING
436+
437+
# We just finished a status request
409438
result = self.task.result()
439+
self.tick_count += 1
440+
410441
if self.log_once(
411442
"workflow_status_current", self.workflow.ephemeral.log_once_flags
412443
):
@@ -426,10 +457,47 @@ def update(self) -> Status:
426457
self.feedback_message = (
427458
f" {self.workflow.status}, {self.workflow.conclusion}"
428459
)
429-
return Status.SUCCESS
460+
461+
if self.workflow.conclusion is not None:
462+
if self.workflow.conclusion == WorkflowConclusion.SUCCESS:
463+
return Status.SUCCESS
464+
self.feedback_message = f"Workflow failed"
465+
return Status.FAILURE
466+
467+
# Check cutoff (0 means no limit)
468+
if self.cutoff > 0 and self.tick_count >= self.cutoff:
469+
self.logger.debug(f"Cutoff reached: {self.tick_count} ticks")
470+
self.feedback_message = f"Cutoff reached: {self.tick_count}"
471+
return Status.FAILURE
472+
473+
# Check timeout (0 means no limit)
474+
if self.timeout_seconds > 0 and self.start_time is not None:
475+
elapsed = asyncio.get_event_loop().time() - self.start_time
476+
self.feedback_message = (
477+
f"{self.feedback_message}, elapsed: {elapsed:.1f}s"
478+
)
479+
if elapsed >= self.timeout_seconds:
480+
self.logger.debug(f"Timeout reached: {elapsed:.1f}s")
481+
self.feedback_message = (
482+
f"Timed out: {elapsed:.1f}s of {self.timeout_seconds}s"
483+
)
484+
return Status.FAILURE
485+
486+
# Switch to sleep task
487+
self._initialise_sleep_task()
488+
return Status.RUNNING
489+
430490
except Exception as e:
431491
return self.log_exception_and_return_failure(e)
432492

493+
def terminate(self, new_status: Status) -> None:
494+
"""Cancel the current task if it's running."""
495+
if self.task is not None and not self.task.done():
496+
self.task.cancel()
497+
self.logger.debug(
498+
f"Cancelled task during terminate with status: {new_status}"
499+
)
500+
433501

434502
class Sleep(LoggingAction):
435503

src/redis_release/bht/composites.py

Lines changed: 32 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@
3030
Sleep,
3131
)
3232
from .behaviours import TriggerWorkflow as TriggerWorkflow
33-
from .behaviours import UpdateWorkflowStatus
34-
from .decorators import FlagGuard
33+
from .behaviours import UpdateWorkflowStatusUntilCompletion
34+
from .decorators import ConditionGuard, FlagGuard, StatusFlagGuard
3535
from .state import Package, PackageMeta, ReleaseMeta, Workflow
3636

3737

@@ -110,7 +110,7 @@ def tick(self) -> Iterator[Behaviour]:
110110
yield self
111111

112112

113-
class FindWorkflowByUUID(FlagGuard):
113+
class FindWorkflowByUUID(StatusFlagGuard):
114114
max_retries: int = 3
115115
poll_interval: int = 5
116116

@@ -145,12 +145,12 @@ def __init__(
145145
None if name == "" else name,
146146
identify_loop,
147147
workflow.ephemeral,
148-
"identify_failed",
148+
"identify_workflow",
149149
log_prefix=log_prefix,
150150
)
151151

152152

153-
class WaitForWorkflowCompletion(FlagGuard):
153+
class WaitForWorkflowCompletion(StatusFlagGuard):
154154
poll_interval: int
155155
timeout_seconds: int
156156

@@ -166,36 +166,26 @@ def __init__(
166166
self.poll_interval = poll_interval
167167
self.timeout_seconds = workflow.timeout_minutes * 60
168168

169-
update_workflow_status = UpdateWorkflowStatus(
170-
"Update Workflow Status",
169+
update_workflow_status = UpdateWorkflowStatusUntilCompletion(
170+
"Update status until completion",
171171
workflow,
172172
github_client,
173173
package_meta,
174174
log_prefix=log_prefix,
175+
timeout_seconds=self.timeout_seconds,
176+
poll_interval=self.poll_interval,
175177
)
176-
update_workflow_status_with_pause = Sequence(
177-
"Update Workflow Status with Pause",
178-
memory=True,
179-
children=[
180-
Sleep("Sleep", self.poll_interval, log_prefix=log_prefix),
181-
update_workflow_status,
182-
],
183-
)
184-
185178
super().__init__(
186179
None,
187-
Timeout(
188-
f"Timeout {workflow.timeout_minutes}m",
189-
Repeat("Repeat", update_workflow_status_with_pause, -1),
190-
self.timeout_seconds,
191-
),
180+
update_workflow_status,
192181
workflow.ephemeral,
193-
"timed_out",
182+
"wait_for_completion",
183+
"wait_for_completion_message",
194184
log_prefix=log_prefix,
195185
)
196186

197187

198-
class TriggerWorkflowGuarded(FlagGuard):
188+
class TriggerWorkflowGuarded(StatusFlagGuard):
199189
def __init__(
200190
self,
201191
name: str,
@@ -217,12 +207,12 @@ def __init__(
217207
None if name == "" else name,
218208
trigger_workflow,
219209
workflow.ephemeral,
220-
"trigger_failed",
210+
"trigger_workflow",
221211
log_prefix=log_prefix,
222212
)
223213

224214

225-
class IdentifyTargetRefGuarded(FlagGuard):
215+
class IdentifyTargetRefGuarded(StatusFlagGuard):
226216
def __init__(
227217
self,
228218
name: str,
@@ -241,12 +231,12 @@ def __init__(
241231
log_prefix=log_prefix,
242232
),
243233
package_meta.ephemeral,
244-
"identify_ref_failed",
234+
"identify_ref",
245235
log_prefix=log_prefix,
246236
)
247237

248238

249-
class DownloadArtifactsListGuarded(FlagGuard):
239+
class DownloadArtifactsListGuarded(StatusFlagGuard):
250240
def __init__(
251241
self,
252242
name: str,
@@ -265,12 +255,12 @@ def __init__(
265255
log_prefix=log_prefix,
266256
),
267257
workflow.ephemeral,
268-
"artifacts_download_failed",
258+
"download_artifacts",
269259
log_prefix=log_prefix,
270260
)
271261

272262

273-
class ExtractArtifactResultGuarded(FlagGuard):
263+
class ExtractArtifactResultGuarded(StatusFlagGuard):
274264
def __init__(
275265
self,
276266
name: str,
@@ -291,7 +281,7 @@ def __init__(
291281
log_prefix=log_prefix,
292282
),
293283
workflow.ephemeral,
294-
"extract_result_failed",
284+
"extract_artifact_result",
295285
log_prefix=log_prefix,
296286
)
297287

@@ -326,7 +316,7 @@ def __init__(
326316
)
327317

328318

329-
class RestartPackageGuarded(FlagGuard):
319+
class RestartPackageGuarded(ConditionGuard):
330320
"""
331321
Reset package if we didn't trigger the workflow in current run
332322
This is intended to be used for build workflow since if build has failed
@@ -353,29 +343,18 @@ def __init__(
353343
reset_package_state_running = SuccessIsRunning(
354344
"Success is Running", reset_package_state
355345
)
356-
reset_package_state_guarded = FlagGuard(
357-
None if name == "" else name,
358-
reset_package_state_running,
359-
package.meta.ephemeral,
360-
"identify_ref_failed",
361-
flag_value=True,
362-
raise_on=[],
363-
guard_status=Status.FAILURE,
364-
log_prefix=log_prefix,
365-
)
346+
366347
super().__init__(
367-
None if name == "" else name,
368-
reset_package_state_guarded,
369-
workflow.ephemeral,
370-
"trigger_attempted",
371-
flag_value=True,
372-
raise_on=[],
348+
name,
349+
condition=lambda: workflow.ephemeral.trigger_workflow is not None
350+
or package.meta.ref is None,
351+
child=reset_package_state_running,
373352
guard_status=Status.FAILURE,
374353
log_prefix=log_prefix,
375354
)
376355

377356

378-
class RestartWorkflowGuarded(FlagGuard):
357+
class RestartWorkflowGuarded(ConditionGuard):
379358
"""
380359
Reset workflow if we didn't trigger the workflow in current run and if there was no identify target ref error
381360
@@ -401,23 +380,12 @@ def __init__(
401380
reset_workflow_state_running = SuccessIsRunning(
402381
"Success is Running", reset_workflow_state
403382
)
404-
reset_workflow_state_guarded = FlagGuard(
405-
None if name == "" else name,
406-
reset_workflow_state_running,
407-
package_meta.ephemeral,
408-
"identify_ref_failed",
409-
flag_value=True,
410-
raise_on=[],
411-
guard_status=Status.FAILURE,
412-
log_prefix=log_prefix,
413-
)
383+
414384
super().__init__(
415-
None if name == "" else name,
416-
reset_workflow_state_guarded,
417-
workflow.ephemeral,
418-
"trigger_attempted",
419-
flag_value=True,
420-
raise_on=[],
385+
name,
386+
condition=lambda: workflow.ephemeral.trigger_workflow is not None
387+
or package_meta.ref is None,
388+
child=reset_workflow_state_running,
421389
guard_status=Status.FAILURE,
422390
log_prefix=log_prefix,
423391
)

0 commit comments

Comments
 (0)