3535
3636from scheduler .helpers .queues import Queue , perform_job
3737from scheduler .helpers .timeouts import JobExecutionMonitorTimeoutException , JobTimeoutException
38- from scheduler .helpers .utils import utcnow , current_timestamp
38+ from scheduler .helpers .utils import utcnow
3939
4040try :
4141 from setproctitle import setproctitle as setprocname
@@ -96,16 +96,16 @@ def from_model(cls, model: WorkerModel) -> Self:
9696 return res
9797
9898 def __init__ (
99- self ,
100- queues : Iterable [Union [str , Queue ]],
101- name : str ,
102- maintenance_interval : int = SCHEDULER_CONFIG .DEFAULT_MAINTENANCE_TASK_INTERVAL ,
103- job_monitoring_interval : int = SCHEDULER_CONFIG .DEFAULT_JOB_MONITORING_INTERVAL ,
104- dequeue_strategy : DequeueStrategy = DequeueStrategy .DEFAULT ,
105- fork_job_execution : bool = True ,
106- with_scheduler : bool = True ,
107- burst : bool = False ,
108- model : Optional [WorkerModel ] = None ,
99+ self ,
100+ queues : Iterable [Union [str , Queue ]],
101+ name : str ,
102+ maintenance_interval : int = SCHEDULER_CONFIG .DEFAULT_MAINTENANCE_TASK_INTERVAL ,
103+ job_monitoring_interval : int = SCHEDULER_CONFIG .DEFAULT_JOB_MONITORING_INTERVAL ,
104+ dequeue_strategy : DequeueStrategy = DequeueStrategy .DEFAULT ,
105+ fork_job_execution : bool = True ,
106+ with_scheduler : bool = True ,
107+ burst : bool = False ,
108+ model : Optional [WorkerModel ] = None ,
109109 ) -> None :
110110 self .fork_job_execution = fork_job_execution
111111 self .job_monitoring_interval : int = job_monitoring_interval
@@ -352,7 +352,7 @@ def run_maintenance_tasks(self) -> None:
352352 self ._model .save (connection = self .connection )
353353
354354 def dequeue_job_and_maintain_ttl (
355- self , timeout : Optional [int ], max_idle_time : Optional [int ] = None
355+ self , timeout : Optional [int ], max_idle_time : Optional [int ] = None
356356 ) -> Tuple [Optional [JobModel ], Optional [Queue ]]:
357357 """Dequeues a job while maintaining the TTL.
358358 :param timeout: The timeout for the dequeue operation.
@@ -523,7 +523,7 @@ def reorder_queues(self, reference_queue: Queue) -> None:
523523 return
524524 if self ._dequeue_strategy == DequeueStrategy .ROUND_ROBIN :
525525 pos = self ._ordered_queues .index (reference_queue )
526- self ._ordered_queues = self ._ordered_queues [pos + 1 :] + self ._ordered_queues [: pos + 1 ]
526+ self ._ordered_queues = self ._ordered_queues [pos + 1 :] + self ._ordered_queues [: pos + 1 ]
527527 return
528528 if self ._dequeue_strategy == DequeueStrategy .RANDOM :
529529 shuffle (self ._ordered_queues )
@@ -607,7 +607,7 @@ def monitor_job_execution_process(self, job: JobModel, queue: Queue) -> None:
607607 while True :
608608 try :
609609 with SCHEDULER_CONFIG .DEATH_PENALTY_CLASS (
610- self .job_monitoring_interval , JobExecutionMonitorTimeoutException
610+ self .job_monitoring_interval , JobExecutionMonitorTimeoutException
611611 ):
612612 retpid , ret_val = self .wait_for_job_execution_process ()
613613 break
@@ -625,7 +625,7 @@ def monitor_job_execution_process(self, job: JobModel, queue: Queue) -> None:
625625 self .wait_for_job_execution_process ()
626626 break
627627
628- self .maintain_heartbeats ( job , queue )
628+ self ._model . heartbeat ( self . connection , self . job_monitoring_interval + 60 )
629629
630630 except OSError as e :
631631 # In case we encountered an OSError due to EINTR (which is
@@ -685,11 +685,6 @@ def execute_job(self, job: JobModel, queue: Queue) -> None:
685685 self .perform_job (job , queue )
686686 self ._model .set_field ("state" , WorkerStatus .IDLE , connection = self .connection )
687687
688- def maintain_heartbeats (self , job : JobModel , queue : Queue ) -> None :
689- """Updates worker and job's last heartbeat field."""
690- self ._model .heartbeat (self .connection , self .job_monitoring_interval + 60 )
691- ttl = self .get_heartbeat_ttl (job )
692-
693688 def execute_in_separate_process (self , job : JobModel , queue : Queue ) -> None :
694689 """This is the entry point of the newly spawned job execution process.
695690 After fork()'ing, assure we are generating random sequences that are different from the worker.
@@ -840,7 +835,7 @@ class RoundRobinWorker(Worker):
840835
841836 def reorder_queues (self , reference_queue : Queue ) -> None :
842837 pos = self ._ordered_queues .index (reference_queue )
843- self ._ordered_queues = self ._ordered_queues [pos + 1 :] + self ._ordered_queues [: pos + 1 ]
838+ self ._ordered_queues = self ._ordered_queues [pos + 1 :] + self ._ordered_queues [: pos + 1 ]
844839
845840
846841class RandomWorker (Worker ):
0 commit comments