-import time
-import uuid
-from abc import ABC, abstractmethod
-from collections.abc import AsyncGenerator, Iterable
-from pathlib import Path
+from abc import ABC
+from collections.abc import AsyncIterator, Iterable
 from typing import (
-    Any,
-    Callable,
     Generic,
     Optional,
     Union,
 )
 
-from transformers import PreTrainedTokenizerBase  # type: ignore  # noqa: PGH003
-
-from guidellm.backend import Backend
 from guidellm.benchmark.aggregator import (
     AggregatorT,
     BenchmarkT,
-    GenerativeBenchmarkAggregator,
 )
-from guidellm.benchmark.benchmark import BenchmarkArgs, GenerativeBenchmark
 from guidellm.benchmark.profile import Profile
-from guidellm.request import (
-    GenerativeRequestLoaderDescription,
-)
 from guidellm.scheduler import (
     BackendT,
     Environment,
     RequestT,
+    RequestTimingsT,
     ResponseT,
-    ScheduledRequestInfo,
+    Scheduler,
     SchedulerState,
-    SchedulerUpdateAction,
     SchedulingStrategy,
 )
 from guidellm.utils import ThreadSafeSingletonMixin
 
-__all__ = ["Benchmarker", "GenerativeBenchmarker"]
-
-
-"""
-Scheduler:
-    requests: Iterable[
-        Union[RequestT, Iterable[Union[RequestT, tuple[RequestT, float]]]]
-    ],
-    backend: BackendT[RequestT, ResponseT],
-    strategy: SchedulingStrategy,
-    env: Environment,
-    **constraints: dict[
-        str, Union[int, float, str, ConstraintsResolveArgs, CallableConstraint]
-    ],
-
-    CallableConstraint = Callable[
-        [SchedulerState, ScheduledRequestInfo], SchedulerUpdateAction
-    ]
-"""
-
-
-CallableConstraintInitializer = Callable[
-    [AggregatorT, BenchmarkT],
-    Callable[[SchedulerState, ScheduledRequestInfo], SchedulerUpdateAction],
-]
+__all__ = ["Benchmarker"]
 
 
 class Benchmarker(
-    Generic[AggregatorT, BenchmarkT, RequestT, ResponseT], ABC, ThreadSafeSingletonMixin
+    Generic[AggregatorT, BenchmarkT, RequestT, RequestTimingsT, ResponseT],
+    ABC,
+    ThreadSafeSingletonMixin,
 ):
     async def run(
         self,
@@ -74,186 +39,44 @@ async def run(
         backend: BackendT[RequestT, ResponseT],
         profile: Profile,
         environment: Environment,
-        aggregator: type[AggregatorT],
-    ) -> AsyncGenerator[
-        BenchmarkerResult[AggregatorT, BenchmarkT, RequestT, ResponseT], None
+        aggregator_class: type[AggregatorT],
+    ) -> AsyncIterator[
+        tuple[
+            Optional[BenchmarkT],
+            AggregatorT,
+            SchedulingStrategy,
+            Optional[SchedulerState],
+        ]
     ]:
-        try:
-            requests_loader_size = len(self.scheduler.request_loader)  # type: ignore[arg-type]
-        except Exception:  # noqa: BLE001
-            requests_loader_size = None
-
-        strategy_limits = BenchmarkerStrategyLimits(
-            requests_loader_size=requests_loader_size,
-            max_number_per_strategy=max_number_per_strategy,
-            max_duration_per_strategy=max_duration_per_strategy,
-            warmup_percent_per_strategy=warmup_percent_per_strategy,
-            cooldown_percent_per_strategy=cooldown_percent_per_strategy,
-        )
-        start_time = time.time()
-        end_number = len(profile.strategy_types)
-        current_index = -1
-        run_id = str(uuid.uuid4())
-
-        yield BenchmarkerResult(
-            type_="run_start",
-            start_time=start_time,
-            end_number=end_number,
-            profile=profile,
-            current_index=current_index,
-            current_strategy=None,
-            current_aggregator=None,
-            current_benchmark=None,
-            current_result=None,
-        )
-
-        while scheduling_strategy := profile.next_strategy():
-            current_index += 1
-            aggregator = self.create_benchmark_aggregator(
-                run_id=run_id,
-                profile=profile,
-                strategy_index=current_index,
-                strategy=scheduling_strategy,
-                limits=strategy_limits,
-            )
-
-            async for result in self.scheduler.run(
-                scheduling_strategy=scheduling_strategy,
-                max_number=max_number_per_strategy,
-                max_duration=max_duration_per_strategy,
-            ):
-                if result.type_ == "run_start":
-                    yield BenchmarkerResult(
-                        type_="scheduler_start",
-                        start_time=start_time,
-                        end_number=end_number,
-                        profile=profile,
-                        current_index=current_index,
-                        current_strategy=scheduling_strategy,
-                        current_aggregator=aggregator,
-                        current_benchmark=None,
-                        current_result=None,
+        with self.thread_lock:
+            strategies_generator = profile.strategies_generator()
+            strategy, constraints = next(strategies_generator)
+
+            while strategy is not None:
+                aggregator = aggregator_class(
+                    strategy=strategy, constraints=constraints
+                )
+                yield None, aggregator, strategy, None
+
+                async for (
+                    response,
+                    request,
+                    request_info,
+                    scheduler_state,
+                ) in Scheduler[BackendT, RequestT, RequestTimingsT, ResponseT].run(
+                    requests=requests,
+                    backend=backend,
+                    strategy=strategy,
+                    env=environment,
+                    **constraints,
+                ):
+                    aggregator.update(
+                        response=response,
+                        request=request,
+                        request_info=request_info,
                     )
-                elif result.type_ == "run_complete":
-                    yield BenchmarkerResult(
-                        type_="scheduler_complete",
-                        start_time=start_time,
-                        end_number=end_number,
-                        profile=profile,
-                        current_index=current_index,
-                        current_strategy=scheduling_strategy,
-                        current_aggregator=aggregator,
-                        current_benchmark=None,
-                        current_result=None,
-                    )
-                elif isinstance(result, SchedulerRequestResult):
-                    aggregator.add_result(result)
-
-                    yield BenchmarkerResult(
-                        type_="scheduler_update",
-                        start_time=start_time,
-                        end_number=end_number,
-                        profile=profile,
-                        current_index=current_index,
-                        current_strategy=scheduling_strategy,
-                        current_aggregator=aggregator,
-                        current_benchmark=None,
-                        current_result=result,
-                    )
-                else:
-                    raise ValueError(f"Unexpected result type: {type(result)}")
-
-            benchmark: BenchmarkT = aggregator.compile()
-            profile.completed_strategy(
-                average_rate=benchmark.metrics.requests_per_second.successful.mean,
-                average_concurrency=benchmark.metrics.request_concurrency.successful.mean,
-            )
+                    yield None, aggregator, strategy, scheduler_state
 
-            yield BenchmarkerResult(
-                type_="benchmark_compiled",
-                start_time=start_time,
-                end_number=end_number,
-                profile=profile,
-                current_index=current_index,
-                current_strategy=scheduling_strategy,
-                current_aggregator=None,
-                current_benchmark=benchmark,
-                current_result=None,
-            )
-
-        yield BenchmarkerResult(
-            type_="run_complete",
-            start_time=start_time,
-            end_number=end_number,
-            profile=profile,
-            current_index=current_index,
-            current_strategy=None,
-            current_aggregator=None,
-            current_benchmark=None,
-            current_result=None,
-        )
-
-    @abstractmethod
-    def create_benchmark_aggregator(
-        self,
-        run_id: str,
-        profile: Profile,
-        strategy_index: int,
-        strategy: SchedulingStrategy,
-        limits: BenchmarkerStrategyLimits,
-    ) -> AggregatorT: ...
-
-
-class GenerativeBenchmarker(
-    Benchmarker[
-        GenerativeBenchmarkAggregator,
-        GenerativeBenchmark,
-        GenerationRequest,
-        ResponseSummary,
-    ],
-):
-    def __init__(
-        self,
-        backend: Backend,
-        request_loader: Iterable[GenerationRequest],
-        request_loader_description: GenerativeRequestLoaderDescription,
-        benchmark_save_extras: Optional[dict[str, Any]] = None,
-        processor: Optional[Union[str, Path, PreTrainedTokenizerBase]] = None,
-        processor_args: Optional[dict[str, Any]] = None,
-    ):
-        super().__init__(
-            worker=GenerativeRequestsWorker(backend),
-            request_loader=request_loader,
-            requests_loader_description=request_loader_description,
-            benchmark_save_extras=benchmark_save_extras,
-        )
-        self.processor = processor
-        self.processor_args = processor_args
-
-    def create_benchmark_aggregator(
-        self,
-        run_id: str,
-        profile: Profile,
-        strategy_index: int,
-        strategy: SchedulingStrategy,
-        limits: BenchmarkerStrategyLimits,
-    ) -> GenerativeBenchmarkAggregator:
-        return GenerativeBenchmarkAggregator(
-            run_id=run_id,
-            args=BenchmarkArgs(
-                profile=profile,
-                strategy_index=strategy_index,
-                strategy=strategy,
-                max_number=limits.max_number,
-                max_duration=limits.max_duration,
-                warmup_number=limits.warmup_number,
-                warmup_duration=limits.warmup_duration,
-                cooldown_number=limits.cooldown_number,
-                cooldown_duration=limits.cooldown_duration,
-            ),
-            worker_description=self.worker.description,  # type: ignore[arg-type]
-            request_loader_description=self.requests_loader_description,  # type: ignore[arg-type]
-            extras=self.benchmark_save_extras or {},
-            processor=self.processor,
-            processor_args=self.processor_args,
-        )
+                benchmark = aggregator.compile()
+                yield benchmark, aggregator, strategy, None
+                strategy, constraints = strategies_generator.send((benchmark, aggregator))
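
For orientation, the refactored run() is now an async generator that yields (benchmark, aggregator, strategy, scheduler_state) tuples: benchmark stays None while a strategy is running, scheduler_state carries in-flight progress, and a non-None benchmark marks the compiled result for that strategy. A minimal consumer sketch follows; it assumes a concrete Benchmarker subclass and a `requests` parameter (inferred only from the `requests=requests` pass-through above), so treat the names as illustrative rather than a confirmed public API.

    # Hypothetical consumer sketch; keyword arguments mirror the parameters
    # visible in the hunk above, not a documented interface.
    async def report_progress(benchmarker, requests, backend, profile, environment, aggregator_cls):
        async for benchmark, aggregator, strategy, scheduler_state in benchmarker.run(
            requests=requests,
            backend=backend,
            profile=profile,
            environment=environment,
            aggregator_class=aggregator_cls,
        ):
            if benchmark is not None:
                print(f"compiled benchmark for strategy {strategy}")  # final result for this strategy
            elif scheduler_state is not None:
                print(f"progress update: {scheduler_state}")  # per-request update while the strategy runs
            else:
                print(f"starting strategy {strategy}")  # aggregator created, scheduler not yet started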