@@ -130,17 +130,15 @@ async def run(
130130 futures , queues , stop_event = await self ._start_processes (
131131 manager , executor , scheduling_strategy
132132 )
133- run_info , requests_iter , times_iter = self ._run_setup (
133+ run_info , requests_iter = self ._run_setup (
134134 futures , scheduling_strategy , max_number , max_duration
135135 )
136136
137137 # Add some initial requests to the queue
138138 requests_iter = self ._add_requests (
139139 requests_iter ,
140140 queues .requests ,
141- times_iter ,
142141 run_info ,
143- loop_limit = run_info .strategy .queued_requests_limit ,
144142 )
145143 # Wait for the test to start
146144 await asyncio .sleep (time .time () - scheduling_strategy .start_time )
@@ -171,7 +169,6 @@ async def run(
171169 requests_iter = self ._add_requests (
172170 requests_iter ,
173171 queues .requests ,
174- times_iter ,
175172 run_info ,
176173 )
177174 await asyncio .sleep (0 ) # enable requests to start
@@ -244,6 +241,7 @@ async def _start_processes(
244241 queues ,
245242 scheduling_strategy ,
246243 stop_event ,
244+ False , # TODO: Make configurable
247245 requests_limit ,
248246 id_ ,
249247 num_processes ,
@@ -260,9 +258,8 @@ def _run_setup(
260258 scheduling_strategy : SchedulingStrategy ,
261259 max_number : Optional [int ],
262260 max_duration : Optional [float ],
263- ) -> tuple [SchedulerRunInfo , Iterator [Any ], Iterator [ float ] ]:
261+ ) -> tuple [SchedulerRunInfo , Iterator [Any ]]:
264262 requests_iter = iter (self .request_loader )
265- times_iter = iter (scheduling_strategy .request_times ())
266263 end_time = scheduling_strategy .start_time + (max_duration or math .inf )
267264 end_number = max_number or math .inf
268265
@@ -288,42 +285,39 @@ def _run_setup(
288285 strategy = scheduling_strategy ,
289286 )
290287
291- return info , requests_iter , times_iter
288+ return info , requests_iter
292289
293290 def _add_requests (
294291 self ,
295292 requests_iter : Optional [Iterator [Any ]],
296293 requests_queue : Queue [WorkerProcessRequest [RequestT , ResponseT ]],
297- times_iter : Iterator [float ],
298294 run_info : SchedulerRunInfo ,
299- loop_limit : Optional [int ] = None ,
300295 ) -> Optional [Iterator [Any ]]:
301296 if requests_iter is not None :
302297 try :
303298 added_count = 0
304299
300+ if time .time () >= run_info .end_time :
301+ raise StopIteration
302+
305303 while not requests_queue .full () and added_count < (
306- loop_limit or settings .max_add_requests_per_loop
304+ run_info .strategy .queued_requests_limit
305+ or settings .max_add_requests_per_loop
307306 ):
308307 if run_info .created_requests >= run_info .end_number :
309308 raise StopIteration
310309
311- if (
312- next (times_iter ) >= run_info .end_time
313- or time .time () >= run_info .end_time
314- ):
315- raise StopIteration
316-
317- work_req = WorkerProcessRequest [RequestT , ResponseT ](
318- request = next (requests_iter ),
310+ session = next (requests_iter )
311+ work_req = WorkerProcessRequest (
312+ session = session ,
319313 timeout_time = run_info .end_time ,
320314 queued_time = time .time (),
321315 )
322316 requests_queue .put (work_req )
323317
324- run_info .created_requests += 1
325- run_info .queued_requests += 1
326- added_count += 1
318+ run_info .created_requests += len ( session )
319+ run_info .queued_requests += len ( session )
320+ added_count += len ( session )
327321 except StopIteration :
328322 # we've reached the limit number, limit time, or exhausted the requests
329323 # set to None to stop adding more and tell the loop no more requests
0 commit comments