@@ -122,8 +122,8 @@ async def resolve_scheduler_request(
122122 start_time : float ,
123123 results_queue : Queue [WorkerProcessResult [RequestT , ResponseT ]],
124124 process_id : int ,
125- ) -> WorkerProcessRequest [ RequestT , ResponseT ] :
126- request = process_request .session . get_next_request ()
125+ ):
126+ request = process_request .request
127127 timeout_time = process_request .timeout_time
128128 queued_time = process_request .queued_time
129129
@@ -170,22 +170,17 @@ async def resolve_scheduler_request(
170170 )
171171 asyncio .create_task (self .send_result (results_queue , result ))
172172
173- process_request .session .push_response (response )
174- return process_request
175-
176173 def process_loop_asynchronous (
177174 self ,
178175 queues : MPQueues [RequestT , ResponseT ],
179176 strategy : SchedulingStrategy ,
180177 stop_event : Event ,
181- prioritize_sessions : bool ,
182178 max_concurrency : int ,
183179 process_id : int ,
184180 num_processes : int ,
185181 ):
186182 async def _process_runner ():
187183 lock = asyncio .Semaphore (max_concurrency )
188- pending_requests : list [WorkerProcessRequest [RequestT , ResponseT ]] = []
189184 times_iter = islice (
190185 strategy .request_times (),
191186 process_id ,
@@ -202,50 +197,18 @@ async def _process_runner():
202197 await asyncio .sleep (start_time - time .time () - 1 )
203198 await lock .acquire ()
204199
205- process_request = None
206200 try :
207- process_request = (
208- pending_requests .pop ()
209- if pending_requests
210- else queues .requests .get_nowait ()
211- )
201+ process_request = queues .requests .get_nowait ()
212202 dequeued_time = time .time ()
213203 except QueueEmpty :
214204 lock .release ()
215205 continue
216206
217- async def wait_then_requeue (
218- process_request : WorkerProcessRequest [RequestT , ResponseT ],
219- ):
220- # Wait to requeue the request session if it specifies a delay
221- if delay := process_request .session .get_next_delay ():
222- await asyncio .sleep (delay )
223-
224- # Push session to the stack
225- process_request .queued_time = time .time ()
226- pending_requests .append (process_request )
227- if prioritize_sessions :
228- # Release the lock with the session on top of the stack
229- lock .release ()
230-
231207 def _request_callback (
232- future : asyncio .Future [WorkerProcessRequest [RequestT , ResponseT ]],
208+ _ : asyncio .Future [WorkerProcessRequest [RequestT , ResponseT ]],
233209 ):
234- # If we are prioritizing sessions, hold
235- # the lock until the session is done
236210 nonlocal lock
237- if not prioritize_sessions :
238- lock .release ()
239-
240- try :
241- process_request = future .result ()
242- except asyncio .CancelledError :
243- return
244- if not process_request .session .complete :
245- asyncio .create_task (wait_then_requeue (process_request ))
246- elif prioritize_sessions :
247- # no more requests in this session, release the lock
248- lock .release ()
211+ lock .release ()
249212
250213 task = asyncio .create_task (
251214 self .resolve_scheduler_request (
@@ -319,7 +282,6 @@ def process_loop_asynchronous(
319282 queues : MPQueues [GenerationRequest , ResponseSummary ],
320283 strategy : SchedulingStrategy ,
321284 stop_event : Event ,
322- prioritize_sessions : bool ,
323285 max_concurrency : int ,
324286 process_id : int ,
325287 num_processes : int ,
@@ -329,7 +291,6 @@ def process_loop_asynchronous(
329291 queues = queues ,
330292 strategy = strategy ,
331293 stop_event = stop_event ,
332- prioritize_sessions = prioritize_sessions ,
333294 max_concurrency = max_concurrency ,
334295 process_id = process_id ,
335296 num_processes = num_processes ,
0 commit comments