Skip to content

Commit 5b874fa

Browse files
authored
refactor: refine experimental final aggregate spill (#18907)
* fix: repartition not working; re-enable spill check by setting a max spill depth
* refactor: split local/shared state
* fix: write should close
* fix: try to improve performance
* used for CI
* fix: incorrect result with cluster + force_spill
* fixup
* set max_aggregate_spill_level = 0
* refactor: release hashtable instead of resetting it when memory pressure is high
* make clippy happy
* try fix
* revert the previous performance improvement to final_aggregate
* clean
* refactor: try to improve the hits benchmark, and clean up
* disable
1 parent 0509e10 commit 5b874fa

21 files changed

+1142
-827
lines changed

src/query/service/src/physical_plans/physical_aggregate_final.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -148,8 +148,7 @@ impl IPhysicalPlan for AggregateFinal {
148148
.get_enable_experimental_aggregate_hashtable()?;
149149
let max_spill_io_requests = builder.settings.get_max_spill_io_requests()?;
150150
let max_restore_worker = builder.settings.get_max_aggregate_restore_worker()?;
151-
let experiment_aggregate_final =
152-
builder.settings.get_enable_experiment_aggregate_final()?;
151+
let enable_experiment_aggregate = builder.settings.get_enable_experiment_aggregate()?;
153152

154153
let mut is_cluster_aggregate = false;
155154
if ExchangeSource::check_physical_plan(&self.input) {
@@ -164,6 +163,7 @@ impl IPhysicalPlan for AggregateFinal {
164163
is_cluster_aggregate,
165164
max_block_size as usize,
166165
max_spill_io_requests as usize,
166+
enable_experiment_aggregate,
167167
)?;
168168

169169
if params.group_columns.is_empty() {
@@ -201,7 +201,6 @@ impl IPhysicalPlan for AggregateFinal {
201201
params.clone(),
202202
max_restore_worker,
203203
after_group_parallel,
204-
experiment_aggregate_final,
205204
builder.ctx.clone(),
206205
)
207206
}

src/query/service/src/physical_plans/physical_aggregate_partial.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,8 @@ impl IPhysicalPlan for AggregatePartial {
178178
.settings
179179
.get_enable_experimental_aggregate_hashtable()?;
180180

181+
let enable_experiment_aggregate = builder.settings.get_enable_experiment_aggregate()?;
182+
181183
let params = PipelineBuilder::build_aggregator_params(
182184
self.input.output_schema()?,
183185
&self.group_by,
@@ -186,6 +188,7 @@ impl IPhysicalPlan for AggregatePartial {
186188
builder.is_exchange_parent(),
187189
max_block_size as usize,
188190
max_spill_io_requests as usize,
191+
enable_experiment_aggregate,
189192
)?;
190193

191194
if params.group_columns.is_empty() {

src/query/service/src/pipelines/builders/builder_aggregate.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ impl PipelineBuilder {
3838
cluster_aggregator: bool,
3939
max_block_size: usize,
4040
max_spill_io_requests: usize,
41+
enable_experiment_aggregate: bool,
4142
) -> Result<Arc<AggregatorParams>> {
4243
let mut agg_args = Vec::with_capacity(agg_funcs.len());
4344
let (group_by, group_data_types) = group_by
@@ -132,6 +133,7 @@ impl PipelineBuilder {
132133
cluster_aggregator,
133134
max_block_size,
134135
max_spill_io_requests,
136+
enable_experiment_aggregate,
135137
)?;
136138

137139
log::debug!("aggregate states layout: {:?}", params.states_layout);

src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_exchange_injector.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ impl ExchangeSorting for AggregateExchangeSorting {
5959
))),
6060
Some(meta_info) => match meta_info {
6161
AggregateMeta::Partitioned { .. } => unreachable!(),
62+
AggregateMeta::NewBucketSpilled(_) => unreachable!(),
6263
AggregateMeta::Serialized(v) => {
6364
compute_block_number(v.bucket, v.max_partition_count)
6465
}
@@ -181,6 +182,7 @@ impl FlightScatter for HashTableHashScatter {
181182
AggregateMeta::BucketSpilled(_) => unreachable!(),
182183
AggregateMeta::Serialized(_) => unreachable!(),
183184
AggregateMeta::Partitioned { .. } => unreachable!(),
185+
AggregateMeta::NewBucketSpilled(_) => unreachable!(),
184186
AggregateMeta::AggregateSpilling(payload) => {
185187
for p in scatter_partitioned_payload(payload, self.buckets)? {
186188
blocks.push(DataBlock::empty_with_meta(

src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_meta.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use databend_common_expression::PartitionedPayload;
3131
use databend_common_expression::Payload;
3232
use databend_common_expression::ProbeState;
3333
use databend_common_expression::ProjectedBlock;
34+
use parquet::file::metadata::RowGroupMetaData;
3435

3536
pub struct SerializedPayload {
3637
pub bucket: isize,
@@ -116,6 +117,12 @@ pub struct BucketSpilledPayload {
116117
pub max_partition_count: usize,
117118
}
118119

120+
pub struct NewSpilledPayload {
121+
pub bucket: isize,
122+
pub location: String,
123+
pub row_group: RowGroupMetaData,
124+
}
125+
119126
pub struct AggregatePayload {
120127
pub bucket: isize,
121128
pub payload: Payload,
@@ -131,6 +138,7 @@ pub enum AggregateMeta {
131138
Spilled(Vec<BucketSpilledPayload>),
132139

133140
Partitioned { bucket: isize, data: Vec<Self> },
141+
NewBucketSpilled(NewSpilledPayload),
134142
}
135143

136144
impl AggregateMeta {
@@ -173,6 +181,10 @@ impl AggregateMeta {
173181
pub fn create_partitioned(bucket: isize, data: Vec<Self>) -> BlockMetaInfoPtr {
174182
Box::new(AggregateMeta::Partitioned { data, bucket })
175183
}
184+
185+
pub fn create_new_spilled(payload: NewSpilledPayload) -> BlockMetaInfoPtr {
186+
Box::new(AggregateMeta::NewBucketSpilled(payload))
187+
}
176188
}
177189

178190
impl serde::Serialize for AggregateMeta {
@@ -200,6 +212,9 @@ impl Debug for AggregateMeta {
200212
}
201213
AggregateMeta::Spilled(_) => f.debug_struct("Aggregate::Spilled").finish(),
202214
AggregateMeta::BucketSpilled(_) => f.debug_struct("Aggregate::BucketSpilled").finish(),
215+
AggregateMeta::NewBucketSpilled(_) => {
216+
f.debug_struct("Aggregate::NewBucketSpilled").finish()
217+
}
203218
AggregateMeta::AggregatePayload(_) => {
204219
f.debug_struct("AggregateMeta:AggregatePayload").finish()
205220
}

src/query/service/src/pipelines/processors/transforms/aggregator/aggregator_params.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ pub struct AggregatorParams {
4141
pub cluster_aggregator: bool,
4242
pub max_block_size: usize,
4343
pub max_spill_io_requests: usize,
44+
45+
pub enable_experiment_aggregate: bool,
4446
}
4547

4648
impl AggregatorParams {
@@ -54,6 +56,7 @@ impl AggregatorParams {
5456
cluster_aggregator: bool,
5557
max_block_size: usize,
5658
max_spill_io_requests: usize,
59+
enable_experiment_aggregate: bool,
5760
) -> Result<Arc<AggregatorParams>> {
5861
let states_layout = if !agg_funcs.is_empty() {
5962
Some(get_states_layout(agg_funcs)?)
@@ -72,6 +75,7 @@ impl AggregatorParams {
7275
cluster_aggregator,
7376
max_block_size,
7477
max_spill_io_requests,
78+
enable_experiment_aggregate,
7579
}))
7680
}
7781

src/query/service/src/pipelines/processors/transforms/aggregator/build_partition_bucket.rs

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
use std::sync::Arc;
1616

17+
use databend_common_catalog::table_context::TableContext;
1718
use databend_common_exception::Result;
1819
use databend_common_pipeline_core::processors::InputPort;
1920
use databend_common_pipeline_core::processors::OutputPort;
@@ -27,10 +28,10 @@ use parking_lot::Mutex;
2728
use tokio::sync::Barrier;
2829
use tokio::sync::Semaphore;
2930

30-
use crate::pipelines::processors::transforms::aggregator::new_final_aggregate::FinalAggregateSharedState;
31-
use crate::pipelines::processors::transforms::aggregator::new_final_aggregate::FinalAggregateSpiller;
32-
use crate::pipelines::processors::transforms::aggregator::new_final_aggregate::NewFinalAggregateTransform;
33-
use crate::pipelines::processors::transforms::aggregator::new_final_aggregate::TransformPartitionBucketScatter;
31+
use crate::pipelines::processors::transforms::aggregator::new_aggregate::FinalAggregateSharedState;
32+
use crate::pipelines::processors::transforms::aggregator::new_aggregate::NewAggregateSpiller;
33+
use crate::pipelines::processors::transforms::aggregator::new_aggregate::NewFinalAggregateTransform;
34+
use crate::pipelines::processors::transforms::aggregator::new_aggregate::TransformPartitionBucketScatter;
3435
use crate::pipelines::processors::transforms::aggregator::transform_partition_bucket::TransformPartitionBucket;
3536
use crate::pipelines::processors::transforms::aggregator::AggregatorParams;
3637
use crate::pipelines::processors::transforms::aggregator::TransformAggregateSpillReader;
@@ -43,8 +44,6 @@ fn build_partition_bucket_experimental(
4344
after_worker: usize,
4445
ctx: Arc<QueryContext>,
4546
) -> Result<()> {
46-
let operator = DataOperator::instance().spill_operator();
47-
4847
// PartitionedPayload only accept power of two partitions
4948
let mut output_num = after_worker.next_power_of_two();
5049
const MAX_PARTITION_COUNT: usize = 128;
@@ -71,8 +70,13 @@ fn build_partition_bucket_experimental(
7170
let barrier = Arc::new(Barrier::new(output_num));
7271
let shared_state = Arc::new(Mutex::new(FinalAggregateSharedState::new(output_num)));
7372

73+
let settings = ctx.get_settings();
74+
let rows = settings.get_max_block_size()? as usize;
75+
let bytes = settings.get_max_block_bytes()? as usize;
76+
let max_aggregate_spill_level = settings.get_max_aggregate_spill_level()? as usize;
77+
7478
for id in 0..output_num {
75-
let spiller = FinalAggregateSpiller::try_create(ctx.clone(), operator.clone())?;
79+
let spiller = NewAggregateSpiller::try_create(ctx.clone(), output_num, rows, bytes)?;
7680
let input_port = InputPort::create();
7781
let output_port = OutputPort::create();
7882
let processor = NewFinalAggregateTransform::try_create(
@@ -84,7 +88,7 @@ fn build_partition_bucket_experimental(
8488
barrier.clone(),
8589
shared_state.clone(),
8690
spiller,
87-
ctx.clone(),
91+
max_aggregate_spill_level,
8892
)?;
8993
builder.add_transform(input_port, output_port, ProcessorPtr::create(processor));
9094
}
@@ -103,8 +107,8 @@ fn build_partition_bucket_legacy(
103107
) -> Result<()> {
104108
let operator = DataOperator::instance().spill_operator();
105109

106-
let input_nums = pipeline.output_len();
107-
let transform = TransformPartitionBucket::create(input_nums, params.clone())?;
110+
let input_num = pipeline.output_len();
111+
let transform = TransformPartitionBucket::create(input_num, params.clone())?;
108112

109113
let output = transform.get_output();
110114
let inputs_port = transform.get_inputs();
@@ -115,7 +119,7 @@ fn build_partition_bucket_legacy(
115119
vec![output],
116120
)]));
117121

118-
pipeline.try_resize(std::cmp::min(input_nums, max_restore_worker as usize))?;
122+
pipeline.try_resize(std::cmp::min(input_num, max_restore_worker as usize))?;
119123
let semaphore = Arc::new(Semaphore::new(params.max_spill_io_requests));
120124
pipeline.add_transform(|input, output| {
121125
let operator = operator.clone();
@@ -133,17 +137,16 @@ fn build_partition_bucket_legacy(
133137
Ok(())
134138
}
135139

136-
/// Build partition bucket pipeline based on the experiment_aggregate_final flag.
140+
/// Build partition bucket pipeline based on the experiment_aggregate flag.
137141
/// Dispatches to either experimental or legacy implementation.
138142
pub fn build_partition_bucket(
139143
pipeline: &mut Pipeline,
140144
params: Arc<AggregatorParams>,
141145
max_restore_worker: u64,
142146
after_worker: usize,
143-
experiment_aggregate_final: bool,
144147
ctx: Arc<QueryContext>,
145148
) -> Result<()> {
146-
if experiment_aggregate_final {
149+
if params.enable_experiment_aggregate {
147150
build_partition_bucket_experimental(pipeline, params, after_worker, ctx)
148151
} else {
149152
build_partition_bucket_legacy(pipeline, params, max_restore_worker, after_worker)

src/query/service/src/pipelines/processors/transforms/aggregator/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ mod aggregate_exchange_injector;
1616
mod aggregate_meta;
1717
mod aggregator_params;
1818
mod build_partition_bucket;
19-
mod new_final_aggregate;
19+
mod new_aggregate;
2020
mod serde;
2121
mod transform_aggregate_expand;
2222
mod transform_aggregate_final;
Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,13 @@
1313
// limitations under the License.
1414

1515
mod datablock_splitter;
16-
mod final_aggregate_spiller;
16+
mod new_aggregate_spiller;
17+
mod new_final_aggregate_state;
1718
mod new_transform_final_aggregate;
1819
mod transform_partition_bucket_scatter;
1920

2021
pub use datablock_splitter::split_partitioned_meta_into_datablocks;
21-
pub use final_aggregate_spiller::FinalAggregateSpiller;
22-
pub use new_transform_final_aggregate::FinalAggregateSharedState;
22+
pub use new_aggregate_spiller::NewAggregateSpiller;
23+
pub use new_final_aggregate_state::FinalAggregateSharedState;
2324
pub use new_transform_final_aggregate::NewFinalAggregateTransform;
2425
pub use transform_partition_bucket_scatter::TransformPartitionBucketScatter;

0 commit comments

Comments (0)