1717from loguru import logger
1818
1919from guidellm .config import settings
20- from guidellm .request .session import RequestSession
2120from guidellm .request .types import (
2221 RequestT ,
2322 ResponseT ,
2726 SchedulerRequestResult ,
2827 SchedulerResult ,
2928 SchedulerRunInfo ,
30- WorkerProcessRequestTime ,
29+ WorkerProcessRequest ,
3130 WorkerProcessResult ,
3231)
3332from guidellm .scheduler .strategy import SchedulingStrategy
@@ -127,10 +126,14 @@ async def run(
127126 ) as executor ,
128127 ):
129128 requests_iter : Optional [Iterator [Any ]] = None
129+ # TODO: Configurable delay and move somewhere more appropriate
130+ scheduling_strategy .start_time = (
131+ time .time ()
132+ ) # NOTE: no delay is applied yet; start_time is simply the current time (see TODO above)
130133 futures , queues , stop_event = await self ._start_processes (
131134 manager , executor , scheduling_strategy
132135 )
133- run_info , requests_iter , times_iter = self ._run_setup (
136+ run_info , requests_iter = self ._run_setup (
134137 futures , scheduling_strategy , max_number , max_duration
135138 )
136139 yield SchedulerResult (
@@ -145,19 +148,14 @@ async def run(
145148 if future .done () and (err := future .exception ()) is not None :
146149 raise err
147150
148- if (
149- requests_iter is None
150- and run_info .completed_requests >= run_info .created_requests
151- ):
151+ if requests_iter is None and run_info .processing_requests <= 0 :
152152 # we've exhausted all requests we've wanted to run
153153 # and yielded all responses
154154 break
155155
156156 requests_iter = self ._add_requests (
157157 requests_iter ,
158- times_iter ,
159158 queues .requests ,
160- queues .times ,
161159 run_info ,
162160 )
163161 await asyncio .sleep (0 ) # enable requests to start
@@ -196,7 +194,6 @@ async def _start_processes(
196194 requests = manager .Queue (
197195 maxsize = scheduling_strategy .processing_requests_limit
198196 ),
199- times = manager .Queue (maxsize = scheduling_strategy .processing_requests_limit ),
200197 responses = manager .Queue (),
201198 )
202199 stop_event = manager .Event ()
@@ -229,10 +226,12 @@ async def _start_processes(
229226 executor ,
230227 self .worker .process_loop_asynchronous ,
231228 queues ,
229+ scheduling_strategy ,
232230 stop_event ,
233231 False , # TODO: Make configurable
234232 requests_limit ,
235233 id_ ,
234+ num_processes ,
236235 )
237236 )
238237
@@ -246,11 +245,9 @@ def _run_setup(
246245 scheduling_strategy : SchedulingStrategy ,
247246 max_number : Optional [int ],
248247 max_duration : Optional [float ],
249- ) -> tuple [SchedulerRunInfo , Iterator [Any ], Iterator [ float ] ]:
248+ ) -> tuple [SchedulerRunInfo , Iterator [Any ]]:
250249 requests_iter = iter (self .request_loader )
251- start_time = time .time ()
252- times_iter = iter (scheduling_strategy .request_times ())
253- end_time = time .time () + (max_duration or math .inf )
250+ end_time = scheduling_strategy .start_time + (max_duration or math .inf )
254251 end_number = max_number or math .inf
255252
256253 try :
@@ -268,27 +265,28 @@ def _run_setup(
268265 )
269266
270267 info = SchedulerRunInfo (
271- start_time = start_time ,
268+ start_time = scheduling_strategy . start_time ,
272269 end_time = end_time ,
273270 end_number = end_number ,
274271 processes = len (processes ),
275272 strategy = scheduling_strategy ,
276273 )
277274
278- return info , requests_iter , times_iter
275+ return info , requests_iter
279276
280277 def _add_requests (
281278 self ,
282279 requests_iter : Optional [Iterator [Any ]],
283- times_iter : Iterator [float ],
284- requests_queue : Queue [RequestSession [RequestT , ResponseT ]],
285- times_queue : Queue [WorkerProcessRequestTime ],
280+ requests_queue : Queue [WorkerProcessRequest [RequestT , ResponseT ]],
286281 run_info : SchedulerRunInfo ,
287282 ) -> Optional [Iterator [Any ]]:
288283 if requests_iter is not None :
289284 try :
290285 added_count = 0
291286
287+ if time .time () >= run_info .end_time :
288+ raise StopIteration
289+
292290 while (
293291 not requests_queue .full ()
294292 and added_count < settings .max_add_requests_per_loop
@@ -297,23 +295,16 @@ def _add_requests(
297295 raise StopIteration
298296
299297 session = next (requests_iter )
300- requests_queue .put (session )
301- for _ in range (len (session )):
302- if (
303- request_time := next (times_iter )
304- ) >= run_info .end_time or time .time () >= run_info .end_time :
305- raise StopIteration
306-
307- work_req = WorkerProcessRequestTime (
308- start_time = request_time ,
309- timeout_time = run_info .end_time ,
310- queued_time = time .time (),
311- )
312- times_queue .put (work_req )
313-
314- run_info .created_requests += 1
315- run_info .queued_requests += 1
316- added_count += 1
298+ work_req = WorkerProcessRequest (
299+ session = session ,
300+ timeout_time = run_info .end_time ,
301+ queued_time = time .time (),
302+ )
303+ requests_queue .put (work_req )
304+
305+ run_info .created_requests += len (session )
306+ run_info .queued_requests += len (session )
307+ added_count += len (session )
317308 except StopIteration :
318309 # we've reached the limit number, limit time, or exhausted the requests
319310 # set to None to stop adding more and tell the loop no more requests
0 commit comments