2222from multiprocessing .managers import SyncManager
2323from multiprocessing .synchronize import Event as ProcessingEvent
2424from threading import Event as ThreadingEvent
25- from typing import Any , Callable , cast , Generic , List , Protocol , TypeVar
25+ from typing import Any , Callable , Generic , Protocol , TypeVar , cast
2626
2727import culsans
2828from pydantic import BaseModel
5050
5151CheckStopCallableT = Callable [[bool , int ], bool ]
5252
53+
5354class MessagingStopCallback (Protocol ):
5455 """Protocol for evaluating stop conditions in messaging operations."""
5556
@@ -248,14 +249,17 @@ async def stop(self):
248249 if self .shutdown_event is not None :
249250 self .shutdown_event .set ()
250251 else :
251- raise RuntimeError ("shutdown_event is not set; was start() not called or is this a redundant stop() call?" )
252+ raise RuntimeError (
253+ "shutdown_event is not set; was start() not called or "
254+ "is this a redundant stop() call?"
255+ )
252256 tasks = [self .send_task , self .receive_task ]
253- tasks_to_run : List [asyncio .Task [Any ]] = [task for task in tasks if task is not None ]
257+ tasks_to_run : list [asyncio .Task [Any ]] = [
258+ task for task in tasks if task is not None
259+ ]
254260 if len (tasks_to_run ) > 0 :
255261 with contextlib .suppress (asyncio .CancelledError ):
256- await asyncio .gather (
257- * tasks_to_run , return_exceptions = True
258- )
262+ await asyncio .gather (* tasks_to_run , return_exceptions = True )
259263 self .send_task = None
260264 self .receive_task = None
261265 if self .worker_index is None :
@@ -354,7 +358,9 @@ async def get(self, timeout: float | None = None) -> ReceiveMessageT:
354358 :return: Decoded message from the receive buffer
355359 """
356360 if self .buffer_receive_queue is None :
357- raise RuntimeError ("buffer receive queue is None; check start()/stop() calls" )
361+ raise RuntimeError (
362+ "buffer receive queue is None; check start()/stop() calls"
363+ )
358364 return await asyncio .wait_for (
359365 self .buffer_receive_queue .async_get (), timeout = timeout
360366 )
@@ -367,7 +373,9 @@ def get_sync(self, timeout: float | None = None) -> ReceiveMessageT:
367373 :return: Decoded message from the receive buffer
368374 """
369375 if self .buffer_receive_queue is None :
370- raise RuntimeError ("buffer receive queue is None; check start()/stop() calls" )
376+ raise RuntimeError (
377+ "buffer receive queue is None; check start()/stop() calls"
378+ )
371379 if timeout is not None and timeout <= 0 :
372380 return self .buffer_receive_queue .get_nowait ()
373381 else :
@@ -381,7 +389,9 @@ async def put(self, item: SendMessageT, timeout: float | None = None):
381389 :param timeout: Maximum time to wait for buffer space
382390 """
383391 if self .buffer_send_queue is None :
384- raise RuntimeError ("buffer receive queue is None; check start()/stop() calls" )
392+ raise RuntimeError (
393+ "buffer receive queue is None; check start()/stop() calls"
394+ )
385395 await asyncio .wait_for (self .buffer_send_queue .async_put (item ), timeout = timeout )
386396
387397 def put_sync (self , item : SendMessageT , timeout : float | None = None ):
@@ -392,7 +402,9 @@ def put_sync(self, item: SendMessageT, timeout: float | None = None):
392402 :param timeout: Maximum time to wait for buffer space, if <=0 uses put_nowait
393403 """
394404 if self .buffer_send_queue is None :
395- raise RuntimeError ("buffer receive queue is None; check start()/stop() calls" )
405+ raise RuntimeError (
406+ "buffer receive queue is None; check start()/stop() calls"
407+ )
396408 if timeout is not None and timeout <= 0 :
397409 self .buffer_send_queue .put_nowait (item )
398410 else :
@@ -457,6 +469,7 @@ class InterProcessMessagingQueue(InterProcessMessaging[SendMessageT, ReceiveMess
457469 # Create worker copy for distributed processing
458470 worker_messaging = messaging.create_worker_copy(worker_index=0)
459471 """
472+
460473 pending_queue : multiprocessing .Queue | queue .Queue [Any ] | None
461474 done_queue : multiprocessing .Queue | queue .Queue [Any ] | None
462475
@@ -545,15 +558,15 @@ async def stop(self):
545558 with contextlib .suppress (queue .Empty ):
546559 while True :
547560 self .pending_queue .get_nowait ()
548- if hasattr (self .pending_queue , ' close' ):
561+ if hasattr (self .pending_queue , " close" ):
549562 self .pending_queue .close ()
550563
551564 if self .done_queue is None :
552565 raise RuntimeError ("done_queue is None; was stop() already called?" )
553566 with contextlib .suppress (queue .Empty ):
554567 while True :
555568 self .done_queue .get_nowait ()
556- if hasattr (self .done_queue , ' close' ):
569+ if hasattr (self .done_queue , " close" ):
557570 self .done_queue .close ()
558571
559572 self .pending_queue = None
@@ -618,7 +631,9 @@ def _send_messages_task_thread( # noqa: C901, PLR0912
618631 item = next (send_items_iter )
619632 else :
620633 if self .buffer_send_queue is None :
621- raise RuntimeError ("buffer_send_queue is None; was stop() already called?" )
634+ raise RuntimeError (
635+ "buffer_send_queue is None; was stop() already called?"
636+ )
622637 item = self .buffer_send_queue .sync_get (
623638 timeout = self .poll_interval
624639 )
@@ -632,16 +647,22 @@ def _send_messages_task_thread( # noqa: C901, PLR0912
632647 if self .worker_index is None :
633648 # Main publisher
634649 if self .pending_queue is None :
635- raise RuntimeError ("pending_queue is None; was stop() already called?" )
650+ raise RuntimeError (
651+ "pending_queue is None; was stop() already called?"
652+ )
636653 self .pending_queue .put (pending_item , timeout = self .poll_interval )
637654 else :
638655 # Worker
639656 if self .done_queue is None :
640- raise RuntimeError ("done_queue is None; was stop() already called?" )
657+ raise RuntimeError (
658+ "done_queue is None; was stop() already called?"
659+ )
641660 self .done_queue .put (pending_item , timeout = self .poll_interval )
642661 if send_items_iter is None :
643662 if self .buffer_send_queue is None :
644- raise RuntimeError ("buffer_send_queue is None; was stop() already called?" )
663+ raise RuntimeError (
664+ "buffer_send_queue is None; was stop() already called?"
665+ )
645666 self .buffer_send_queue .task_done ()
646667 pending_item = None
647668 except (culsans .QueueFull , queue .Full ):
@@ -663,12 +684,16 @@ def _receive_messages_task_thread( # noqa: C901
663684 if self .worker_index is None :
664685 # Main publisher
665686 if self .done_queue is None :
666- raise RuntimeError ("done_queue is None; check start()/stop() calls" )
687+ raise RuntimeError (
688+ "done_queue is None; check start()/stop() calls"
689+ )
667690 item = self .done_queue .get (timeout = self .poll_interval )
668691 else :
669692 # Worker
670693 if self .pending_queue is None :
671- raise RuntimeError ("pending_queue is None; check start()/stop() calls" )
694+ raise RuntimeError (
695+ "pending_queue is None; check start()/stop() calls"
696+ )
672697 item = self .pending_queue .get (timeout = self .poll_interval )
673698 pending_item = message_encoding .decode (item )
674699 queue_empty_count = 0
@@ -685,8 +710,12 @@ def _receive_messages_task_thread( # noqa: C901
685710 )
686711
687712 if self .buffer_receive_queue is None :
688- raise RuntimeError ("buffer_receive_queue is None; check start()/stop() calls" )
689- self .buffer_receive_queue .sync_put (cast (ReceiveMessageT , received_item ))
713+ raise RuntimeError (
714+ "buffer_receive_queue is None; check start()/stop() calls"
715+ )
716+ self .buffer_receive_queue .sync_put (
717+ cast ("ReceiveMessageT" , received_item )
718+ )
690719 pending_item = None
691720 received_item = None
692721 except (culsans .QueueFull , queue .Full ):
@@ -863,9 +892,7 @@ def __init__(
863892
864893 self .pipes : list [tuple [Connection , Connection ]]
865894 if pipe is None :
866- self .pipes = [
867- self .mp_context .Pipe (duplex = True ) for _ in range (num_workers )
868- ]
895+ self .pipes = [self .mp_context .Pipe (duplex = True ) for _ in range (num_workers )]
869896 else :
870897 self .pipes = [pipe ]
871898
@@ -969,7 +996,7 @@ def create_receive_messages_threads(
969996 )
970997 ]
971998
972- def _send_messages_task_thread ( # noqa: C901, PLR0912
999+ def _send_messages_task_thread ( # noqa: C901, PLR0912, PLR0915
9731000 self ,
9741001 pipe : tuple [Connection , Connection ],
9751002 send_items : Iterable [Any ] | None ,
@@ -1009,7 +1036,9 @@ def _background_pipe_recv():
10091036 item = next (send_items_iter )
10101037 else :
10111038 if self .buffer_send_queue is None :
1012- raise RuntimeError ("buffer_send_queue is None; check start()/stop() calls" )
1039+ raise RuntimeError (
1040+ "buffer_send_queue is None; check start()/stop() calls" # noqa: E501
1041+ )
10131042 item = self .buffer_send_queue .sync_get (
10141043 timeout = self .poll_interval
10151044 )
@@ -1028,7 +1057,9 @@ def _background_pipe_recv():
10281057 pipe_item = pending_item
10291058 if send_items_iter is None :
10301059 if self .buffer_send_queue is None :
1031- raise RuntimeError ("buffer_send_queue is None; check start()/stop() calls" )
1060+ raise RuntimeError (
1061+ "buffer_send_queue is None; check start()/stop() calls" # noqa: E501
1062+ )
10321063 self .buffer_send_queue .task_done ()
10331064 pending_item = None
10341065 except (culsans .QueueFull , queue .Full ):
@@ -1071,8 +1102,12 @@ def _receive_messages_task_thread( # noqa: C901
10711102 else receive_callback (pending_item )
10721103 )
10731104 if self .buffer_receive_queue is None :
1074- raise RuntimeError ("buffer receive queue is None; check start()/stop() calls" )
1075- self .buffer_receive_queue .sync_put (cast (ReceiveMessageT , received_item ))
1105+ raise RuntimeError (
1106+ "buffer receive queue is None; check start()/stop() calls"
1107+ )
1108+ self .buffer_receive_queue .sync_put (
1109+ cast ("ReceiveMessageT" , received_item )
1110+ )
10761111 pending_item = None
10771112 received_item = None
10781113 except (culsans .QueueFull , queue .Full ):
0 commit comments