66from multiprocessing import Manager
77from queue import Empty as QueueEmpty
88from queue import Queue
9+ from threading import Event
910from typing import (
1011 Any ,
1112 Generic ,
@@ -126,7 +127,7 @@ async def run(
126127 ) as executor ,
127128 ):
128129 requests_iter : Optional [Iterator [Any ]] = None
129- futures , queues = await self ._start_processes (
130+ futures , queues , stop_event = await self ._start_processes (
130131 manager , executor , scheduling_strategy
131132 )
132133 run_info , requests_iter , times_iter = self ._run_setup (
@@ -178,7 +179,7 @@ async def run(
178179 run_info = run_info ,
179180 )
180181
181- await self ._stop_processes (futures , queues . requests )
182+ await self ._stop_processes (futures , stop_event )
182183
183184 async def _start_processes (
184185 self ,
@@ -188,6 +189,7 @@ async def _start_processes(
188189 ) -> tuple [
189190 list [asyncio .Future ],
190191 MPQueues [RequestT , ResponseT ],
192+ Event ,
191193 ]:
192194 await self .worker .prepare_multiprocessing ()
193195 queues : MPQueues [RequestT , ResponseT ] = MPQueues (
@@ -197,6 +199,7 @@ async def _start_processes(
197199 times = manager .Queue (maxsize = scheduling_strategy .processing_requests_limit ),
198200 responses = manager .Queue (),
199201 )
202+ stop_event = manager .Event ()
200203
201204 num_processes = min (
202205 scheduling_strategy .processes_limit ,
@@ -226,6 +229,7 @@ async def _start_processes(
226229 executor ,
227230 self .worker .process_loop_asynchronous ,
228231 queues ,
232+ stop_event ,
229233 False , # TODO: Make configurable
230234 requests_limit ,
231235 id_ ,
@@ -234,7 +238,7 @@ async def _start_processes(
234238
235239 await asyncio .sleep (0.1 ) # give time for processes to start
236240
237- return futures , queues
241+ return futures , queues , stop_event
238242
239243 def _run_setup (
240244 self ,
@@ -369,10 +373,9 @@ def _check_result_ready(
369373 async def _stop_processes (
370374 self ,
371375 futures : list [asyncio .Future ],
372- requests_queue : Queue [ RequestSession [ RequestT , ResponseT ]] ,
376+ stop_event : Event ,
373377 ):
374- # FIXME: Need new method for stopping workers
375- for _ in futures :
376- requests_queue .put (None )
378+ # stop all processes
379+ stop_event .set ()
377380
378381 await asyncio .gather (* futures )
0 commit comments