3030from guidellm .scheduler .result import (
3131 MPQueues ,
3232 SchedulerRequestInfo ,
33+ WorkerProcessRequestTime ,
3334 WorkerProcessResult ,
3435)
3536from guidellm .scheduler .types import RequestT , ResponseT
@@ -107,6 +108,11 @@ async def resolve(
107108 """
108109 ...
109110
111+ async def get_request_time (
112+ self , times_queue : Queue [WorkerProcessRequestTime ]
113+ ) -> WorkerProcessRequestTime :
114+ return await asyncio .to_thread (times_queue .get ) # type: ignore[attr-defined]
115+
110116 async def send_result (
111117 self ,
112118 results_queue : Queue [WorkerProcessResult [RequestT , ResponseT ]],
@@ -170,7 +176,7 @@ async def resolve_scheduler_request(
170176 asyncio .create_task (self .send_result (results_queue , result ))
171177
172178 request_session .push_response (response )
173- return request
179+ return request_session
174180
175181 def process_loop_asynchronous (
176182 self ,
@@ -194,7 +200,7 @@ async def _process_runner():
194200 else queues .requests .get_nowait ()
195201 )
196202 dequeued_time = time .time ()
197- request_times = queues .times . get ( )
203+ request_times = await self . get_request_time ( queues .times )
198204 except (QueueEmpty , IndexError ):
199205 # Requeue the session if we don't have a next time yet
200206 if request_session is not None :
@@ -215,7 +221,7 @@ async def wait_then_requeue(
215221 # Release the lock with the session on top of the stack
216222 lock .release ()
217223
218- async def _request_callback (
224+ def _request_callback (
219225 session_future : asyncio .Future [RequestSession [RequestT , ResponseT ]],
220226 ):
221227 # If we are prioritizing sessions, hold
@@ -224,7 +230,7 @@ async def _request_callback(
224230 if not prioritize_sessions :
225231 lock .release ()
226232
227- session = await session_future
233+ session = session_future . result ()
228234 if not session .complete :
229235 asyncio .create_task (wait_then_requeue (session ))
230236 elif prioritize_sessions :
0 commit comments