From 9cae3e6dc93ed1f848efd5262234fa35d76f1954 Mon Sep 17 00:00:00 2001
From: Samuel Monson
Date: Wed, 16 Jul 2025 13:58:04 -0400
Subject: [PATCH 01/17] Unify rps and concurrent scheduler paths

Signed-off-by: Samuel Monson
---
 src/guidellm/request/loader.py      |   9 +-
 src/guidellm/request/session.py     |  52 ++++++++
 src/guidellm/scheduler/__init__.py  |   2 -
 src/guidellm/scheduler/result.py    |  31 +++++
 src/guidellm/scheduler/scheduler.py | 127 ++++++++++---------
 src/guidellm/scheduler/strategy.py  |   4 +-
 src/guidellm/scheduler/types.py     |   5 +-
 src/guidellm/scheduler/worker.py    | 187 +++++++++++++---------------
 8 files changed, 244 insertions(+), 173 deletions(-)
 create mode 100644 src/guidellm/request/session.py

diff --git a/src/guidellm/request/loader.py b/src/guidellm/request/loader.py
index 50ab3cca..452e4733 100644
--- a/src/guidellm/request/loader.py
+++ b/src/guidellm/request/loader.py
@@ -15,6 +15,7 @@
 from guidellm.dataset import ColumnInputTypes, load_dataset
 from guidellm.objects import StandardBaseModel
 from guidellm.request.request import GenerationRequest
+from guidellm.request.session import GenerativeRequestSession
 
 __all__ = [
     "GenerativeRequestLoader",
@@ -30,10 +31,10 @@ class RequestLoaderDescription(StandardBaseModel):
 
 class RequestLoader(Iterable):
     @abstractmethod
-    def __iter__(self): ...
+    def __iter__(self) -> Iterator: ...
 
     @abstractmethod
-    def __len__(self): ...
+    def __len__(self) -> int: ...
 
     @property
     @abstractmethod
@@ -105,14 +106,14 @@ def __init__(
         self.preserve_iter_state = iter_type == "infinite"  # ensure no caching requests
         self._preserved_iter = None
 
-    def __iter__(self) -> Iterator[GenerationRequest]:
+    def __iter__(self) -> Iterator[GenerativeRequestSession]:
         scope_create_count = 0
 
         while (dataset_iter := self._get_dataset_iter(scope_create_count)) is not None:
             scope_create_count += 1
 
             for item in dataset_iter:
-                yield self._create_request(item)
+                yield GenerativeRequestSession(self._create_request(item))
 
         self._preserved_iter = None
diff --git a/src/guidellm/request/session.py b/src/guidellm/request/session.py
new file mode 100644
index 00000000..6ea7633b
--- /dev/null
+++ b/src/guidellm/request/session.py
@@ -0,0 +1,52 @@
+from abc import ABC, abstractmethod
+from typing import Generic, TypeVar
+
+from guidellm.backend.response import ResponseSummary
+from guidellm.request.request import GenerationRequest
+
+__all__ = ["GenerativeRequestSession", "RequestSession"]
+
+# TODO: Replace with specific types that implement needed features
+RequestT = TypeVar("RequestT")
+ResponseT = TypeVar("ResponseT")
+
+
+class RequestSession(ABC, Generic[RequestT, ResponseT]):
+    @abstractmethod
+    def __len__(self) -> int: ...
+
+    @abstractmethod
+    def get_next_request(self) -> RequestT: ...
+
+    @abstractmethod
+    def get_next_delay(self) -> float: ...
+
+    @abstractmethod
+    def push_response(self, response: ResponseT) -> None: ...
+
+    @property
+    @abstractmethod
+    def complete(self) -> bool: ...
+
+
+# TODO: Implement multiturn support
+class GenerativeRequestSession(RequestSession[GenerationRequest, ResponseSummary]):
+    def __init__(self, request: GenerationRequest) -> None:
+        self.request = request
+        self._complete = False
+
+    def __len__(self) -> int:
+        return 1
+
+    def get_next_request(self) -> GenerationRequest:
+        return self.request
+
+    def get_next_delay(self) -> float:
+        return 0.0
+
+    def push_response(self, response: ResponseSummary) -> None:  # noqa: ARG002
+        self._complete = True
+
+    @property
+    def complete(self) -> bool:
+        return self._complete
diff --git a/src/guidellm/scheduler/__init__.py b/src/guidellm/scheduler/__init__.py
index 37bf1fd5..69443ce6 100644
--- a/src/guidellm/scheduler/__init__.py
+++ b/src/guidellm/scheduler/__init__.py
@@ -22,7 +22,6 @@
     RequestsWorker,
     ResolveStatus,
     WorkerDescription,
-    WorkerProcessRequest,
     WorkerProcessResult,
 )
 
@@ -46,7 +45,6 @@
     "SynchronousStrategy",
     "ThroughputStrategy",
     "WorkerDescription",
-    "WorkerProcessRequest",
     "WorkerProcessResult",
     "strategy_display_str",
 ]
diff --git a/src/guidellm/scheduler/result.py b/src/guidellm/scheduler/result.py
index 0f12687f..1181a7fa 100644
--- a/src/guidellm/scheduler/result.py
+++ b/src/guidellm/scheduler/result.py
@@ -1,3 +1,5 @@
+from dataclasses import dataclass
+from queue import Queue
 from typing import (
     Generic,
     Literal,
@@ -5,14 +7,18 @@
 )
 
 from guidellm.objects import StandardBaseModel
+from guidellm.request.session import RequestSession
 from guidellm.scheduler.strategy import SchedulingStrategy
 from guidellm.scheduler.types import RequestT, ResponseT
 
 __all__ = [
+    "MPQueues",
     "SchedulerRequestInfo",
     "SchedulerRequestResult",
     "SchedulerResult",
     "SchedulerRunInfo",
+    "WorkerProcessRequestTime",
+    "WorkerProcessResult",
 ]
 
 
@@ -135,3 +141,28 @@ class SchedulerRequestResult(
     request: RequestT
     request_info: SchedulerRequestInfo
     response: Optional[ResponseT] = None
+
+
+# TODO: Move dataclasses somewhere else
+
+
+@dataclass
+class WorkerProcessRequestTime:
+    start_time: float
+    timeout_time: float
+    queued_time: float
+
+
+@dataclass
+class WorkerProcessResult(Generic[RequestT, ResponseT]):
+    type_: Literal["request_scheduled", "request_start", "request_complete"]
+    request: RequestT
+    response: Optional[ResponseT]
+    info: SchedulerRequestInfo
+
+
+@dataclass
+class MPQueues(Generic[RequestT, ResponseT]):
+    requests: Queue[RequestSession[RequestT, ResponseT]]
+    times: Queue[WorkerProcessRequestTime]
+    responses: Queue[WorkerProcessResult[RequestT, ResponseT]]
diff --git a/src/guidellm/scheduler/scheduler.py b/src/guidellm/scheduler/scheduler.py
index 06203827..f5acc634 100644
--- a/src/guidellm/scheduler/scheduler.py
+++ b/src/guidellm/scheduler/scheduler.py
@@ -1,10 +1,12 @@
 import asyncio
 import math
-import multiprocessing
-import multiprocessing.queues
 import time
 from collections.abc import AsyncGenerator, Iterable, Iterator
 from concurrent.futures import ProcessPoolExecutor
+from multiprocessing import Manager
+from queue import Empty as QueueEmpty
+from queue import Queue
+from threading import Event
 from typing import (
     Any,
     Generic,
@@ -15,17 +17,22 @@
 from loguru import logger
 
 from guidellm.config import settings
+from guidellm.request.session import RequestSession
 from guidellm.scheduler.result import (
+    MPQueues,
     SchedulerRequestResult,
     SchedulerResult,
     SchedulerRunInfo,
+    WorkerProcessRequestTime,
+    WorkerProcessResult,
 )
 from guidellm.scheduler.strategy import SchedulingStrategy
-from guidellm.scheduler.types import RequestT, ResponseT
+from guidellm.scheduler.types import (
+    RequestT,
+    ResponseT,
+)
 from guidellm.scheduler.worker import (
     RequestsWorker,
-    WorkerProcessRequest,
-    WorkerProcessResult,
 )
 
 __all__ = ["Scheduler"]
@@ -114,13 +121,13 @@ async def run(
             raise ValueError(f"Invalid max_duration: {max_duration}")
 
         with (
-            multiprocessing.Manager() as manager,
+            Manager() as manager,
             ProcessPoolExecutor(
                 max_workers=scheduling_strategy.processes_limit
             ) as executor,
         ):
             requests_iter: Optional[Iterator[Any]] = None
-            futures, requests_queue, responses_queue = await self._start_processes(
+            futures, queues, stop_event = await self._start_processes(
                 manager, executor, scheduling_strategy
             )
             run_info, requests_iter, times_iter = self._run_setup(
@@ -149,13 +156,14 @@
                 requests_iter = self._add_requests(
                     requests_iter,
                     times_iter,
-                    requests_queue,
+                    queues.requests,
+                    queues.times,
                     run_info,
                 )
                 await asyncio.sleep(0)  # enable requests to start
 
                 iter_result = self._check_result_ready(
-                    responses_queue,
+                    queues.responses,
                     run_info,
                 )
                 if iter_result is not None:
@@ -171,7 +179,7 @@
                 run_info=run_info,
             )
 
-            await self._stop_processes(futures, requests_queue)
+            await self._stop_processes(futures, stop_event)
 
     async def _start_processes(
         self,
@@ -180,14 +188,18 @@
         scheduling_strategy: SchedulingStrategy,
     ) -> tuple[
         list[asyncio.Future],
-        multiprocessing.Queue,
-        multiprocessing.Queue,
+        MPQueues[RequestT, ResponseT],
+        Event,
     ]:
         await self.worker.prepare_multiprocessing()
-        requests_queue = manager.Queue(
-            maxsize=scheduling_strategy.queued_requests_limit
+        queues: MPQueues[RequestT, ResponseT] = MPQueues(
+            requests=manager.Queue(
+                maxsize=scheduling_strategy.processing_requests_limit
+            ),
+            times=manager.Queue(maxsize=scheduling_strategy.processing_requests_limit),
+            responses=manager.Queue(),
         )
-        responses_queue = manager.Queue()
+        stop_event = manager.Event()
 
         num_processes = min(
             scheduling_strategy.processes_limit,
@@ -212,36 +224,21 @@
         futures = []
         loop = asyncio.get_event_loop()
         for id_, requests_limit in zip(process_ids, process_requests_limits):
-            if scheduling_strategy.processing_mode == "sync":
-                futures.append(
-                    loop.run_in_executor(
-                        executor,
-                        self.worker.process_loop_synchronous,
-                        requests_queue,
-                        responses_queue,
-                        id_,
-                    )
-                )
-            elif scheduling_strategy.processing_mode == "async":
-                futures.append(
-                    loop.run_in_executor(
-                        executor,
-                        self.worker.process_loop_asynchronous,
-                        requests_queue,
-                        responses_queue,
-                        requests_limit,
-                        id_,
-                    )
-                )
-            else:
-                raise ValueError(
-                    f"Invalid processing mode: {scheduling_strategy.processing_mode} "
-                    f"for strategy: {scheduling_strategy}"
+            futures.append(
+                loop.run_in_executor(
+                    executor,
+                    self.worker.process_loop_asynchronous,
+                    queues,
+                    stop_event,
+                    False,  # TODO: Make configurable
+                    requests_limit,
+                    id_,
                 )
+            )
 
         await asyncio.sleep(0.1)  # give time for processes to start
 
-        return futures, requests_queue, responses_queue
+        return futures, queues, stop_event
 
     def _run_setup(
         self,
@@ -284,7 +281,8 @@ def _add_requests(
         self,
         requests_iter: Optional[Iterator[Any]],
         times_iter: Iterator[float],
-        requests_queue: multiprocessing.Queue,
+        requests_queue: Queue[RequestSession[RequestT, ResponseT]],
+        times_queue: Queue[WorkerProcessRequestTime],
         run_info: SchedulerRunInfo,
     ) -> Optional[Iterator[Any]]:
         if requests_iter is not None:
@@ -298,23 +296,24 @@
                     if run_info.created_requests >= run_info.end_number:
                         raise StopIteration
 
-                    if (
-                        request_time := next(times_iter)
-                    ) >= run_info.end_time or time.time() >= run_info.end_time:
-                        raise StopIteration
-
-                    request = next(requests_iter)
-                    work_req: WorkerProcessRequest[RequestT] = WorkerProcessRequest(
-                        request=request,
-                        start_time=request_time,
-                        timeout_time=run_info.end_time,
-                        queued_time=time.time(),
-                    )
-                    requests_queue.put(work_req)
-
-                    run_info.created_requests += 1
-                    run_info.queued_requests += 1
-                    added_count += 1
+                    session = next(requests_iter)
+                    requests_queue.put(session)
+                    for _ in range(len(session)):
+                        if (
+                            request_time := next(times_iter)
+                        ) >= run_info.end_time or time.time() >= run_info.end_time:
+                            raise StopIteration
+
+                        work_req = WorkerProcessRequestTime(
+                            start_time=request_time,
+                            timeout_time=run_info.end_time,
+                            queued_time=time.time(),
+                        )
+                        times_queue.put(work_req)
+
+                        run_info.created_requests += 1
+                        run_info.queued_requests += 1
+                        added_count += 1
             except StopIteration:
                 # we've reached the limit number, limit time, or exhausted the requests
                 # set to None to stop adding more and tell the loop no more requests
@@ -324,14 +323,14 @@
     def _check_result_ready(
         self,
-        responses_queue: multiprocessing.Queue,
+        responses_queue: Queue[WorkerProcessResult[RequestT, ResponseT]],
         run_info: SchedulerRunInfo,
     ) -> Optional[SchedulerRequestResult[RequestT, ResponseT]]:
         try:
             process_response: WorkerProcessResult[RequestT, ResponseT] = (
                 responses_queue.get_nowait()
             )
-        except multiprocessing.queues.Empty:  # type: ignore[attr-defined]
+        except QueueEmpty:
             return None
 
         if process_response.type_ == "request_scheduled":
@@ -374,9 +373,9 @@
     async def _stop_processes(
         self,
         futures: list[asyncio.Future],
-        requests_queue: multiprocessing.Queue,
+        stop_event: Event,
     ):
-        for _ in futures:
-            requests_queue.put(None)
+        # stop all processes
+        stop_event.set()
 
         await asyncio.gather(*futures)
diff --git a/src/guidellm/scheduler/strategy.py b/src/guidellm/scheduler/strategy.py
index 200c799e..60dd799e 100644
--- a/src/guidellm/scheduler/strategy.py
+++ b/src/guidellm/scheduler/strategy.py
@@ -226,7 +226,9 @@ def processes_limit(self) -> int:
         :return: {self.streams} for the concurrent scheduling strategy to limit
             the worker processes to the number of streams.
""" - return self.streams + cpu_cores = os.cpu_count() or 1 + + return min(max(1, cpu_cores - 1), self.streams) @property def queued_requests_limit(self) -> int: diff --git a/src/guidellm/scheduler/types.py b/src/guidellm/scheduler/types.py index 42535d71..f82493be 100644 --- a/src/guidellm/scheduler/types.py +++ b/src/guidellm/scheduler/types.py @@ -1,6 +1,9 @@ from typing import TypeVar -__all__ = ["RequestT", "ResponseT"] +__all__ = [ + "RequestT", + "ResponseT", +] RequestT = TypeVar("RequestT") diff --git a/src/guidellm/scheduler/worker.py b/src/guidellm/scheduler/worker.py index a53b14c2..6677e99d 100644 --- a/src/guidellm/scheduler/worker.py +++ b/src/guidellm/scheduler/worker.py @@ -1,11 +1,12 @@ import asyncio import math -import multiprocessing -import multiprocessing.queues import time from abc import ABC, abstractmethod from collections.abc import AsyncGenerator from dataclasses import dataclass +from queue import Empty as QueueEmpty +from queue import Queue +from threading import Event from typing import ( Any, Generic, @@ -26,7 +27,12 @@ ) from guidellm.objects import StandardBaseModel from guidellm.request import GenerationRequest -from guidellm.scheduler.result import SchedulerRequestInfo +from guidellm.request.session import RequestSession +from guidellm.scheduler.result import ( + MPQueues, + SchedulerRequestInfo, + WorkerProcessResult, +) from guidellm.scheduler.types import RequestT, ResponseT __all__ = [ @@ -35,27 +41,9 @@ "RequestsWorker", "ResolveStatus", "WorkerDescription", - "WorkerProcessRequest", - "WorkerProcessResult", ] -@dataclass -class WorkerProcessRequest(Generic[RequestT]): - request: RequestT - start_time: float - timeout_time: float - queued_time: float - - -@dataclass -class WorkerProcessResult(Generic[RequestT, ResponseT]): - type_: Literal["request_scheduled", "request_start", "request_complete"] - request: RequestT - response: Optional[ResponseT] - info: SchedulerRequestInfo - - @dataclass class ResolveStatus: requested: bool @@ -120,28 +108,25 @@ async def resolve( """ ... 
- async def get_request( - self, requests_queue: multiprocessing.Queue - ) -> Optional[WorkerProcessRequest[RequestT]]: - return await asyncio.to_thread(requests_queue.get) # type: ignore[attr-defined] - async def send_result( self, - results_queue: multiprocessing.Queue, + results_queue: Queue[WorkerProcessResult[RequestT, ResponseT]], result: WorkerProcessResult[RequestT, ResponseT], ): await asyncio.to_thread(results_queue.put, result) # type: ignore[attr-defined] async def resolve_scheduler_request( self, - request: Any, + request_session: RequestSession[RequestT, ResponseT], queued_time: float, dequeued_time: float, start_time: float, timeout_time: float, - results_queue: multiprocessing.Queue, + results_queue: Queue[WorkerProcessResult[RequestT, ResponseT]], process_id: int, - ): + ) -> Any: + request = request_session.get_next_request() + info = SchedulerRequestInfo( targeted_start_time=start_time, queued_time=queued_time, @@ -185,74 +170,85 @@ async def resolve_scheduler_request( ) asyncio.create_task(self.send_result(results_queue, result)) - def process_loop_synchronous( - self, - requests_queue: multiprocessing.Queue, - results_queue: multiprocessing.Queue, - process_id: int, - ): - async def _process_runner(): - while ( - process_request := await self.get_request(requests_queue) - ) is not None: - dequeued_time = time.time() - - await self.resolve_scheduler_request( - request=process_request.request, - queued_time=process_request.queued_time, - dequeued_time=dequeued_time, - start_time=process_request.start_time, - timeout_time=process_request.timeout_time, - results_queue=results_queue, - process_id=process_id, - ) - - try: - asyncio.run(_process_runner()) - except Exception as exc: # noqa: BLE001 - logger.error( - f"Error in worker process {process_id}: {exc}", - exc_info=True, - stack_info=True, - ) + request_session.push_response(response) + return request_session def process_loop_asynchronous( self, - requests_queue: multiprocessing.Queue, - results_queue: multiprocessing.Queue, + queues: MPQueues[RequestT, ResponseT], + stop_event: Event, + prioritize_sessions: bool, max_concurrency: int, process_id: int, ): async def _process_runner(): - pending = asyncio.Semaphore(max_concurrency) - - if pending.locked(): - raise ValueError("Async worker called with max_concurrency < 1") - - while ( - process_request := await self.get_request(requests_queue) - ) is not None: - dequeued_time = time.time() - - await pending.acquire() - - def _task_done(_: asyncio.Task): - nonlocal pending - pending.release() + lock = asyncio.Semaphore(max_concurrency) + pending_sessions: list[RequestSession[RequestT, ResponseT]] = [] + + while True: + await asyncio.sleep(0) # Yield control to the event loop + await lock.acquire() + + request_session = None + try: + request_session = ( + pending_sessions.pop() + if pending_sessions + else queues.requests.get_nowait() + ) + dequeued_time = time.time() + request_times = queues.times.get_nowait() + except (QueueEmpty, IndexError): + # Requeue the session if we don't have a next time yet + if request_session is not None: + pending_sessions.append(request_session) + lock.release() + if stop_event.is_set(): + return # Exit if stop event is set + else: + continue + + async def wait_then_requeue( + session: RequestSession[RequestT, ResponseT], + ): + # Wait to requeue the request session if it specifies a delay + if delay := session.get_next_delay(): + await asyncio.sleep(delay) + + # Push session to the stack + pending_sessions.append(session) + if 
prioritize_sessions: + # Release the lock with the session on top of the stack + lock.release() + + def _request_callback( + session_future: asyncio.Future[RequestSession[RequestT, ResponseT]], + ): + # If we are prioritizing sessions, hold + # the lock until the session is done + nonlocal lock + if not prioritize_sessions: + lock.release() + + session = session_future.result() + if not session.complete: + asyncio.create_task(wait_then_requeue(session)) + elif prioritize_sessions: + # no more requests in this session, release the lock + lock.release() task = asyncio.create_task( self.resolve_scheduler_request( - request=process_request.request, - queued_time=process_request.queued_time, + request_session=request_session, + queued_time=request_times.queued_time, dequeued_time=dequeued_time, - start_time=process_request.start_time, - timeout_time=process_request.timeout_time, - results_queue=results_queue, + start_time=request_times.start_time, + timeout_time=request_times.timeout_time, + results_queue=queues.responses, process_id=process_id, ) ) - task.add_done_callback(_task_done) - await asyncio.sleep(0) # enable start task immediately + task.add_done_callback(_request_callback) try: asyncio.run(_process_runner()) @@ -309,30 +305,19 @@ async def prepare_multiprocessing(self): """ await self.backend.prepare_multiprocessing() - def process_loop_synchronous( - self, - requests_queue: multiprocessing.Queue, - results_queue: multiprocessing.Queue, - process_id: int, - ): - asyncio.run(self.backend.validate()) - super().process_loop_synchronous( - requests_queue=requests_queue, - results_queue=results_queue, - process_id=process_id, - ) - def process_loop_asynchronous( self, - requests_queue: multiprocessing.Queue, - results_queue: multiprocessing.Queue, + queues: MPQueues[GenerationRequest, ResponseSummary], + stop_event: Event, + prioritize_sessions: bool, max_concurrency: int, process_id: int, ): asyncio.run(self.backend.validate()) super().process_loop_asynchronous( - requests_queue=requests_queue, - results_queue=results_queue, + queues=queues, + stop_event=stop_event, + prioritize_sessions=prioritize_sessions, max_concurrency=max_concurrency, process_id=process_id, ) From b4b9d126611799c50b19ba3c91f1c5fd712f384e Mon Sep 17 00:00:00 2001 From: Samuel Monson Date: Wed, 23 Jul 2025 16:42:06 -0400 Subject: [PATCH 02/17] Move Generic request types to avoid circular import Signed-off-by: Samuel Monson --- src/guidellm/benchmark/aggregator.py | 4 ++-- src/guidellm/benchmark/benchmarker.py | 4 ++-- src/guidellm/request/__init__.py | 6 ++++++ src/guidellm/{scheduler => request}/types.py | 0 src/guidellm/scheduler/__init__.py | 3 --- src/guidellm/scheduler/result.py | 2 +- src/guidellm/scheduler/scheduler.py | 8 ++++---- src/guidellm/scheduler/worker.py | 2 +- 8 files changed, 16 insertions(+), 13 deletions(-) rename src/guidellm/{scheduler => request}/types.py (100%) diff --git a/src/guidellm/benchmark/aggregator.py b/src/guidellm/benchmark/aggregator.py index af7f1a13..f10eb5ed 100644 --- a/src/guidellm/benchmark/aggregator.py +++ b/src/guidellm/benchmark/aggregator.py @@ -32,11 +32,11 @@ GenerationRequest, GenerativeRequestLoaderDescription, RequestLoaderDescription, + RequestT, + ResponseT, ) from guidellm.scheduler import ( GenerativeRequestsWorkerDescription, - RequestT, - ResponseT, SchedulerRequestResult, WorkerDescription, ) diff --git a/src/guidellm/benchmark/benchmarker.py b/src/guidellm/benchmark/benchmarker.py index 11b6d245..0e34e322 100644 --- a/src/guidellm/benchmark/benchmarker.py 
+++ b/src/guidellm/benchmark/benchmarker.py
@@ -27,12 +27,12 @@
     GenerationRequest,
     GenerativeRequestLoaderDescription,
     RequestLoaderDescription,
+    RequestT,
+    ResponseT,
 )
 from guidellm.scheduler import (
     GenerativeRequestsWorker,
     RequestsWorker,
-    RequestT,
-    ResponseT,
     Scheduler,
     SchedulerRequestResult,
     SchedulingStrategy,
diff --git a/src/guidellm/request/__init__.py b/src/guidellm/request/__init__.py
index db3059cc..fd0ec355 100644
--- a/src/guidellm/request/__init__.py
+++ b/src/guidellm/request/__init__.py
@@ -5,11 +5,17 @@
     RequestLoaderDescription,
 )
 from .request import GenerationRequest
+from .session import GenerativeRequestSession, RequestSession
+from .types import RequestT, ResponseT
 
 __all__ = [
     "GenerationRequest",
     "GenerativeRequestLoader",
     "GenerativeRequestLoaderDescription",
+    "GenerativeRequestSession",
     "RequestLoader",
     "RequestLoaderDescription",
+    "RequestSession",
+    "RequestT",
+    "ResponseT",
 ]
diff --git a/src/guidellm/scheduler/types.py b/src/guidellm/request/types.py
similarity index 100%
rename from src/guidellm/scheduler/types.py
rename to src/guidellm/request/types.py
diff --git a/src/guidellm/scheduler/__init__.py b/src/guidellm/scheduler/__init__.py
index 69443ce6..d3aa0aab 100644
--- a/src/guidellm/scheduler/__init__.py
+++ b/src/guidellm/scheduler/__init__.py
@@ -15,7 +15,6 @@
     ThroughputStrategy,
     strategy_display_str,
 )
-from .types import RequestT, ResponseT
 from .worker import (
     GenerativeRequestsWorker,
     GenerativeRequestsWorkerDescription,
@@ -31,10 +30,8 @@
     "ConcurrentStrategy",
     "GenerativeRequestsWorker",
     "GenerativeRequestsWorkerDescription",
-    "RequestT",
     "RequestsWorker",
     "ResolveStatus",
-    "ResponseT",
     "Scheduler",
     "SchedulerRequestInfo",
     "SchedulerRequestResult",
diff --git a/src/guidellm/scheduler/result.py b/src/guidellm/scheduler/result.py
index 1181a7fa..95944a9e 100644
--- a/src/guidellm/scheduler/result.py
+++ b/src/guidellm/scheduler/result.py
@@ -8,8 +8,8 @@
 
 from guidellm.objects import StandardBaseModel
 from guidellm.request.session import RequestSession
+from guidellm.request.types import RequestT, ResponseT
 from guidellm.scheduler.strategy import SchedulingStrategy
-from guidellm.scheduler.types import RequestT, ResponseT
 
 __all__ = [
     "MPQueues",
diff --git a/src/guidellm/scheduler/scheduler.py b/src/guidellm/scheduler/scheduler.py
index f5acc634..ce986eb7 100644
--- a/src/guidellm/scheduler/scheduler.py
+++ b/src/guidellm/scheduler/scheduler.py
@@ -18,6 +18,10 @@
 
 from guidellm.config import settings
 from guidellm.request.session import RequestSession
+from guidellm.request.types import (
+    RequestT,
+    ResponseT,
+)
 from guidellm.scheduler.result import (
     MPQueues,
     SchedulerRequestResult,
@@ -27,10 +31,6 @@
     WorkerProcessResult,
 )
 from guidellm.scheduler.strategy import SchedulingStrategy
-from guidellm.scheduler.types import (
-    RequestT,
-    ResponseT,
-)
 from guidellm.scheduler.worker import (
     RequestsWorker,
 )
diff --git a/src/guidellm/scheduler/worker.py b/src/guidellm/scheduler/worker.py
index 6677e99d..542d0346 100644
--- a/src/guidellm/scheduler/worker.py
+++ b/src/guidellm/scheduler/worker.py
@@ -28,12 +28,12 @@
 from guidellm.objects import StandardBaseModel
 from guidellm.request import GenerationRequest
 from guidellm.request.session import RequestSession
+from guidellm.request.types import RequestT, ResponseT
 from guidellm.scheduler.result import (
     MPQueues,
     SchedulerRequestInfo,
     WorkerProcessResult,
 )
-from guidellm.scheduler.types import RequestT, ResponseT
 
 __all__ = [
     "GenerativeRequestsWorker",

From 1aacd3a0a7e37b4380a42445e086e2edc19a9658 Mon Sep 17 00:00:00 2001
From: Samuel Monson
Date: Mon, 28 Jul 2025 14:43:17 -0400
Subject: [PATCH 03/17] Remove times queue

Signed-off-by: Samuel Monson
---
 src/guidellm/scheduler/result.py    |  9 ++-
 src/guidellm/scheduler/scheduler.py | 63 +++++++++------------
 src/guidellm/scheduler/strategy.py  | 34 +++++++-----
 src/guidellm/scheduler/worker.py    | 85 +++++++++++++++++------------
 4 files changed, 100 insertions(+), 91 deletions(-)

diff --git a/src/guidellm/scheduler/result.py b/src/guidellm/scheduler/result.py
index 95944a9e..e8b0f877 100644
--- a/src/guidellm/scheduler/result.py
+++ b/src/guidellm/scheduler/result.py
@@ -17,7 +17,7 @@
     "SchedulerRequestResult",
     "SchedulerResult",
     "SchedulerRunInfo",
-    "WorkerProcessRequestTime",
+    "WorkerProcessRequest",
     "WorkerProcessResult",
 ]
 
@@ -147,8 +147,8 @@
 
 
 @dataclass
-class WorkerProcessRequestTime:
-    start_time: float
+class WorkerProcessRequest(Generic[RequestT, ResponseT]):
+    session: RequestSession[RequestT, ResponseT]
     timeout_time: float
     queued_time: float
 
@@ -163,6 +163,5 @@
 
 @dataclass
 class MPQueues(Generic[RequestT, ResponseT]):
-    requests: Queue[RequestSession[RequestT, ResponseT]]
-    times: Queue[WorkerProcessRequestTime]
+    requests: Queue[WorkerProcessRequest[RequestT, ResponseT]]
     responses: Queue[WorkerProcessResult[RequestT, ResponseT]]
diff --git a/src/guidellm/scheduler/scheduler.py b/src/guidellm/scheduler/scheduler.py
index ce986eb7..8bf27ed4 100644
--- a/src/guidellm/scheduler/scheduler.py
+++ b/src/guidellm/scheduler/scheduler.py
@@ -17,7 +17,6 @@
 from loguru import logger
 
 from guidellm.config import settings
-from guidellm.request.session import RequestSession
 from guidellm.request.types import (
     RequestT,
     ResponseT,
@@ -27,7 +26,7 @@
     SchedulerRequestResult,
     SchedulerResult,
     SchedulerRunInfo,
-    WorkerProcessRequestTime,
+    WorkerProcessRequest,
     WorkerProcessResult,
 )
 from guidellm.scheduler.strategy import SchedulingStrategy
@@ -127,10 +126,14 @@ async def run(
             ) as executor,
         ):
             requests_iter: Optional[Iterator[Any]] = None
+            # TODO: Configurable delay and move somewhere more appropriate
+            scheduling_strategy.start_time = (
+                time.time()
+            )  # Add a small delay to allow processes to start
             futures, queues, stop_event = await self._start_processes(
                 manager, executor, scheduling_strategy
             )
-            run_info, requests_iter, times_iter = self._run_setup(
+            run_info, requests_iter = self._run_setup(
                 futures, scheduling_strategy, max_number, max_duration
             )
             yield SchedulerResult(
@@ -149,19 +148,14 @@
                 if future.done() and (err := future.exception()) is not None:
                     raise err
 
-                if (
-                    requests_iter is None
-                    and run_info.completed_requests >= run_info.created_requests
-                ):
+                if requests_iter is None and run_info.processing_requests <= 0:
                     # we've exhausted all requests we've wanted to run
                     # and yielded all responses
                     break
 
                 requests_iter = self._add_requests(
                     requests_iter,
-                    times_iter,
                     queues.requests,
-                    queues.times,
                     run_info,
                 )
                 await asyncio.sleep(0)  # enable requests to start
@@ -196,7 +194,6 @@
             requests=manager.Queue(
                 maxsize=scheduling_strategy.processing_requests_limit
             ),
-            times=manager.Queue(maxsize=scheduling_strategy.processing_requests_limit),
             responses=manager.Queue(),
         )
         stop_event = manager.Event()
@@ -229,10 +226,12 @@
                     executor,
                     self.worker.process_loop_asynchronous,
                     queues,
+                    scheduling_strategy,
                     stop_event,
                     False,  # TODO: Make configurable
""" + init_time = self.start_time while True: - yield time.time() + yield max(init_time, time.time()) class ThroughputStrategy(SchedulingStrategy): @@ -336,10 +342,9 @@ def request_times(self) -> Generator[float, None, None]: :return: A generator that yields the start time.time() for immediate request scheduling. """ - start_time = time.time() - + init_time = self.start_time while True: - yield start_time + yield init_time class AsyncConstantStrategy(ThroughputStrategy): @@ -391,24 +396,24 @@ def request_times(self) -> Generator[float, None, None]: :return: A generator that yields timestamps for request scheduling. """ - start_time = time.time() constant_increment = 1.0 / self.rate + init_time = self.start_time # handle bursts first to get to the desired rate if self.initial_burst is not None: # send an initial burst equal to the rate # to reach the target rate burst_count = math.floor(self.rate) for _ in range(burst_count): - yield start_time + yield init_time - start_time += constant_increment + init_time += constant_increment counter = 0 # continue with constant rate after bursting while True: - yield start_time + constant_increment * counter + yield init_time + constant_increment * counter counter += 1 @@ -461,24 +466,23 @@ def request_times(self) -> Generator[float, None, None]: :return: A generator that yields timestamps for request scheduling. """ - start_time = time.time() - + init_time = self.start_time if self.initial_burst is not None: # send an initial burst equal to the rate # to reach the target rate burst_count = math.floor(self.rate) for _ in range(burst_count): - yield start_time + yield init_time else: - yield start_time + yield init_time # set the random seed for reproducibility rand = random.Random(self.random_seed) # noqa: S311 while True: inter_arrival_time = rand.expovariate(self.rate) - start_time += inter_arrival_time - yield start_time + init_time += inter_arrival_time + yield init_time def strategy_display_str(strategy: Union[StrategyType, SchedulingStrategy]) -> str: diff --git a/src/guidellm/scheduler/worker.py b/src/guidellm/scheduler/worker.py index 542d0346..3d792a0c 100644 --- a/src/guidellm/scheduler/worker.py +++ b/src/guidellm/scheduler/worker.py @@ -4,6 +4,7 @@ from abc import ABC, abstractmethod from collections.abc import AsyncGenerator from dataclasses import dataclass +from itertools import islice from queue import Empty as QueueEmpty from queue import Queue from threading import Event @@ -27,13 +28,14 @@ ) from guidellm.objects import StandardBaseModel from guidellm.request import GenerationRequest -from guidellm.request.session import RequestSession from guidellm.request.types import RequestT, ResponseT from guidellm.scheduler.result import ( MPQueues, SchedulerRequestInfo, + WorkerProcessRequest, WorkerProcessResult, ) +from guidellm.scheduler.strategy import SchedulingStrategy __all__ = [ "GenerativeRequestsWorker", @@ -117,15 +119,15 @@ async def send_result( async def resolve_scheduler_request( self, - request_session: RequestSession[RequestT, ResponseT], - queued_time: float, + process_request: WorkerProcessRequest[RequestT, ResponseT], dequeued_time: float, start_time: float, - timeout_time: float, results_queue: Queue[WorkerProcessResult[RequestT, ResponseT]], process_id: int, - ) -> Any: - request = request_session.get_next_request() + ) -> WorkerProcessRequest[RequestT, ResponseT]: + request = process_request.session.get_next_request() + timeout_time = process_request.timeout_time + queued_time = process_request.queued_time info = 
SchedulerRequestInfo( targeted_start_time=start_time, @@ -170,59 +172,66 @@ async def resolve_scheduler_request( ) asyncio.create_task(self.send_result(results_queue, result)) - request_session.push_response(response) - return request_session + process_request.session.push_response(response) + return process_request def process_loop_asynchronous( self, queues: MPQueues[RequestT, ResponseT], + strategy: SchedulingStrategy, stop_event: Event, prioritize_sessions: bool, max_concurrency: int, process_id: int, + num_processes: int, ): async def _process_runner(): lock = asyncio.Semaphore(max_concurrency) - pending_sessions: list[RequestSession[RequestT, ResponseT]] = [] + pending_requests: list[WorkerProcessRequest[RequestT, ResponseT]] = [] + times_iter = islice( + strategy.request_times(), + process_id, + None, + num_processes, + ) + + start_time = None + while not stop_event.is_set(): + if start_time is None: + start_time = next(times_iter) - while True: - await asyncio.sleep(0) # Yield control to the event loop + # Yield control to the event loop. Sleep if we are way ahead + await asyncio.sleep(start_time - time.time() - 1) await lock.acquire() - request_session = None + process_request = None try: - request_session = ( - pending_sessions.pop() - if pending_sessions + process_request = ( + pending_requests.pop() + if pending_requests else queues.requests.get_nowait() ) dequeued_time = time.time() - request_times = queues.times.get_nowait() - except (QueueEmpty, IndexError): - # Requeue the session if we don't have a next time yet - if request_session is not None: - pending_sessions.append(request_session) + except QueueEmpty: lock.release() - if stop_event.is_set(): - return # Exit if stop event is set - else: - continue + continue async def wait_then_requeue( - session: RequestSession[RequestT, ResponseT], + process_request: WorkerProcessRequest[RequestT, ResponseT], ): # Wait to requeue the request session if it specifies a delay - if delay := session.get_next_delay(): + if delay := process_request.session.get_next_delay(): await asyncio.sleep(delay) # Push session to the stack - pending_sessions.append(session) + process_request.queued_time = time.time() + pending_requests.append(process_request) if prioritize_sessions: # Release the lock with the session on top of the stack lock.release() def _request_callback( - session_future: asyncio.Future[RequestSession[RequestT, ResponseT]], + future: asyncio.Future[WorkerProcessRequest[RequestT, ResponseT]], ): # If we are prioritizing sessions, hold # the lock until the session is done @@ -230,25 +239,27 @@ def _request_callback( if not prioritize_sessions: lock.release() - session = session_future.result() - if not session.complete: - asyncio.create_task(wait_then_requeue(session)) + try: + process_request = future.result() + except asyncio.CancelledError: + return + if not process_request.session.complete: + asyncio.create_task(wait_then_requeue(process_request)) elif prioritize_sessions: # no more requests in this session, release the lock lock.release() task = asyncio.create_task( self.resolve_scheduler_request( - request_session=request_session, - queued_time=request_times.queued_time, + process_request=process_request, dequeued_time=dequeued_time, - start_time=request_times.start_time, - timeout_time=request_times.timeout_time, + start_time=start_time, results_queue=queues.responses, process_id=process_id, ) ) task.add_done_callback(_request_callback) + start_time = None try: asyncio.run(_process_runner()) @@ -308,18 +319,22 @@ async def 
prepare_multiprocessing(self): def process_loop_asynchronous( self, queues: MPQueues[GenerationRequest, ResponseSummary], + strategy: SchedulingStrategy, stop_event: Event, prioritize_sessions: bool, max_concurrency: int, process_id: int, + num_processes: int, ): asyncio.run(self.backend.validate()) super().process_loop_asynchronous( queues=queues, + strategy=strategy, stop_event=stop_event, prioritize_sessions=prioritize_sessions, max_concurrency=max_concurrency, process_id=process_id, + num_processes=num_processes, ) async def resolve( From be9388d6402c4a969b7d8f62ec47803894473a1a Mon Sep 17 00:00:00 2001 From: Samuel Monson Date: Wed, 30 Jul 2025 14:50:36 -0400 Subject: [PATCH 04/17] Move queue config to seperate submodule Signed-off-by: Samuel Monson --- src/guidellm/request/session.py | 7 +++++-- src/guidellm/scheduler/queues.py | 25 +++++++++++++++++++++++++ src/guidellm/scheduler/result.py | 11 ----------- src/guidellm/scheduler/scheduler.py | 4 +--- src/guidellm/scheduler/worker.py | 4 +--- 5 files changed, 32 insertions(+), 19 deletions(-) create mode 100644 src/guidellm/scheduler/queues.py diff --git a/src/guidellm/request/session.py b/src/guidellm/request/session.py index 6ea7633b..9e00b37d 100644 --- a/src/guidellm/request/session.py +++ b/src/guidellm/request/session.py @@ -6,12 +6,16 @@ __all__ = ["GenerativeRequestSession", "RequestSession"] -# TODO: Replace with specific types that implement needed features RequestT = TypeVar("RequestT") ResponseT = TypeVar("ResponseT") class RequestSession(ABC, Generic[RequestT, ResponseT]): + """ + A series of requests that build upon each other to + form a conversion between the user and the model. + """ + @abstractmethod def __len__(self) -> int: ... @@ -29,7 +33,6 @@ def push_response(self, response: ResponseT) -> None: ... def complete(self) -> bool: ... -# TODO: Implement multiturn support class GenerativeRequestSession(RequestSession[GenerationRequest, ResponseSummary]): def __init__(self, request: GenerationRequest) -> None: self.request = request diff --git a/src/guidellm/scheduler/queues.py b/src/guidellm/scheduler/queues.py new file mode 100644 index 00000000..6ccc6704 --- /dev/null +++ b/src/guidellm/scheduler/queues.py @@ -0,0 +1,25 @@ +""" +Helper module for importing the correct queue types. 
+""" + +from dataclasses import dataclass +from queue import Empty as QueueEmpty +from queue import Full as QueueFull +from queue import Queue +from typing import Generic + +from guidellm.request.types import RequestT, ResponseT +from guidellm.scheduler.result import WorkerProcessRequest, WorkerProcessResult + +__all__ = [ + "MPQueues", + "Queue", + "QueueEmpty", + "QueueFull", +] + + +@dataclass +class MPQueues(Generic[RequestT, ResponseT]): + requests: Queue[WorkerProcessRequest[RequestT, ResponseT]] + responses: Queue[WorkerProcessResult[RequestT, ResponseT]] diff --git a/src/guidellm/scheduler/result.py b/src/guidellm/scheduler/result.py index e8b0f877..125b33a7 100644 --- a/src/guidellm/scheduler/result.py +++ b/src/guidellm/scheduler/result.py @@ -1,5 +1,4 @@ from dataclasses import dataclass -from queue import Queue from typing import ( Generic, Literal, @@ -12,7 +11,6 @@ from guidellm.scheduler.strategy import SchedulingStrategy __all__ = [ - "MPQueues", "SchedulerRequestInfo", "SchedulerRequestResult", "SchedulerResult", @@ -143,9 +141,6 @@ class SchedulerRequestResult( response: Optional[ResponseT] = None -# TODO: Move dataclasses somewhere else - - @dataclass class WorkerProcessRequest(Generic[RequestT, ResponseT]): session: RequestSession[RequestT, ResponseT] @@ -159,9 +154,3 @@ class WorkerProcessResult(Generic[RequestT, ResponseT]): request: RequestT response: Optional[ResponseT] info: SchedulerRequestInfo - - -@dataclass -class MPQueues(Generic[RequestT, ResponseT]): - requests: Queue[WorkerProcessRequest[RequestT, ResponseT]] - responses: Queue[WorkerProcessResult[RequestT, ResponseT]] diff --git a/src/guidellm/scheduler/scheduler.py b/src/guidellm/scheduler/scheduler.py index 8bf27ed4..d74578b9 100644 --- a/src/guidellm/scheduler/scheduler.py +++ b/src/guidellm/scheduler/scheduler.py @@ -4,8 +4,6 @@ from collections.abc import AsyncGenerator, Iterable, Iterator from concurrent.futures import ProcessPoolExecutor from multiprocessing import Manager -from queue import Empty as QueueEmpty -from queue import Queue from threading import Event from typing import ( Any, @@ -21,8 +19,8 @@ RequestT, ResponseT, ) +from guidellm.scheduler.queues import MPQueues, Queue, QueueEmpty from guidellm.scheduler.result import ( - MPQueues, SchedulerRequestResult, SchedulerResult, SchedulerRunInfo, diff --git a/src/guidellm/scheduler/worker.py b/src/guidellm/scheduler/worker.py index 3d792a0c..aaefadaf 100644 --- a/src/guidellm/scheduler/worker.py +++ b/src/guidellm/scheduler/worker.py @@ -5,8 +5,6 @@ from collections.abc import AsyncGenerator from dataclasses import dataclass from itertools import islice -from queue import Empty as QueueEmpty -from queue import Queue from threading import Event from typing import ( Any, @@ -29,8 +27,8 @@ from guidellm.objects import StandardBaseModel from guidellm.request import GenerationRequest from guidellm.request.types import RequestT, ResponseT +from guidellm.scheduler.queues import MPQueues, Queue, QueueEmpty from guidellm.scheduler.result import ( - MPQueues, SchedulerRequestInfo, WorkerProcessRequest, WorkerProcessResult, From c0b4574c3482c4af6c321c7fdcfad3b02d08bc76 Mon Sep 17 00:00:00 2001 From: Samuel Monson Date: Thu, 31 Jul 2025 13:50:52 -0400 Subject: [PATCH 05/17] Delay process start time and allow the first requests to propgate Signed-off-by: Samuel Monson --- src/guidellm/config.py | 1 + src/guidellm/scheduler/scheduler.py | 18 +++++++++++++----- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/src/guidellm/config.py 
b/src/guidellm/config.py index beda55fc..12632002 100644 --- a/src/guidellm/config.py +++ b/src/guidellm/config.py @@ -133,6 +133,7 @@ class Settings(BaseSettings): max_concurrency: int = 512 max_worker_processes: int = 10 max_add_requests_per_loop: int = 20 + scheduler_start_delay: float = 5 # Data settings dataset: DatasetSettings = DatasetSettings() diff --git a/src/guidellm/scheduler/scheduler.py b/src/guidellm/scheduler/scheduler.py index d74578b9..751dbc0c 100644 --- a/src/guidellm/scheduler/scheduler.py +++ b/src/guidellm/scheduler/scheduler.py @@ -124,9 +124,8 @@ async def run( ) as executor, ): requests_iter: Optional[Iterator[Any]] = None - # TODO: Configurable delay and move somewhere more appropriate scheduling_strategy.start_time = ( - time.time() + time.time() + settings.scheduler_start_delay ) # Add a small delay to allow processes to start futures, queues, stop_event = await self._start_processes( manager, executor, scheduling_strategy @@ -134,6 +133,15 @@ async def run( run_info, requests_iter = self._run_setup( futures, scheduling_strategy, max_number, max_duration ) + + # Add some initial requests to the queue + requests_iter = self._add_requests( + requests_iter, + queues.requests, + run_info, + ) + # Wait for the test to start + await asyncio.sleep(time.time() - scheduling_strategy.start_time) yield SchedulerResult( type_="run_start", run_info=run_info, @@ -285,9 +293,9 @@ def _add_requests( if time.time() >= run_info.end_time: raise StopIteration - while ( - not requests_queue.full() - and added_count < settings.max_add_requests_per_loop + while not requests_queue.full() and added_count < ( + run_info.strategy.queued_requests_limit + or settings.max_add_requests_per_loop ): if run_info.created_requests >= run_info.end_number: raise StopIteration From a524469c24ef8aedb8fc5c072a717486b5ccde44 Mon Sep 17 00:00:00 2001 From: Samuel Monson Date: Thu, 4 Sep 2025 10:40:11 -0400 Subject: [PATCH 06/17] Strip out multiturn features Signed-off-by: Samuel Monson --- src/guidellm/request/__init__.py | 3 -- src/guidellm/request/loader.py | 5 ++- src/guidellm/request/session.py | 55 ----------------------------- src/guidellm/scheduler/result.py | 3 +- src/guidellm/scheduler/scheduler.py | 12 +++---- src/guidellm/scheduler/worker.py | 49 +++---------------------- 6 files changed, 13 insertions(+), 114 deletions(-) delete mode 100644 src/guidellm/request/session.py diff --git a/src/guidellm/request/__init__.py b/src/guidellm/request/__init__.py index fd0ec355..85b447d6 100644 --- a/src/guidellm/request/__init__.py +++ b/src/guidellm/request/__init__.py @@ -5,17 +5,14 @@ RequestLoaderDescription, ) from .request import GenerationRequest -from .session import GenerativeRequestSession, RequestSession from .types import RequestT, ResponseT __all__ = [ "GenerationRequest", "GenerativeRequestLoader", "GenerativeRequestLoaderDescription", - "GenerativeRequestSession", "RequestLoader", "RequestLoaderDescription", - "RequestSession", "RequestT", "ResponseT", ] diff --git a/src/guidellm/request/loader.py b/src/guidellm/request/loader.py index 452e4733..48566976 100644 --- a/src/guidellm/request/loader.py +++ b/src/guidellm/request/loader.py @@ -15,7 +15,6 @@ from guidellm.dataset import ColumnInputTypes, load_dataset from guidellm.objects import StandardBaseModel from guidellm.request.request import GenerationRequest -from guidellm.request.session import GenerativeRequestSession __all__ = [ "GenerativeRequestLoader", @@ -106,14 +105,14 @@ def __init__( self.preserve_iter_state = iter_type == 
"infinite" # ensure no caching requests self._preserved_iter = None - def __iter__(self) -> Iterator[GenerativeRequestSession]: + def __iter__(self) -> Iterator[GenerationRequest]: scope_create_count = 0 while (dataset_iter := self._get_dataset_iter(scope_create_count)) is not None: scope_create_count += 1 for item in dataset_iter: - yield GenerativeRequestSession(self._create_request(item)) + yield self._create_request(item) self._preserved_iter = None diff --git a/src/guidellm/request/session.py b/src/guidellm/request/session.py deleted file mode 100644 index 9e00b37d..00000000 --- a/src/guidellm/request/session.py +++ /dev/null @@ -1,55 +0,0 @@ -from abc import ABC, abstractmethod -from typing import Generic, TypeVar - -from guidellm.backend.response import ResponseSummary -from guidellm.request.request import GenerationRequest - -__all__ = ["GenerativeRequestSession", "RequestSession"] - -RequestT = TypeVar("RequestT") -ResponseT = TypeVar("ResponseT") - - -class RequestSession(ABC, Generic[RequestT, ResponseT]): - """ - A series of requests that build upon each other to - form a conversion between the user and the model. - """ - - @abstractmethod - def __len__(self) -> int: ... - - @abstractmethod - def get_next_request(self) -> RequestT: ... - - @abstractmethod - def get_next_delay(self) -> float: ... - - @abstractmethod - def push_response(self, response: ResponseT) -> None: ... - - @property - @abstractmethod - def complete(self) -> bool: ... - - -class GenerativeRequestSession(RequestSession[GenerationRequest, ResponseSummary]): - def __init__(self, request: GenerationRequest) -> None: - self.request = request - self._complete = False - - def __len__(self) -> int: - return 1 - - def get_next_request(self) -> GenerationRequest: - return self.request - - def get_next_delay(self) -> float: - return 0.0 - - def push_response(self, response: ResponseSummary) -> None: # noqa: ARG002 - self._complete = True - - @property - def complete(self) -> bool: - return self._complete diff --git a/src/guidellm/scheduler/result.py b/src/guidellm/scheduler/result.py index 125b33a7..04fbf931 100644 --- a/src/guidellm/scheduler/result.py +++ b/src/guidellm/scheduler/result.py @@ -6,7 +6,6 @@ ) from guidellm.objects import StandardBaseModel -from guidellm.request.session import RequestSession from guidellm.request.types import RequestT, ResponseT from guidellm.scheduler.strategy import SchedulingStrategy @@ -143,7 +142,7 @@ class SchedulerRequestResult( @dataclass class WorkerProcessRequest(Generic[RequestT, ResponseT]): - session: RequestSession[RequestT, ResponseT] + request: RequestT timeout_time: float queued_time: float diff --git a/src/guidellm/scheduler/scheduler.py b/src/guidellm/scheduler/scheduler.py index 751dbc0c..b3f2dc61 100644 --- a/src/guidellm/scheduler/scheduler.py +++ b/src/guidellm/scheduler/scheduler.py @@ -234,7 +234,6 @@ async def _start_processes( queues, scheduling_strategy, stop_event, - False, # TODO: Make configurable requests_limit, id_, num_processes, @@ -300,17 +299,16 @@ def _add_requests( if run_info.created_requests >= run_info.end_number: raise StopIteration - session = next(requests_iter) - work_req = WorkerProcessRequest( - session=session, + work_req = WorkerProcessRequest[RequestT, ResponseT]( + request=next(requests_iter), timeout_time=run_info.end_time, queued_time=time.time(), ) requests_queue.put(work_req) - run_info.created_requests += len(session) - run_info.queued_requests += len(session) - added_count += len(session) + run_info.created_requests += 1 + 
run_info.queued_requests += 1 + added_count += 1 except StopIteration: # we've reached the limit number, limit time, or exhausted the requests # set to None to stop adding more and tell the loop no more requests diff --git a/src/guidellm/scheduler/worker.py b/src/guidellm/scheduler/worker.py index aaefadaf..ba36559e 100644 --- a/src/guidellm/scheduler/worker.py +++ b/src/guidellm/scheduler/worker.py @@ -122,8 +122,8 @@ async def resolve_scheduler_request( start_time: float, results_queue: Queue[WorkerProcessResult[RequestT, ResponseT]], process_id: int, - ) -> WorkerProcessRequest[RequestT, ResponseT]: - request = process_request.session.get_next_request() + ): + request = process_request.request timeout_time = process_request.timeout_time queued_time = process_request.queued_time @@ -170,22 +170,17 @@ async def resolve_scheduler_request( ) asyncio.create_task(self.send_result(results_queue, result)) - process_request.session.push_response(response) - return process_request - def process_loop_asynchronous( self, queues: MPQueues[RequestT, ResponseT], strategy: SchedulingStrategy, stop_event: Event, - prioritize_sessions: bool, max_concurrency: int, process_id: int, num_processes: int, ): async def _process_runner(): lock = asyncio.Semaphore(max_concurrency) - pending_requests: list[WorkerProcessRequest[RequestT, ResponseT]] = [] times_iter = islice( strategy.request_times(), process_id, @@ -202,50 +197,18 @@ async def _process_runner(): await asyncio.sleep(start_time - time.time() - 1) await lock.acquire() - process_request = None try: - process_request = ( - pending_requests.pop() - if pending_requests - else queues.requests.get_nowait() - ) + process_request = queues.requests.get_nowait() dequeued_time = time.time() except QueueEmpty: lock.release() continue - async def wait_then_requeue( - process_request: WorkerProcessRequest[RequestT, ResponseT], - ): - # Wait to requeue the request session if it specifies a delay - if delay := process_request.session.get_next_delay(): - await asyncio.sleep(delay) - - # Push session to the stack - process_request.queued_time = time.time() - pending_requests.append(process_request) - if prioritize_sessions: - # Release the lock with the session on top of the stack - lock.release() - def _request_callback( - future: asyncio.Future[WorkerProcessRequest[RequestT, ResponseT]], + _: asyncio.Future[WorkerProcessRequest[RequestT, ResponseT]], ): - # If we are prioritizing sessions, hold - # the lock until the session is done nonlocal lock - if not prioritize_sessions: - lock.release() - - try: - process_request = future.result() - except asyncio.CancelledError: - return - if not process_request.session.complete: - asyncio.create_task(wait_then_requeue(process_request)) - elif prioritize_sessions: - # no more requests in this session, release the lock - lock.release() + lock.release() task = asyncio.create_task( self.resolve_scheduler_request( @@ -319,7 +282,6 @@ def process_loop_asynchronous( queues: MPQueues[GenerationRequest, ResponseSummary], strategy: SchedulingStrategy, stop_event: Event, - prioritize_sessions: bool, max_concurrency: int, process_id: int, num_processes: int, @@ -329,7 +291,6 @@ def process_loop_asynchronous( queues=queues, strategy=strategy, stop_event=stop_event, - prioritize_sessions=prioritize_sessions, max_concurrency=max_concurrency, process_id=process_id, num_processes=num_processes, From 37b369096a9620ef7017a43f199f0909c6db8f91 Mon Sep 17 00:00:00 2001 From: Samuel Monson Date: Thu, 4 Sep 2025 13:46:47 -0400 Subject: [PATCH 07/17] 
Ensure we meet one of our end conditions before exiting Signed-off-by: Samuel Monson --- src/guidellm/scheduler/scheduler.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/guidellm/scheduler/scheduler.py b/src/guidellm/scheduler/scheduler.py index b3f2dc61..da390f88 100644 --- a/src/guidellm/scheduler/scheduler.py +++ b/src/guidellm/scheduler/scheduler.py @@ -154,7 +154,14 @@ async def run( if future.done() and (err := future.exception()) is not None: raise err - if requests_iter is None and run_info.processing_requests <= 0: + if ( + requests_iter is None + and run_info.processing_requests <= 0 + and ( # Ensure we have met one of the end conditions + time.time() >= run_info.end_time + or run_info.completed_requests >= run_info.end_number + ) + ): # we've exhausted all requests we've wanted to run # and yielded all responses break From bcc2f8ca88dba4994b210d107ccac9c2c128515b Mon Sep 17 00:00:00 2001 From: Samuel Monson Date: Thu, 4 Sep 2025 11:34:00 -0400 Subject: [PATCH 08/17] Revert loop logic changes Signed-off-by: Samuel Monson --- src/guidellm/scheduler/scheduler.py | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/src/guidellm/scheduler/scheduler.py b/src/guidellm/scheduler/scheduler.py index da390f88..b8e72946 100644 --- a/src/guidellm/scheduler/scheduler.py +++ b/src/guidellm/scheduler/scheduler.py @@ -130,7 +130,7 @@ async def run( futures, queues, stop_event = await self._start_processes( manager, executor, scheduling_strategy ) - run_info, requests_iter = self._run_setup( + run_info, requests_iter, times_iter = self._run_setup( futures, scheduling_strategy, max_number, max_duration ) @@ -138,7 +138,9 @@ async def run( requests_iter = self._add_requests( requests_iter, queues.requests, + times_iter, run_info, + loop_limit=run_info.strategy.queued_requests_limit, ) # Wait for the test to start await asyncio.sleep(time.time() - scheduling_strategy.start_time) @@ -169,6 +171,7 @@ async def run( requests_iter = self._add_requests( requests_iter, queues.requests, + times_iter, run_info, ) await asyncio.sleep(0) # enable requests to start @@ -257,8 +260,9 @@ def _run_setup( scheduling_strategy: SchedulingStrategy, max_number: Optional[int], max_duration: Optional[float], - ) -> tuple[SchedulerRunInfo, Iterator[Any]]: + ) -> tuple[SchedulerRunInfo, Iterator[Any], Iterator[float]]: requests_iter = iter(self.request_loader) + times_iter = iter(scheduling_strategy.request_times()) end_time = scheduling_strategy.start_time + (max_duration or math.inf) end_number = max_number or math.inf @@ -284,28 +288,32 @@ def _run_setup( strategy=scheduling_strategy, ) - return info, requests_iter + return info, requests_iter, times_iter def _add_requests( self, requests_iter: Optional[Iterator[Any]], requests_queue: Queue[WorkerProcessRequest[RequestT, ResponseT]], + times_iter: Iterator[float], run_info: SchedulerRunInfo, + loop_limit: Optional[int] = None, ) -> Optional[Iterator[Any]]: if requests_iter is not None: try: added_count = 0 - if time.time() >= run_info.end_time: - raise StopIteration - while not requests_queue.full() and added_count < ( - run_info.strategy.queued_requests_limit - or settings.max_add_requests_per_loop + loop_limit or settings.max_add_requests_per_loop ): if run_info.created_requests >= run_info.end_number: raise StopIteration + if ( + next(times_iter) >= run_info.end_time + or time.time() >= run_info.end_time + ): + raise StopIteration + work_req = WorkerProcessRequest[RequestT, ResponseT]( 
request=next(requests_iter), timeout_time=run_info.end_time, From 89b501fdb9f24fe2a9394a6760d6c9194d0ed71e Mon Sep 17 00:00:00 2001 From: Samuel Monson Date: Thu, 4 Sep 2025 15:02:20 -0400 Subject: [PATCH 09/17] Set default worker process limit based on number of CPUs Signed-off-by: Samuel Monson --- src/guidellm/config.py | 6 +++++- src/guidellm/scheduler/strategy.py | 8 ++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/guidellm/config.py b/src/guidellm/config.py index 12632002..f7928d4c 100644 --- a/src/guidellm/config.py +++ b/src/guidellm/config.py @@ -1,4 +1,5 @@ import json +import os from collections.abc import Sequence from enum import Enum from typing import Literal, Optional @@ -131,7 +132,10 @@ class Settings(BaseSettings): # Scheduler settings max_concurrency: int = 512 - max_worker_processes: int = 10 + max_worker_processes: int = Field( + # use number of CPUs - 1, but at least 10 + default_factory=lambda: max((os.cpu_count() or 1) - 1, 10) + ) max_add_requests_per_loop: int = 20 scheduler_start_delay: float = 5 diff --git a/src/guidellm/scheduler/strategy.py b/src/guidellm/scheduler/strategy.py index 329f0427..74d19266 100644 --- a/src/guidellm/scheduler/strategy.py +++ b/src/guidellm/scheduler/strategy.py @@ -1,5 +1,4 @@ import math -import os import random import time from collections.abc import Generator @@ -72,9 +71,7 @@ def processes_limit(self) -> int: :return: The number of processes for the scheduling strategy. """ - cpu_cores = os.cpu_count() or 1 - - return min(max(1, cpu_cores - 1), settings.max_worker_processes) + return settings.max_worker_processes @property def queued_requests_limit(self) -> Optional[int]: @@ -231,9 +228,8 @@ def processes_limit(self) -> int: :return: {self.streams} for the concurrent scheduling strategy to limit the worker processes to the number of streams. """ - cpu_cores = os.cpu_count() or 1 - return min(max(1, cpu_cores - 1), self.streams) + return min(self.streams, settings.max_worker_processes) @property def queued_requests_limit(self) -> int: From 9d4d2e8c9c1348b04b9c6454fe607ffe1a83f41c Mon Sep 17 00:00:00 2001 From: Samuel Monson Date: Thu, 4 Sep 2025 16:24:57 -0400 Subject: [PATCH 10/17] Prep scheduler for multiturn Revert "Revert loop logic changes" This reverts commit bcc2f8ca88dba4994b210d107ccac9c2c128515b. Revert "Strip out multiturn features" This reverts commit a524469c24ef8aedb8fc5c072a717486b5ccde44. 
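Together these restore the session-aware worker loop: a finished request whose session is not yet complete waits out the session's delay, then goes back onto a per-process stack, optionally holding its concurrency slot for the whole session. A condensed sketch of the restored callback flow (illustrative names only; `session`, `pending`, and `lock` stand in for the worker's real state, and error handling is omitted):

    import asyncio

    def on_request_done(_future, *, session, pending, lock, prioritize_sessions):
        if not prioritize_sessions:
            lock.release()  # free the slot as soon as one turn finishes

        if not session.complete:
            async def requeue():
                if delay := session.get_next_delay():
                    await asyncio.sleep(delay)  # honor the inter-turn delay
                pending.append(session)  # next turn lands on top of the stack
                if prioritize_sessions:
                    lock.release()  # slot reopens once the turn is queued
            asyncio.create_task(requeue())
        elif prioritize_sessions:
            lock.release()  # session finished; give up the held slot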
--- src/guidellm/request/__init__.py | 3 ++ src/guidellm/request/loader.py | 5 +-- src/guidellm/request/session.py | 55 +++++++++++++++++++++++++++++ src/guidellm/scheduler/result.py | 3 +- src/guidellm/scheduler/scheduler.py | 36 ++++++++----------- src/guidellm/scheduler/worker.py | 49 ++++++++++++++++++++++--- 6 files changed, 122 insertions(+), 29 deletions(-) create mode 100644 src/guidellm/request/session.py diff --git a/src/guidellm/request/__init__.py b/src/guidellm/request/__init__.py index 85b447d6..fd0ec355 100644 --- a/src/guidellm/request/__init__.py +++ b/src/guidellm/request/__init__.py @@ -5,14 +5,17 @@ RequestLoaderDescription, ) from .request import GenerationRequest +from .session import GenerativeRequestSession, RequestSession from .types import RequestT, ResponseT __all__ = [ "GenerationRequest", "GenerativeRequestLoader", "GenerativeRequestLoaderDescription", + "GenerativeRequestSession", "RequestLoader", "RequestLoaderDescription", + "RequestSession", "RequestT", "ResponseT", ] diff --git a/src/guidellm/request/loader.py b/src/guidellm/request/loader.py index 48566976..452e4733 100644 --- a/src/guidellm/request/loader.py +++ b/src/guidellm/request/loader.py @@ -15,6 +15,7 @@ from guidellm.dataset import ColumnInputTypes, load_dataset from guidellm.objects import StandardBaseModel from guidellm.request.request import GenerationRequest +from guidellm.request.session import GenerativeRequestSession __all__ = [ "GenerativeRequestLoader", @@ -105,14 +106,14 @@ def __init__( self.preserve_iter_state = iter_type == "infinite" # ensure no caching requests self._preserved_iter = None - def __iter__(self) -> Iterator[GenerationRequest]: + def __iter__(self) -> Iterator[GenerativeRequestSession]: scope_create_count = 0 while (dataset_iter := self._get_dataset_iter(scope_create_count)) is not None: scope_create_count += 1 for item in dataset_iter: - yield self._create_request(item) + yield GenerativeRequestSession(self._create_request(item)) self._preserved_iter = None diff --git a/src/guidellm/request/session.py b/src/guidellm/request/session.py new file mode 100644 index 00000000..9e00b37d --- /dev/null +++ b/src/guidellm/request/session.py @@ -0,0 +1,55 @@ +from abc import ABC, abstractmethod +from typing import Generic, TypeVar + +from guidellm.backend.response import ResponseSummary +from guidellm.request.request import GenerationRequest + +__all__ = ["GenerativeRequestSession", "RequestSession"] + +RequestT = TypeVar("RequestT") +ResponseT = TypeVar("ResponseT") + + +class RequestSession(ABC, Generic[RequestT, ResponseT]): + """ + A series of requests that build upon each other to + form a conversation between the user and the model. + """ + + @abstractmethod + def __len__(self) -> int: ... + + @abstractmethod + def get_next_request(self) -> RequestT: ... + + @abstractmethod + def get_next_delay(self) -> float: ... + + @abstractmethod + def push_response(self, response: ResponseT) -> None: ... + + @property + @abstractmethod + def complete(self) -> bool: ...
+ + +class GenerativeRequestSession(RequestSession[GenerationRequest, ResponseSummary]): + def __init__(self, request: GenerationRequest) -> None: + self.request = request + self._complete = False + + def __len__(self) -> int: + return 1 + + def get_next_request(self) -> GenerationRequest: + return self.request + + def get_next_delay(self) -> float: + return 0.0 + + def push_response(self, response: ResponseSummary) -> None: # noqa: ARG002 + self._complete = True + + @property + def complete(self) -> bool: + return self._complete diff --git a/src/guidellm/scheduler/result.py b/src/guidellm/scheduler/result.py index 04fbf931..125b33a7 100644 --- a/src/guidellm/scheduler/result.py +++ b/src/guidellm/scheduler/result.py @@ -6,6 +6,7 @@ ) from guidellm.objects import StandardBaseModel +from guidellm.request.session import RequestSession from guidellm.request.types import RequestT, ResponseT from guidellm.scheduler.strategy import SchedulingStrategy @@ -142,7 +143,7 @@ class SchedulerRequestResult( @dataclass class WorkerProcessRequest(Generic[RequestT, ResponseT]): - request: RequestT + session: RequestSession[RequestT, ResponseT] timeout_time: float queued_time: float diff --git a/src/guidellm/scheduler/scheduler.py b/src/guidellm/scheduler/scheduler.py index b8e72946..345879af 100644 --- a/src/guidellm/scheduler/scheduler.py +++ b/src/guidellm/scheduler/scheduler.py @@ -130,7 +130,7 @@ async def run( futures, queues, stop_event = await self._start_processes( manager, executor, scheduling_strategy ) - run_info, requests_iter, times_iter = self._run_setup( + run_info, requests_iter = self._run_setup( futures, scheduling_strategy, max_number, max_duration ) @@ -138,9 +138,7 @@ async def run( requests_iter = self._add_requests( requests_iter, queues.requests, - times_iter, run_info, - loop_limit=run_info.strategy.queued_requests_limit, ) # Wait for the test to start await asyncio.sleep(time.time() - scheduling_strategy.start_time) @@ -171,7 +169,6 @@ async def run( requests_iter = self._add_requests( requests_iter, queues.requests, - times_iter, run_info, ) await asyncio.sleep(0) # enable requests to start @@ -244,6 +241,7 @@ async def _start_processes( queues, scheduling_strategy, stop_event, + False, # TODO: Make configurable requests_limit, id_, num_processes, @@ -260,9 +258,8 @@ def _run_setup( scheduling_strategy: SchedulingStrategy, max_number: Optional[int], max_duration: Optional[float], - ) -> tuple[SchedulerRunInfo, Iterator[Any], Iterator[float]]: + ) -> tuple[SchedulerRunInfo, Iterator[Any]]: requests_iter = iter(self.request_loader) - times_iter = iter(scheduling_strategy.request_times()) end_time = scheduling_strategy.start_time + (max_duration or math.inf) end_number = max_number or math.inf @@ -288,42 +285,39 @@ def _run_setup( strategy=scheduling_strategy, ) - return info, requests_iter, times_iter + return info, requests_iter def _add_requests( self, requests_iter: Optional[Iterator[Any]], requests_queue: Queue[WorkerProcessRequest[RequestT, ResponseT]], - times_iter: Iterator[float], run_info: SchedulerRunInfo, - loop_limit: Optional[int] = None, ) -> Optional[Iterator[Any]]: if requests_iter is not None: try: added_count = 0 + if time.time() >= run_info.end_time: + raise StopIteration + while not requests_queue.full() and added_count < ( - loop_limit or settings.max_add_requests_per_loop + run_info.strategy.queued_requests_limit + or settings.max_add_requests_per_loop ): if run_info.created_requests >= run_info.end_number: raise StopIteration - if ( - next(times_iter) >= 
run_info.end_time - or time.time() >= run_info.end_time - ): - raise StopIteration - - work_req = WorkerProcessRequest[RequestT, ResponseT]( - request=next(requests_iter), + session = next(requests_iter) + work_req = WorkerProcessRequest( + session=session, timeout_time=run_info.end_time, queued_time=time.time(), ) requests_queue.put(work_req) - run_info.created_requests += 1 - run_info.queued_requests += 1 - added_count += 1 + run_info.created_requests += len(session) + run_info.queued_requests += len(session) + added_count += len(session) except StopIteration: # we've reached the limit number, limit time, or exhausted the requests # set to None to stop adding more and tell the loop no more requests diff --git a/src/guidellm/scheduler/worker.py b/src/guidellm/scheduler/worker.py index ba36559e..aaefadaf 100644 --- a/src/guidellm/scheduler/worker.py +++ b/src/guidellm/scheduler/worker.py @@ -122,8 +122,8 @@ async def resolve_scheduler_request( start_time: float, results_queue: Queue[WorkerProcessResult[RequestT, ResponseT]], process_id: int, - ): - request = process_request.request + ) -> WorkerProcessRequest[RequestT, ResponseT]: + request = process_request.session.get_next_request() timeout_time = process_request.timeout_time queued_time = process_request.queued_time @@ -170,17 +170,22 @@ async def resolve_scheduler_request( ) asyncio.create_task(self.send_result(results_queue, result)) + process_request.session.push_response(response) + return process_request + def process_loop_asynchronous( self, queues: MPQueues[RequestT, ResponseT], strategy: SchedulingStrategy, stop_event: Event, + prioritize_sessions: bool, max_concurrency: int, process_id: int, num_processes: int, ): async def _process_runner(): lock = asyncio.Semaphore(max_concurrency) + pending_requests: list[WorkerProcessRequest[RequestT, ResponseT]] = [] times_iter = islice( strategy.request_times(), process_id, @@ -197,18 +202,50 @@ async def _process_runner(): await asyncio.sleep(start_time - time.time() - 1) await lock.acquire() + process_request = None try: - process_request = queues.requests.get_nowait() + process_request = ( + pending_requests.pop() + if pending_requests + else queues.requests.get_nowait() + ) dequeued_time = time.time() except QueueEmpty: lock.release() continue + async def wait_then_requeue( + process_request: WorkerProcessRequest[RequestT, ResponseT], + ): + # Wait to requeue the request session if it specifies a delay + if delay := process_request.session.get_next_delay(): + await asyncio.sleep(delay) + + # Push session to the stack + process_request.queued_time = time.time() + pending_requests.append(process_request) + if prioritize_sessions: + # Release the lock with the session on top of the stack + lock.release() + def _request_callback( - _: asyncio.Future[WorkerProcessRequest[RequestT, ResponseT]], + future: asyncio.Future[WorkerProcessRequest[RequestT, ResponseT]], ): + # If we are prioritizing sessions, hold + # the lock until the session is done nonlocal lock - lock.release() + if not prioritize_sessions: + lock.release() + + try: + process_request = future.result() + except asyncio.CancelledError: + return + if not process_request.session.complete: + asyncio.create_task(wait_then_requeue(process_request)) + elif prioritize_sessions: + # no more requests in this session, release the lock + lock.release() task = asyncio.create_task( self.resolve_scheduler_request( @@ -282,6 +319,7 @@ def process_loop_asynchronous( queues: MPQueues[GenerationRequest, ResponseSummary], strategy: 
SchedulingStrategy, stop_event: Event, + prioritize_sessions: bool, max_concurrency: int, process_id: int, num_processes: int, @@ -291,6 +329,7 @@ def process_loop_asynchronous( queues=queues, strategy=strategy, stop_event=stop_event, + prioritize_sessions=prioritize_sessions, max_concurrency=max_concurrency, process_id=process_id, num_processes=num_processes, From 92b2691d2b4da88d273055591e786706f8d30d22 Mon Sep 17 00:00:00 2001 From: Samuel Monson Date: Wed, 16 Jul 2025 14:41:16 -0400 Subject: [PATCH 11/17] Implement initial multiturn support --- src/guidellm/request/loader.py | 9 ++++++- src/guidellm/request/session.py | 42 ++++++++++++++++++++++++++------- 2 files changed, 42 insertions(+), 9 deletions(-) diff --git a/src/guidellm/request/loader.py b/src/guidellm/request/loader.py index 452e4733..04471cc9 100644 --- a/src/guidellm/request/loader.py +++ b/src/guidellm/request/loader.py @@ -107,13 +107,20 @@ def __init__( self._preserved_iter = None def __iter__(self) -> Iterator[GenerativeRequestSession]: + turns = 1 + + data_iter = self._create_requests() + while requests := [i for i, _ in zip(data_iter, range(turns))]: + yield GenerativeRequestSession(requests) + + def _create_requests(self) -> Iterator[GenerationRequest]: scope_create_count = 0 while (dataset_iter := self._get_dataset_iter(scope_create_count)) is not None: scope_create_count += 1 for item in dataset_iter: - yield GenerativeRequestSession(self._create_request(item)) + yield self._create_request(item) self._preserved_iter = None diff --git a/src/guidellm/request/session.py b/src/guidellm/request/session.py index 9e00b37d..2fd9bfbd 100644 --- a/src/guidellm/request/session.py +++ b/src/guidellm/request/session.py @@ -1,3 +1,4 @@ +import itertools from abc import ABC, abstractmethod from typing import Generic, TypeVar @@ -34,22 +35,47 @@ def complete(self) -> bool: ... 
class GenerativeRequestSession(RequestSession[GenerationRequest, ResponseSummary]): - def __init__(self, request: GenerationRequest) -> None: - self.request = request - self._complete = False + def __init__(self, prompts: list[GenerationRequest]) -> None: + if not prompts: + raise ValueError("Prompts cannot be empty") + + self.prompts = prompts + self.responses: list[str] = [] def __len__(self) -> int: - return 1 + return len(self.prompts) def get_next_request(self) -> GenerationRequest: - return self.request + completed_responses = len(self.responses) + base_request = self.prompts[completed_responses].model_copy(deep=True) + base_request.content = "".join( + itertools.chain.from_iterable( + zip((x.content for x in self.prompts), self.responses + [""]) + ) + ) + base_request.stats["prompt_tokens"] = sum( + x.stats["prompt_tokens"] for x in self.prompts[: completed_responses + 1] + ) + base_request.constraints["output_tokens"] = sum( + x.constraints["output_tokens"] + for x in self.prompts[: completed_responses + 1] + ) + + return base_request def get_next_delay(self) -> float: return 0.0 - def push_response(self, response: ResponseSummary) -> None: # noqa: ARG002 - self._complete = True + def push_response(self, response: ResponseSummary) -> None: + if len(self.responses) < len(self.prompts): + if response.response_output_tokens is not None: + self.prompts[len(self.responses)].constraints["output_tokens"] = ( + response.response_output_tokens + ) + self.responses.append(response.value) + else: + raise ValueError("Response list full") @property def complete(self) -> bool: - return self._complete + return len(self.responses) >= len(self.prompts) From 308840af0178b7aa1dd478d1aa1c5233e1440e0a Mon Sep 17 00:00:00 2001 From: Samuel Monson Date: Thu, 17 Jul 2025 17:12:21 -0400 Subject: [PATCH 12/17] Implement item type --- src/guidellm/preprocess/item.py | 60 +++++++++++++++++++++++++++++++ src/guidellm/request/loader.py | 38 ++++++-------------- src/guidellm/request/session.py | 63 +++++++++++++++++++++------------ 3 files changed, 111 insertions(+), 50 deletions(-) create mode 100644 src/guidellm/preprocess/item.py diff --git a/src/guidellm/preprocess/item.py b/src/guidellm/preprocess/item.py new file mode 100644 index 00000000..7a4fb3e3 --- /dev/null +++ b/src/guidellm/preprocess/item.py @@ -0,0 +1,60 @@ +from collections.abc import Sequence +from typing import Generic, Optional, TypeVar, Union + +from pydantic import Field + +from guidellm.objects.pydantic import StandardBaseModel + +PromptT = TypeVar("PromptT") + + +class Item(StandardBaseModel, Generic[PromptT]): + """ + Represents a single item in a dataset, containing a prompt and its associated metadata. + """ + + value: PromptT = Field( + description="The prompt text or data for the item.", + examples=[ + "What is the capital of France?", + "Explain quantum computing in simple terms.", + ], + ) + prompt_tokens: Optional[int] = Field( + default=None, gt=0, description="Number of tokens in the prompt" + ) + output_tokens: Optional[int] = Field( + default=None, gt=0, description="Number of tokens in the output" + ) + + +class ItemList(Sequence[Item[PromptT]]): + """ + Represents a list of items, each containing a prompt and its metadata. 
+ """ + + def __init__(self, *items: Item[PromptT], shared_prefix: Optional[PromptT] = None): + self.shared_prefix: Optional[PromptT] = shared_prefix + self._items: list[Item[PromptT]] = list(items) + + def __getitem__(self, key) -> Union[Item[PromptT], Sequence[Item[PromptT]]]: + return self._items[key] + + def __len__(self) -> int: + return len(self._items) + + @classmethod + def from_lists( + cls, + prompts: list[PromptT], + prompts_tokens: list[Optional[int]], + outputs_tokens: list[Optional[int]], + ) -> "ItemList": + return cls( + *[ + Item(value=prompt, output_tokens=in_t, prompt_tokens=out_t) + for prompt, in_t, out_t in zip( + prompts, prompts_tokens, outputs_tokens, strict=True + ) + ] + ) diff --git a/src/guidellm/request/loader.py b/src/guidellm/request/loader.py index 04471cc9..4ed781f5 100644 --- a/src/guidellm/request/loader.py +++ b/src/guidellm/request/loader.py @@ -11,10 +11,9 @@ from datasets import Dataset, DatasetDict, IterableDataset, IterableDatasetDict from transformers import PreTrainedTokenizerBase # type: ignore[import] -from guidellm.config import settings from guidellm.dataset import ColumnInputTypes, load_dataset from guidellm.objects import StandardBaseModel -from guidellm.request.request import GenerationRequest +from guidellm.preprocess.item import ItemList from guidellm.request.session import GenerativeRequestSession __all__ = [ @@ -107,20 +106,13 @@ def __init__( self._preserved_iter = None def __iter__(self) -> Iterator[GenerativeRequestSession]: - turns = 1 - - data_iter = self._create_requests() - while requests := [i for i, _ in zip(data_iter, range(turns))]: - yield GenerativeRequestSession(requests) - - def _create_requests(self) -> Iterator[GenerationRequest]: scope_create_count = 0 while (dataset_iter := self._get_dataset_iter(scope_create_count)) is not None: scope_create_count += 1 for item in dataset_iter: - yield self._create_request(item) + yield GenerativeRequestSession(self._create_items(item)) self._preserved_iter = None @@ -268,25 +260,17 @@ def _get_dataset_iter( return dataset_iter - def _create_request(self, item: dict[str, Any]) -> GenerationRequest: - prompt_tokens = ( - item[self.column_mappings["prompt_tokens_count_column"]] + def _create_items(self, item: dict[str, Any]) -> ItemList: + prompts = list(item[self.column_mappings["prompt_column"]]) + prompt_tokens: list[Optional[int]] = ( + list(item[self.column_mappings["prompt_tokens_count_column"]]) if "prompt_tokens_count_column" in self.column_mappings - else None + else [None] ) - output_tokens = ( - item[self.column_mappings["output_tokens_count_column"]] + output_tokens: list[Optional[int]] = ( + list(item[self.column_mappings["output_tokens_count_column"]]) if "output_tokens_count_column" in self.column_mappings - else None + else [None] ) - return GenerationRequest( - request_type=settings.preferred_route, - content=item[self.column_mappings["prompt_column"]], - stats=( - {"prompt_tokens": prompt_tokens} if prompt_tokens is not None else {} - ), - constraints=( - {"output_tokens": output_tokens} if output_tokens is not None else {} - ), - ) + return ItemList.from_lists(prompts, prompt_tokens, output_tokens) diff --git a/src/guidellm/request/session.py b/src/guidellm/request/session.py index 2fd9bfbd..c65a6cf9 100644 --- a/src/guidellm/request/session.py +++ b/src/guidellm/request/session.py @@ -1,15 +1,16 @@ import itertools from abc import ABC, abstractmethod -from typing import Generic, TypeVar +from collections.abc import Sequence +from typing import Generic from 
guidellm.backend.response import ResponseSummary +from guidellm.config import settings +from guidellm.preprocess.item import Item, ItemList from guidellm.request.request import GenerationRequest +from guidellm.request.types import RequestT, ResponseT __all__ = ["GenerativeRequestSession", "RequestSession"] -RequestT = TypeVar("RequestT") -ResponseT = TypeVar("ResponseT") - class RequestSession(ABC, Generic[RequestT, ResponseT]): """ @@ -35,44 +36,60 @@ def complete(self) -> bool: ... class GenerativeRequestSession(RequestSession[GenerationRequest, ResponseSummary]): - def __init__(self, prompts: list[GenerationRequest]) -> None: - if not prompts: + def __init__(self, items: ItemList) -> None: + if len(items) < 1: raise ValueError("Prompts cannot be empty") - self.prompts = prompts - self.responses: list[str] = [] + self.prompts: Sequence[Item] = items + self.responses: list[Item] = [] def __len__(self) -> int: return len(self.prompts) def get_next_request(self) -> GenerationRequest: completed_responses = len(self.responses) - base_request = self.prompts[completed_responses].model_copy(deep=True) - base_request.content = "".join( + + # FIXME: Can only handle string requests + content = "".join( itertools.chain.from_iterable( - zip((x.content for x in self.prompts), self.responses + [""]) + (x.value, y.value) + for x, y in zip(self.prompts, self.responses + [Item(value="")]) ) ) - base_request.stats["prompt_tokens"] = sum( - x.stats["prompt_tokens"] for x in self.prompts[: completed_responses + 1] + + prev_prompt_tokens = sum( + (x.prompt_tokens or 0) + (x.output_tokens or 0) for x in self.responses ) - base_request.constraints["output_tokens"] = sum( - x.constraints["output_tokens"] - for x in self.prompts[: completed_responses + 1] + prompt_tokens = ( + self.prompts[completed_responses].prompt_tokens or 0 + ) + prev_prompt_tokens + + output_tokens = self.prompts[completed_responses].output_tokens + + return GenerationRequest( + request_type=settings.preferred_route, + content=content, + stats=( + {"prompt_tokens": prompt_tokens} if prompt_tokens is not None else {} + ), + constraints=( + {"output_tokens": output_tokens} if output_tokens is not None else {} + ), ) - return base_request - def get_next_delay(self) -> float: return 0.0 def push_response(self, response: ResponseSummary) -> None: if len(self.responses) < len(self.prompts): - if response.response_output_tokens is not None: - self.prompts[len(self.responses)].constraints["output_tokens"] = ( - response.response_output_tokens - ) - self.responses.append(response.value) + resp = Item( + value=response.value, + prompt_tokens=response.response_prompt_tokens + or response.request_prompt_tokens, + output_tokens=response.response_output_tokens + or response.request_output_tokens, + ) + self.responses.append(resp) else: raise ValueError("Response list full") From d020c4bce5fea0298d161c4a3d3aad4ca3adc280 Mon Sep 17 00:00:00 2001 From: Samuel Monson Date: Fri, 18 Jul 2025 12:03:33 -0400 Subject: [PATCH 13/17] If prompt/output token count is 0, don't set stats/constraints --- src/guidellm/request/session.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/src/guidellm/request/session.py b/src/guidellm/request/session.py index c65a6cf9..5e28e35c 100644 --- a/src/guidellm/request/session.py +++ b/src/guidellm/request/session.py @@ -1,7 +1,9 @@ import itertools from abc import ABC, abstractmethod -from collections.abc import Sequence -from typing import Generic +from typing import TYPE_CHECKING, Generic + +if 
TYPE_CHECKING: + from collections.abc import Sequence from guidellm.backend.response import ResponseSummary from guidellm.config import settings @@ -69,12 +71,8 @@ def get_next_request(self) -> GenerationRequest: return GenerationRequest( request_type=settings.preferred_route, content=content, - stats=( - {"prompt_tokens": prompt_tokens} if prompt_tokens is not None else {} - ), - constraints=( - {"output_tokens": output_tokens} if output_tokens is not None else {} - ), + stats=({"prompt_tokens": prompt_tokens} if prompt_tokens else {}), + constraints=({"output_tokens": output_tokens} if output_tokens else {}), ) def get_next_delay(self) -> float: From f5d7c32edb76317c92e9cde48c21e735794c39a3 Mon Sep 17 00:00:00 2001 From: Samuel Monson Date: Fri, 18 Jul 2025 12:08:33 -0400 Subject: [PATCH 14/17] Fix hand in item creation code --- src/guidellm/preprocess/item.py | 16 ---------------- src/guidellm/request/loader.py | 26 +++++++++++++++++--------- 2 files changed, 17 insertions(+), 25 deletions(-) diff --git a/src/guidellm/preprocess/item.py b/src/guidellm/preprocess/item.py index 7a4fb3e3..9539e63c 100644 --- a/src/guidellm/preprocess/item.py +++ b/src/guidellm/preprocess/item.py @@ -42,19 +42,3 @@ def __getitem__(self, key) -> Union[Item[PromptT], Sequence[Item[PromptT]]]: def __len__(self) -> int: return len(self._items) - - @classmethod - def from_lists( - cls, - prompts: list[PromptT], - prompts_tokens: list[Optional[int]], - outputs_tokens: list[Optional[int]], - ) -> "ItemList": - return cls( - *[ - Item(value=prompt, output_tokens=in_t, prompt_tokens=out_t) - for prompt, in_t, out_t in zip( - prompts, prompts_tokens, outputs_tokens, strict=True - ) - ] - ) diff --git a/src/guidellm/request/loader.py b/src/guidellm/request/loader.py index 4ed781f5..082b8697 100644 --- a/src/guidellm/request/loader.py +++ b/src/guidellm/request/loader.py @@ -13,7 +13,7 @@ from guidellm.dataset import ColumnInputTypes, load_dataset from guidellm.objects import StandardBaseModel -from guidellm.preprocess.item import ItemList +from guidellm.preprocess.item import Item, ItemList from guidellm.request.session import GenerativeRequestSession __all__ = [ @@ -261,16 +261,24 @@ def _get_dataset_iter( return dataset_iter def _create_items(self, item: dict[str, Any]) -> ItemList: - prompts = list(item[self.column_mappings["prompt_column"]]) - prompt_tokens: list[Optional[int]] = ( - list(item[self.column_mappings["prompt_tokens_count_column"]]) + prompts = item[self.column_mappings["prompt_column"]] + prompt_tokens = ( + item[self.column_mappings["prompt_tokens_count_column"]] if "prompt_tokens_count_column" in self.column_mappings - else [None] + else None ) - output_tokens: list[Optional[int]] = ( - list(item[self.column_mappings["output_tokens_count_column"]]) + output_tokens = ( + item[self.column_mappings["output_tokens_count_column"]] if "output_tokens_count_column" in self.column_mappings - else [None] + else None ) - return ItemList.from_lists(prompts, prompt_tokens, output_tokens) + items = ( + Item(value=prompt, output_tokens=out_t, prompt_tokens=in_t) + for prompt, in_t, out_t in zip( + prompts if isinstance(prompts, list) else [prompts], + prompt_tokens if isinstance(prompt_tokens, list) else [prompt_tokens], + output_tokens if isinstance(output_tokens, list) else [output_tokens], + ) + ) + return ItemList(*items) From 1f295f4841ff39fe13223be4414f8a066cf61c41 Mon Sep 17 00:00:00 2001 From: Samuel Monson Date: Fri, 18 Jul 2025 12:59:22 -0400 Subject: [PATCH 15/17] Fix ItemList typing --- 
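For reference, a minimal usage sketch of the container these annotations cover (values are invented for illustration; `ItemList` behaves as a `Sequence[Item]`):

    from guidellm.preprocess.item import Item, ItemList

    # One Item per conversation turn, each with optional token accounting.
    turns = ItemList(
        Item(value="What is the capital of France?", prompt_tokens=8, output_tokens=32),
        Item(value="And its population?", prompt_tokens=5, output_tokens=24),
    )
    assert len(turns) == 2
    first = turns[0]   # a single Item
    rest = turns[1:]   # a list of Items, via Sequence slicing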
src/guidellm/preprocess/item.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/src/guidellm/preprocess/item.py b/src/guidellm/preprocess/item.py index 9539e63c..91801de8 100644 --- a/src/guidellm/preprocess/item.py +++ b/src/guidellm/preprocess/item.py @@ -1,5 +1,5 @@ from collections.abc import Sequence -from typing import Generic, Optional, TypeVar, Union +from typing import Generic, Optional, TypeVar from pydantic import Field @@ -10,7 +10,8 @@ class Item(StandardBaseModel, Generic[PromptT]): """ - Represents a single item in a dataset, containing a prompt and its associated metadata. + Represents a single item in a dataset, + containing a prompt and its associated metadata. """ value: PromptT = Field( @@ -33,11 +34,13 @@ class ItemList(Sequence[Item[PromptT]]): Represents a list of items, each containing a prompt and its metadata. """ + shared_prefix: Optional[PromptT] + def __init__(self, *items: Item[PromptT], shared_prefix: Optional[PromptT] = None): - self.shared_prefix: Optional[PromptT] = shared_prefix - self._items: list[Item[PromptT]] = list(items) + self.shared_prefix = shared_prefix + self._items = list(items) - def __getitem__(self, key) -> Union[Item[PromptT], Sequence[Item[PromptT]]]: + def __getitem__(self, key): return self._items[key] def __len__(self) -> int: From 1c002e1a5bdba6c71ed269f5ceace41fd11c732d Mon Sep 17 00:00:00 2001 From: Samuel Monson Date: Fri, 18 Jul 2025 13:17:05 -0400 Subject: [PATCH 16/17] Add turns support to synthetic dataset --- src/guidellm/dataset/synthetic.py | 103 ++++++++++++++++++++---------- 1 file changed, 71 insertions(+), 32 deletions(-) diff --git a/src/guidellm/dataset/synthetic.py b/src/guidellm/dataset/synthetic.py index 8c30f0f7..06972643 100644 --- a/src/guidellm/dataset/synthetic.py +++ b/src/guidellm/dataset/synthetic.py @@ -3,7 +3,7 @@ from collections.abc import Iterable, Iterator from itertools import cycle from pathlib import Path -from typing import Any, Literal, Optional, Union +from typing import Any, Optional, TypedDict, Union import yaml from datasets import ( @@ -69,6 +69,26 @@ class SyntheticDatasetConfig(BaseModel): gt=0, default=None, ) + turns: int = Field( + description="The number of turns in the conversation.", + gt=0, + default=1, + ) + turns_stdev: Optional[int] = Field( + description="The standard deviation of the number of turns.", + gt=0, + default=None, + ) + turns_min: Optional[int] = Field( + description="The minimum number of turns in the conversation.", + gt=0, + default=None, + ) + turns_max: Optional[int] = Field( + description="The maximum number of turns in the conversation.", + gt=0, + default=None, + ) samples: int = Field( description="The number of samples to generate for the dataset.", gt=0, @@ -124,14 +144,13 @@ def parse_config_file(data: Union[str, Path]) -> "SyntheticDatasetConfig": return SyntheticDatasetConfig(**config_dict) -class SyntheticTextItemsGenerator( - Iterable[ - dict[ - Literal["prompt", "prompt_tokens_count", "output_tokens_count"], - Union[str, int], - ] - ] -): +class SyntheticDatasetRow(TypedDict): + prompt: list[str] + prompt_tokens_count: list[int] + output_tokens_count: list[int] + + +class SyntheticTextItemsGenerator(Iterable[SyntheticDatasetRow]): def __init__( self, config: SyntheticDatasetConfig, @@ -147,12 +166,7 @@ def __init__( def __iter__( self, - ) -> Iterator[ - dict[ - Literal["prompt", "prompt_tokens_count", "output_tokens_count"], - Union[str, int], - ] - ]: + ) -> Iterator[SyntheticDatasetRow]: prompt_tokens_sampler = 
IntegerRangeSampler( average=self.config.prompt_tokens, variance=self.config.prompt_tokens_stdev, @@ -167,6 +181,13 @@ def __iter__( max_value=self.config.output_tokens_max, random_seed=self.random_seed + 1, # ensure diff dist from prompts ) + turns_sampler = IntegerRangeSampler( + average=self.config.turns, + variance=self.config.turns_stdev, + min_value=self.config.turns_min, + max_value=self.config.turns_max, + random_seed=self.random_seed + 7, # ensure diff dist + ) # ensure diff distribution from output tokens rand = random.Random(self.random_seed + 2) # noqa: S311 unique_prefix_iter = cycle(self.processor.get_vocab().values()) @@ -174,24 +195,42 @@ def __iter__( prefix_index = rand.randint(0, len(self.text_creator.words)) prefix_tokens = self._create_prompt(self.config.prefix_tokens, prefix_index) - for _, prompt_tokens, output_tokens in zip( - range(self.config.samples), - prompt_tokens_sampler, - output_tokens_sampler, - ): - start_index = rand.randint(0, len(self.text_creator.words)) - prompt_text = self.processor.decode( - prefix_tokens - + self._create_prompt( - prompt_tokens, start_index, next(unique_prefix_iter) - ), - skip_special_tokens=True, - ) - yield { - "prompt": prompt_text, - "prompt_tokens_count": self.config.prefix_tokens + prompt_tokens, - "output_tokens_count": output_tokens, + for _, turns in zip(range(self.config.samples), turns_sampler): + row: SyntheticDatasetRow = { + "prompt": [], + "prompt_tokens_count": [], + "output_tokens_count": [], } + for i, prompt_tokens, output_tokens in zip( + range(turns), + prompt_tokens_sampler, + output_tokens_sampler, + ): + start_index = rand.randint(0, len(self.text_creator.words)) + # Append the prefix tokens only for the first turn + if i == 0: + prompt_text = self.processor.decode( + prefix_tokens + + self._create_prompt( + prompt_tokens, start_index, next(unique_prefix_iter) + ), + skip_special_tokens=True, + ) + row["prompt"].append(prompt_text) + row["prompt_tokens_count"].append(self.config.prefix_tokens + prompt_tokens) + row["output_tokens_count"].append(output_tokens) + else: + prompt_text = self.processor.decode( + self._create_prompt( + prompt_tokens, start_index, next(unique_prefix_iter) + ), + skip_special_tokens=True, + ) + row["prompt"].append(prompt_text) + row["prompt_tokens_count"].append(prompt_tokens) + row["output_tokens_count"].append(output_tokens) + + yield row def _create_prompt( self, prompt_tokens: int, start_index: int, unique_prefix: Optional[int] = None From d782bae3107976411f55def7072ff691e835208d Mon Sep 17 00:00:00 2001 From: Samuel Monson Date: Wed, 23 Jul 2025 13:40:05 -0400 Subject: [PATCH 17/17] Add multi-turn documentation to readme --- README.md | 7 ++++++- src/guidellm/benchmark/entrypoints.py | 8 ++++---- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 55f8e815..d544673a 100644 --- a/README.md +++ b/README.md @@ -145,7 +145,8 @@ The `guidellm benchmark` command is used to run benchmarks against a generative - `prompt_tokens`: Average number of tokens for prompts. - `output_tokens`: Average number of tokens for outputs. - - `TYPE_stdev`, `TYPE_min`, `TYPE_max`: Standard deviation, minimum, and maximum values for the specified type (e.g., `prompt_tokens`, `output_tokens`). If not provided, will use the provided tokens value only. + - `turns`: Average number of request-response pairs per sample. Values above `1` result in a multi-turn[^1] benchmark. 
+ - `TYPE_stdev`, `TYPE_min`, `TYPE_max`: Standard deviation, minimum, and maximum values for the specified type (e.g., `prompt_tokens`, `output_tokens`, `turns`). If not provided, will use the provided tokens value only. - `samples`: Number of samples to generate, defaults to 1000. - `source`: Source text data for generation, defaults to a local copy of Pride and Prejudice. @@ -261,3 +262,7 @@ If you find GuideLLM helpful in your research or projects, please consider citin howpublished={\url{https://github.com/vllm-project/guidellm}}, } ``` + +- - - + +[^1]: Multi-turn refers to a benchmark where each dataset row represents a series of sequential requests, with each subsequent request building upon the context of the previous ones. diff --git a/src/guidellm/benchmark/entrypoints.py b/src/guidellm/benchmark/entrypoints.py index 2ef85c3e..31f936af 100644 --- a/src/guidellm/benchmark/entrypoints.py +++ b/src/guidellm/benchmark/entrypoints.py @@ -90,11 +90,11 @@ async def benchmark_generative_text( ), random_seed=random_seed, ) - unique_requests = request_loader.num_unique_items(raise_err=False) + unique_samples = request_loader.num_unique_items(raise_err=False) console.print_line( - f"Created loader with {unique_requests} unique requests from {data}.\n\n" - if unique_requests > 0 - else f"Created loader with unknown number unique requests from {data}.\n\n" + f"Created loader with {unique_samples} unique samples from {data}.\n\n" + if unique_samples > 0 + else f"Created loader with unknown number of unique samples from {data}.\n\n" ) profile = create_profile(rate_type=rate_type, rate=rate)
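The new `turns` keys map one-to-one onto `SyntheticDatasetConfig`; a minimal sketch of a multi-turn synthetic dataset configuration (values are illustrative):

    from guidellm.dataset.synthetic import SyntheticDatasetConfig

    # Roughly three request-response pairs per conversation, bounded to 1..5;
    # prompt/output token counts are drawn per turn by the range samplers.
    config = SyntheticDatasetConfig(
        prompt_tokens=256,
        output_tokens=128,
        turns=3,
        turns_stdev=1,
        turns_min=1,
        turns_max=5,
        samples=500,
    )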