Skip to content

Commit 4f03b8d

Browse files
authored
refactor: stream spill triggering for partial aggregation (#18943)
* refactor: refactor aggregate spiller to support future change
* chore: add new aggregate spill writer
* chore: make standalone work
* fix: forget return
* chore: support in cluster mode
* test enable_experiment_aggregate
* fix
* fix: keep experimental spill buckets ordered
* fixup
* disable settings
1 parent 63ad3ac commit 4f03b8d

24 files changed

+957
-365
lines changed

src/query/expression/src/aggregate/mod.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,9 @@ pub(crate) const L2_CACHE_SIZE: usize = 1048576 / 2;
6060
// Assume (1 << 20) + (1 << 19) = 1.5MB L3 cache per core (shared), divided by two because of hyperthreading
6161
pub(crate) const L3_CACHE_SIZE: usize = 1572864 / 2;
6262

63+
pub(crate) const MAX_RADIX_BITS: u64 = 7;
64+
pub const MAX_AGGREGATE_HASHTABLE_BUCKETS_NUM: u64 = 1 << MAX_RADIX_BITS;
65+
6366
#[derive(Clone, Debug)]
6467
pub struct HashTableConfig {
6568
// Max radix bits across all threads; this is a hint to repartition
@@ -77,7 +80,7 @@ impl Default for HashTableConfig {
7780
Self {
7881
current_max_radix_bits: Arc::new(AtomicU64::new(3)),
7982
initial_radix_bits: 3,
80-
max_radix_bits: 7,
83+
max_radix_bits: MAX_RADIX_BITS,
8184
repartition_radix_bits_incr: 2,
8285
block_fill_factor: 1.8,
8386
partial_agg: false,

src/query/service/src/physical_plans/physical_aggregate_final.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,8 @@ impl IPhysicalPlan for AggregateFinal {
142142
}
143143

144144
fn build_pipeline2(&self, builder: &mut PipelineBuilder) -> Result<()> {
145-
let max_block_size = builder.settings.get_max_block_size()?;
145+
let max_block_rows = builder.settings.get_max_block_size()? as usize;
146+
let max_block_bytes = builder.settings.get_max_block_bytes()? as usize;
146147
let enable_experimental_aggregate_hashtable = builder
147148
.settings
148149
.get_enable_experimental_aggregate_hashtable()?;
@@ -161,9 +162,10 @@ impl IPhysicalPlan for AggregateFinal {
161162
&self.agg_funcs,
162163
enable_experimental_aggregate_hashtable,
163164
is_cluster_aggregate,
164-
max_block_size as usize,
165165
max_spill_io_requests as usize,
166166
enable_experiment_aggregate,
167+
max_block_rows,
168+
max_block_bytes,
167169
)?;
168170

169171
if params.group_columns.is_empty() {

src/query/service/src/physical_plans/physical_aggregate_partial.rs

Lines changed: 38 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use databend_common_expression::DataSchemaRefExt;
2727
use databend_common_expression::HashTableConfig;
2828
use databend_common_expression::LimitType;
2929
use databend_common_expression::SortColumnDescription;
30+
use databend_common_expression::MAX_AGGREGATE_HASHTABLE_BUCKETS_NUM;
3031
use databend_common_functions::aggregates::AggregateFunctionFactory;
3132
use databend_common_pipeline::core::ProcessorPtr;
3233
use databend_common_pipeline_transforms::sorts::TransformSortPartial;
@@ -44,7 +45,9 @@ use crate::physical_plans::physical_plan::IPhysicalPlan;
4445
use crate::physical_plans::physical_plan::PhysicalPlan;
4546
use crate::physical_plans::physical_plan::PhysicalPlanMeta;
4647
use crate::pipelines::processors::transforms::aggregator::AggregateInjector;
48+
use crate::pipelines::processors::transforms::aggregator::NewTransformAggregateSpillWriter;
4749
use crate::pipelines::processors::transforms::aggregator::PartialSingleStateAggregator;
50+
use crate::pipelines::processors::transforms::aggregator::SharedPartitionStream;
4851
use crate::pipelines::processors::transforms::aggregator::TransformAggregateSpillWriter;
4952
use crate::pipelines::processors::transforms::aggregator::TransformPartialAggregate;
5053
use crate::pipelines::PipelineBuilder;
@@ -169,7 +172,8 @@ impl IPhysicalPlan for AggregatePartial {
169172
fn build_pipeline2(&self, builder: &mut PipelineBuilder) -> Result<()> {
170173
self.input.build_pipeline(builder)?;
171174

172-
let max_block_size = builder.settings.get_max_block_size()?;
175+
let max_block_rows = builder.settings.get_max_block_size()? as usize;
176+
let max_block_bytes = builder.settings.get_max_block_bytes()? as usize;
173177
let max_threads = builder.settings.get_max_threads()?;
174178
let max_spill_io_requests = builder.settings.get_max_spill_io_requests()?;
175179

@@ -185,9 +189,10 @@ impl IPhysicalPlan for AggregatePartial {
185189
&self.agg_funcs,
186190
enable_experimental_aggregate_hashtable,
187191
builder.is_exchange_parent(),
188-
max_block_size as usize,
189192
max_spill_io_requests as usize,
190193
enable_experiment_aggregate,
194+
max_block_rows,
195+
max_block_bytes,
191196
)?;
192197

193198
if params.group_columns.is_empty() {
@@ -241,19 +246,37 @@ impl IPhysicalPlan for AggregatePartial {
241246
if !builder.is_exchange_parent() {
242247
let operator = DataOperator::instance().spill_operator();
243248
let location_prefix = builder.ctx.query_id_spill_prefix();
244-
245-
builder.main_pipeline.add_transform(|input, output| {
246-
Ok(ProcessorPtr::create(
247-
TransformAggregateSpillWriter::try_create(
248-
builder.ctx.clone(),
249-
input,
250-
output,
251-
operator.clone(),
252-
params.clone(),
253-
location_prefix.clone(),
254-
)?,
255-
))
256-
})?;
249+
if params.enable_experiment_aggregate {
250+
let shared_partition_stream = SharedPartitionStream::new(
251+
builder.main_pipeline.output_len(),
252+
max_block_rows,
253+
max_block_bytes,
254+
MAX_AGGREGATE_HASHTABLE_BUCKETS_NUM as usize,
255+
);
256+
builder.main_pipeline.add_transform(|input, output| {
257+
Ok(ProcessorPtr::create(
258+
NewTransformAggregateSpillWriter::try_create(
259+
input,
260+
output,
261+
builder.ctx.clone(),
262+
shared_partition_stream.clone(),
263+
)?,
264+
))
265+
})?;
266+
} else {
267+
builder.main_pipeline.add_transform(|input, output| {
268+
Ok(ProcessorPtr::create(
269+
TransformAggregateSpillWriter::try_create(
270+
builder.ctx.clone(),
271+
input,
272+
output,
273+
operator.clone(),
274+
params.clone(),
275+
location_prefix.clone(),
276+
)?,
277+
))
278+
})?;
279+
}
257280
}
258281

259282
builder.exchange_injector = AggregateInjector::create(builder.ctx.clone(), params.clone());

src/query/service/src/pipelines/builders/builder_aggregate.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,10 @@ impl PipelineBuilder {
3636
agg_funcs: &[AggregateFunctionDesc],
3737
enable_experimental_aggregate_hashtable: bool,
3838
cluster_aggregator: bool,
39-
max_block_size: usize,
4039
max_spill_io_requests: usize,
4140
enable_experiment_aggregate: bool,
41+
max_block_rows: usize,
42+
max_block_bytes: usize,
4243
) -> Result<Arc<AggregatorParams>> {
4344
let mut agg_args = Vec::with_capacity(agg_funcs.len());
4445
let (group_by, group_data_types) = group_by
@@ -131,9 +132,10 @@ impl PipelineBuilder {
131132
&agg_args,
132133
enable_experimental_aggregate_hashtable,
133134
cluster_aggregator,
134-
max_block_size,
135135
max_spill_io_requests,
136136
enable_experiment_aggregate,
137+
max_block_rows,
138+
max_block_bytes,
137139
)?;
138140

139141
log::debug!("aggregate states layout: {:?}", params.states_layout);

src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_exchange_injector.rs

Lines changed: 55 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use databend_common_expression::DataBlock;
2222
use databend_common_expression::PartitionedPayload;
2323
use databend_common_expression::Payload;
2424
use databend_common_expression::PayloadFlushState;
25+
use databend_common_expression::MAX_AGGREGATE_HASHTABLE_BUCKETS_NUM;
2526
use databend_common_pipeline::core::Pipeline;
2627
use databend_common_pipeline::core::ProcessorPtr;
2728
use databend_common_settings::FlightCompression;
@@ -31,6 +32,8 @@ use crate::pipelines::processors::transforms::aggregator::aggregate_meta::Aggreg
3132
use crate::pipelines::processors::transforms::aggregator::serde::TransformExchangeAggregateSerializer;
3233
use crate::pipelines::processors::transforms::aggregator::serde::TransformExchangeAsyncBarrier;
3334
use crate::pipelines::processors::transforms::aggregator::AggregatorParams;
35+
use crate::pipelines::processors::transforms::aggregator::NewTransformAggregateSpillWriter;
36+
use crate::pipelines::processors::transforms::aggregator::SharedPartitionStream;
3437
use crate::pipelines::processors::transforms::aggregator::TransformAggregateDeserializer;
3538
use crate::pipelines::processors::transforms::aggregator::TransformAggregateSerializer;
3639
use crate::pipelines::processors::transforms::aggregator::TransformAggregateSpillWriter;
@@ -59,7 +62,6 @@ impl ExchangeSorting for AggregateExchangeSorting {
5962
))),
6063
Some(meta_info) => match meta_info {
6164
AggregateMeta::Partitioned { .. } => unreachable!(),
62-
AggregateMeta::NewBucketSpilled(_) => unreachable!(),
6365
AggregateMeta::Serialized(v) => {
6466
compute_block_number(v.bucket, v.max_partition_count)
6567
}
@@ -68,7 +70,9 @@ impl ExchangeSorting for AggregateExchangeSorting {
6870
}
6971
AggregateMeta::AggregateSpilling(_)
7072
| AggregateMeta::Spilled(_)
71-
| AggregateMeta::BucketSpilled(_) => Ok(-1),
73+
| AggregateMeta::BucketSpilled(_)
74+
| AggregateMeta::NewBucketSpilled(_)
75+
| AggregateMeta::NewSpilled(_) => Ok(-1),
7276
},
7377
},
7478
}
@@ -183,6 +187,7 @@ impl FlightScatter for HashTableHashScatter {
183187
AggregateMeta::Serialized(_) => unreachable!(),
184188
AggregateMeta::Partitioned { .. } => unreachable!(),
185189
AggregateMeta::NewBucketSpilled(_) => unreachable!(),
190+
AggregateMeta::NewSpilled(_) => unreachable!(),
186191
AggregateMeta::AggregateSpilling(payload) => {
187192
for p in scatter_partitioned_payload(payload, self.buckets)? {
188193
blocks.push(DataBlock::empty_with_meta(
@@ -259,21 +264,41 @@ impl ExchangeInjector for AggregateInjector {
259264
) -> Result<()> {
260265
let params = self.aggregator_params.clone();
261266

262-
let operator = DataOperator::instance().spill_operator();
263-
let location_prefix = self.ctx.query_id_spill_prefix();
264-
265-
pipeline.add_transform(|input, output| {
266-
Ok(ProcessorPtr::create(
267-
TransformAggregateSpillWriter::try_create(
268-
self.ctx.clone(),
269-
input,
270-
output,
271-
operator.clone(),
272-
params.clone(),
273-
location_prefix.clone(),
274-
)?,
275-
))
276-
})?;
267+
if self.aggregator_params.enable_experiment_aggregate {
268+
let shared_partition_stream = SharedPartitionStream::new(
269+
pipeline.output_len(),
270+
self.aggregator_params.max_block_rows,
271+
self.aggregator_params.max_block_bytes,
272+
MAX_AGGREGATE_HASHTABLE_BUCKETS_NUM as usize,
273+
);
274+
275+
pipeline.add_transform(|input, output| {
276+
Ok(ProcessorPtr::create(
277+
NewTransformAggregateSpillWriter::try_create(
278+
input,
279+
output,
280+
self.ctx.clone(),
281+
shared_partition_stream.clone(),
282+
)?,
283+
))
284+
})?;
285+
} else {
286+
let operator = DataOperator::instance().spill_operator();
287+
let location_prefix = self.ctx.query_id_spill_prefix();
288+
289+
pipeline.add_transform(|input, output| {
290+
Ok(ProcessorPtr::create(
291+
TransformAggregateSpillWriter::try_create(
292+
self.ctx.clone(),
293+
input,
294+
output,
295+
operator.clone(),
296+
params.clone(),
297+
location_prefix.clone(),
298+
)?,
299+
))
300+
})?;
301+
}
277302

278303
pipeline.add_transform(|input, output| {
279304
TransformAggregateSerializer::try_create(input, output, params.clone())
@@ -290,14 +315,25 @@ impl ExchangeInjector for AggregateInjector {
290315
let operator = DataOperator::instance().spill_operator();
291316
let location_prefix = self.ctx.query_id_spill_prefix();
292317

293-
let schema = shuffle_params.schema.clone();
294318
let local_id = &shuffle_params.executor_id;
295319
let local_pos = shuffle_params
296320
.destination_ids
297321
.iter()
298322
.position(|x| x == local_id)
299323
.unwrap();
300324

325+
let mut partition_streams = vec![];
326+
if self.aggregator_params.enable_experiment_aggregate {
327+
for _i in 0..shuffle_params.destination_ids.len() {
328+
partition_streams.push(SharedPartitionStream::new(
329+
pipeline.output_len(),
330+
self.aggregator_params.max_block_rows,
331+
self.aggregator_params.max_block_bytes,
332+
MAX_AGGREGATE_HASHTABLE_BUCKETS_NUM as usize,
333+
));
334+
}
335+
}
336+
301337
pipeline.add_transform(|input, output| {
302338
Ok(ProcessorPtr::create(
303339
TransformExchangeAggregateSerializer::try_create(
@@ -308,12 +344,11 @@ impl ExchangeInjector for AggregateInjector {
308344
location_prefix.clone(),
309345
params.clone(),
310346
compression,
311-
schema.clone(),
312347
local_pos,
348+
partition_streams.clone(),
313349
)?,
314350
))
315351
})?;
316-
317352
pipeline.add_transform(TransformExchangeAsyncBarrier::try_create)
318353
}
319354

src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_meta.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,9 @@ pub enum AggregateMeta {
138138
Spilled(Vec<BucketSpilledPayload>),
139139

140140
Partitioned { bucket: isize, data: Vec<Self> },
141+
141142
NewBucketSpilled(NewSpilledPayload),
143+
NewSpilled(Vec<NewSpilledPayload>),
142144
}
143145

144146
impl AggregateMeta {
@@ -182,9 +184,13 @@ impl AggregateMeta {
182184
Box::new(AggregateMeta::Partitioned { data, bucket })
183185
}
184186

185-
pub fn create_new_spilled(payload: NewSpilledPayload) -> BlockMetaInfoPtr {
187+
pub fn create_new_bucket_spilled(payload: NewSpilledPayload) -> BlockMetaInfoPtr {
186188
Box::new(AggregateMeta::NewBucketSpilled(payload))
187189
}
190+
191+
pub fn create_new_spilled(payloads: Vec<NewSpilledPayload>) -> BlockMetaInfoPtr {
192+
Box::new(AggregateMeta::NewSpilled(payloads))
193+
}
188194
}
189195

190196
impl serde::Serialize for AggregateMeta {
@@ -215,6 +221,7 @@ impl Debug for AggregateMeta {
215221
AggregateMeta::NewBucketSpilled(_) => {
216222
f.debug_struct("Aggregate::NewBucketSpilled").finish()
217223
}
224+
AggregateMeta::NewSpilled(_) => f.debug_struct("Aggregate::NewSpilled").finish(),
218225
AggregateMeta::AggregatePayload(_) => {
219226
f.debug_struct("AggregateMeta:AggregatePayload").finish()
220227
}

src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_params.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,13 +39,16 @@ pub struct AggregatorParams {
3939

4040
pub enable_experimental_aggregate_hashtable: bool,
4141
pub cluster_aggregator: bool,
42-
pub max_block_size: usize,
4342
pub max_spill_io_requests: usize,
4443

4544
pub enable_experiment_aggregate: bool,
45+
46+
pub max_block_rows: usize,
47+
pub max_block_bytes: usize,
4648
}
4749

4850
impl AggregatorParams {
51+
#[allow(clippy::too_many_arguments)]
4952
pub fn try_create(
5053
input_schema: DataSchemaRef,
5154
group_data_types: Vec<DataType>,
@@ -54,9 +57,10 @@ impl AggregatorParams {
5457
agg_args: &[Vec<usize>],
5558
enable_experimental_aggregate_hashtable: bool,
5659
cluster_aggregator: bool,
57-
max_block_size: usize,
5860
max_spill_io_requests: usize,
5961
enable_experiment_aggregate: bool,
62+
max_block_rows: usize,
63+
max_block_bytes: usize,
6064
) -> Result<Arc<AggregatorParams>> {
6165
let states_layout = if !agg_funcs.is_empty() {
6266
Some(get_states_layout(agg_funcs)?)
@@ -73,7 +77,8 @@ impl AggregatorParams {
7377
states_layout,
7478
enable_experimental_aggregate_hashtable,
7579
cluster_aggregator,
76-
max_block_size,
80+
max_block_rows,
81+
max_block_bytes,
7782
max_spill_io_requests,
7883
enable_experiment_aggregate,
7984
}))

0 commit comments

Comments
 (0)