@@ -123,7 +123,7 @@ async def resolve_scheduler_request(
123123 results_queue : Queue [WorkerProcessResult [RequestT , ResponseT ]],
124124 process_id : int ,
125125 ) -> WorkerProcessRequest [RequestT , ResponseT ]:
126- request = process_request .session . get_next_request ()
126+ request = process_request .request
127127 timeout_time = process_request .timeout_time
128128 queued_time = process_request .queued_time
129129
@@ -170,22 +170,19 @@ async def resolve_scheduler_request(
170170 )
171171 asyncio .create_task (self .send_result (results_queue , result ))
172172
173- process_request .session .push_response (response )
174173 return process_request
175174
176175 def process_loop_asynchronous (
177176 self ,
178177 queues : MPQueues [RequestT , ResponseT ],
179178 strategy : SchedulingStrategy ,
180179 stop_event : Event ,
181- prioritize_sessions : bool ,
182180 max_concurrency : int ,
183181 process_id : int ,
184182 num_processes : int ,
185183 ):
186184 async def _process_runner ():
187185 lock = asyncio .Semaphore (max_concurrency )
188- pending_requests : list [WorkerProcessRequest [RequestT , ResponseT ]] = []
189186 times_iter = islice (
190187 strategy .request_times (),
191188 process_id ,
@@ -202,50 +199,18 @@ async def _process_runner():
202199 await asyncio .sleep (start_time - time .time () - 1 )
203200 await lock .acquire ()
204201
205- process_request = None
206202 try :
207- process_request = (
208- pending_requests .pop ()
209- if pending_requests
210- else queues .requests .get_nowait ()
211- )
203+ process_request = queues .requests .get_nowait ()
212204 dequeued_time = time .time ()
213205 except QueueEmpty :
214206 lock .release ()
215207 continue
216208
217- async def wait_then_requeue (
218- process_request : WorkerProcessRequest [RequestT , ResponseT ],
219- ):
220- # Wait to requeue the request session if it specifies a delay
221- if delay := process_request .session .get_next_delay ():
222- await asyncio .sleep (delay )
223-
224- # Push session to the stack
225- process_request .queued_time = time .time ()
226- pending_requests .append (process_request )
227- if prioritize_sessions :
228- # Release the lock with the session on top of the stack
229- lock .release ()
230-
231209 def _request_callback (
232- future : asyncio .Future [WorkerProcessRequest [RequestT , ResponseT ]],
210+ _ : asyncio .Future [WorkerProcessRequest [RequestT , ResponseT ]],
233211 ):
234- # If we are prioritizing sessions, hold
235- # the lock until the session is done
236212 nonlocal lock
237- if not prioritize_sessions :
238- lock .release ()
239-
240- try :
241- process_request = future .result ()
242- except asyncio .CancelledError :
243- return
244- if not process_request .session .complete :
245- asyncio .create_task (wait_then_requeue (process_request ))
246- elif prioritize_sessions :
247- # no more requests in this session, release the lock
248- lock .release ()
213+ lock .release ()
249214
250215 task = asyncio .create_task (
251216 self .resolve_scheduler_request (
@@ -319,7 +284,6 @@ def process_loop_asynchronous(
319284 queues : MPQueues [GenerationRequest , ResponseSummary ],
320285 strategy : SchedulingStrategy ,
321286 stop_event : Event ,
322- prioritize_sessions : bool ,
323287 max_concurrency : int ,
324288 process_id : int ,
325289 num_processes : int ,
@@ -329,7 +293,6 @@ def process_loop_asynchronous(
329293 queues = queues ,
330294 strategy = strategy ,
331295 stop_event = stop_event ,
332- prioritize_sessions = prioritize_sessions ,
333296 max_concurrency = max_concurrency ,
334297 process_id = process_id ,
335298 num_processes = num_processes ,
0 commit comments