Commit a8bd6cb

add TransformRuntimeFilterWait
1 parent: 04c6ce3

File tree: 5 files changed, +173 −104 lines

src/query/storages/fuse/src/operations/read/fuse_source.rs

Lines changed: 47 additions & 0 deletions
@@ -42,6 +42,7 @@ use crate::operations::read::native_data_transform_reader::ReadNativeDataTransfo
 use crate::operations::read::parquet_data_transform_reader::ReadParquetDataTransform;
 use crate::operations::read::DeserializeDataTransform;
 use crate::operations::read::NativeDeserializeDataTransform;
+use crate::operations::read::TransformRuntimeFilterWait;
 
 #[allow(clippy::too_many_arguments)]
 pub fn build_fuse_native_source_pipeline(
@@ -59,6 +60,8 @@ pub fn build_fuse_native_source_pipeline(
     (max_threads, max_io_requests) =
         adjust_threads_and_request(true, max_threads, max_io_requests, plan);
 
+    let need_runtime_filter_wait = !ctx.get_runtime_filter_ready(plan.scan_id).is_empty();
+
     if topk.is_some() {
         max_threads = max_threads.min(16);
         max_io_requests = max_io_requests.min(16);
@@ -82,6 +85,16 @@ pub fn build_fuse_native_source_pipeline(
             pipeline.add_pipe(pipe);
         }
     }
+    if need_runtime_filter_wait {
+        pipeline.add_transform(|input, output| {
+            Ok(TransformRuntimeFilterWait::create(
+                ctx.clone(),
+                plan.scan_id,
+                input,
+                output,
+            ))
+        })?;
+    }
     pipeline.add_transform(|input, output| {
         ReadNativeDataTransform::<true>::create(
             plan.scan_id,
@@ -118,6 +131,17 @@ pub fn build_fuse_native_source_pipeline(
         }
     }
 
+    if need_runtime_filter_wait {
+        pipeline.add_transform(|input, output| {
+            Ok(TransformRuntimeFilterWait::create(
+                ctx.clone(),
+                plan.scan_id,
+                input,
+                output,
+            ))
+        })?;
+    }
+
     pipeline.add_transform(|input, output| {
         ReadNativeDataTransform::<false>::create(
             plan.scan_id,
@@ -171,6 +195,7 @@ pub fn build_fuse_parquet_source_pipeline(
         blocks_total: AtomicU64::new(0),
         blocks_pruned: AtomicU64::new(0),
     });
+    let need_runtime_filter_wait = !ctx.get_runtime_filter_ready(plan.scan_id).is_empty();
 
     match block_reader.support_blocking_api() {
         true => {
@@ -190,6 +215,17 @@ pub fn build_fuse_parquet_source_pipeline(
             let unfinished_processors_count =
                 Arc::new(AtomicU64::new(pipeline.output_len() as u64));
 
+            if need_runtime_filter_wait {
+                pipeline.add_transform(|input, output| {
+                    Ok(TransformRuntimeFilterWait::create(
+                        ctx.clone(),
+                        plan.scan_id,
+                        input,
+                        output,
+                    ))
+                })?;
+            }
+
             pipeline.add_transform(|input, output| {
                 ReadParquetDataTransform::<true>::create(
                     plan.scan_id,
@@ -233,6 +269,17 @@ pub fn build_fuse_parquet_source_pipeline(
             let unfinished_processors_count =
                 Arc::new(AtomicU64::new(pipeline.output_len() as u64));
 
+            if need_runtime_filter_wait {
+                pipeline.add_transform(|input, output| {
+                    Ok(TransformRuntimeFilterWait::create(
+                        ctx.clone(),
+                        plan.scan_id,
+                        input,
+                        output,
+                    ))
+                })?;
+            }
+
             pipeline.add_transform(|input, output| {
                 ReadParquetDataTransform::<false>::create(
                     plan.table_index,
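
The fifth changed file, src/query/storages/fuse/src/operations/read/runtime_filter_wait.rs, is not expanded in this view. Going by the TransformRuntimeFilterWait::create(ctx, plan.scan_id, input, output) call sites above and the watch-channel wait this commit removes from the two deserializers below, the new operator is presumably a pass-through transform that blocks once until the runtime filters for its scan are ready. A minimal sketch of that shape — the field layout, the `done` flag, and the port-forwarding logic in event() are assumptions, not the committed code:

use std::any::Any;
use std::sync::Arc;
use std::time::Duration;

use databend_common_base::base::tokio::time::timeout;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::{ErrorCode, Result};
use databend_common_pipeline_core::processors::{Event, InputPort, OutputPort, Processor, ProcessorPtr};

pub struct TransformRuntimeFilterWait {
    ctx: Arc<dyn TableContext>,
    scan_id: usize,
    input: Arc<InputPort>,
    output: Arc<OutputPort>,
    done: bool,
}

impl TransformRuntimeFilterWait {
    pub fn create(
        ctx: Arc<dyn TableContext>,
        scan_id: usize,
        input: Arc<InputPort>,
        output: Arc<OutputPort>,
    ) -> ProcessorPtr {
        ProcessorPtr::create(Box::new(Self {
            ctx,
            scan_id,
            input,
            output,
            done: false,
        }))
    }
}

#[async_trait::async_trait]
impl Processor for TransformRuntimeFilterWait {
    fn name(&self) -> String {
        "TransformRuntimeFilterWait".to_string()
    }

    fn as_any(&mut self) -> &mut dyn Any {
        self
    }

    fn event(&mut self) -> Result<Event> {
        // Wait exactly once, before any data is pulled from upstream.
        if !self.done {
            return Ok(Event::Async);
        }
        // Afterwards behave as a plain pass-through between the two ports.
        if self.output.is_finished() {
            self.input.finish();
            return Ok(Event::Finished);
        }
        if !self.output.can_push() {
            self.input.set_not_need_data();
            return Ok(Event::NeedConsume);
        }
        if self.input.has_data() {
            self.output.push_data(self.input.pull_data().unwrap());
            return Ok(Event::NeedConsume);
        }
        if self.input.is_finished() {
            self.output.finish();
            return Ok(Event::Finished);
        }
        self.input.set_need_data();
        Ok(Event::NeedData)
    }

    #[async_backtrace::framed]
    async fn async_process(&mut self) -> Result<()> {
        // Same wait as the code removed from the deserializers: subscribe to each
        // readiness watcher and give up (with a warning) after 30 seconds.
        let timeout_duration = Duration::from_secs(30);
        for ready in self.ctx.get_runtime_filter_ready(self.scan_id) {
            let mut rx = ready.runtime_filter_watcher.subscribe();
            if (*rx.borrow()).is_some() {
                continue;
            }
            match timeout(timeout_duration, rx.changed()).await {
                Ok(Ok(())) => {}
                Ok(Err(_)) => {
                    return Err(ErrorCode::TokioError("watcher's sender is dropped"));
                }
                Err(_) => log::warn!(
                    "Runtime filter wait timeout after {:?} for scan_id: {}",
                    timeout_duration,
                    self.scan_id
                ),
            }
        }
        self.done = true;
        Ok(())
    }
}

Whatever the exact implementation, moving the wait out of the deserializers means each pipeline lane blocks once in a dedicated operator instead of every deserializer carrying its own wait state, and the builder skips the operator entirely when get_runtime_filter_ready(scan_id) returns nothing.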

src/query/storages/fuse/src/operations/read/mod.rs

Lines changed: 2 additions & 0 deletions
@@ -21,6 +21,7 @@ mod parquet_data_source;
 mod parquet_data_source_deserializer;
 mod parquet_data_transform_reader;
 mod parquet_rows_fetcher;
+mod runtime_filter_wait;
 
 mod block_partition_meta;
 mod block_partition_receiver_source;
@@ -32,4 +33,5 @@ pub use fuse_rows_fetcher::row_fetch_processor;
 pub use fuse_source::build_fuse_parquet_source_pipeline;
 pub use native_data_source_deserializer::NativeDeserializeDataTransform;
 pub use parquet_data_source_deserializer::DeserializeDataTransform;
+pub use runtime_filter_wait::TransformRuntimeFilterWait;
 pub use util::need_reserve_block_info;

src/query/storages/fuse/src/operations/read/native_data_source_deserializer.rs

Lines changed: 0 additions & 54 deletions
@@ -28,10 +28,8 @@ use databend_common_catalog::plan::PartInfoPtr;
 use databend_common_catalog::plan::PushDownInfo;
 use databend_common_catalog::plan::TopK;
 use databend_common_catalog::runtime_filter_info::RuntimeFilterEntry;
-use databend_common_catalog::runtime_filter_info::RuntimeFilterReady;
 use databend_common_catalog::runtime_filter_info::RuntimeFilterStats;
 use databend_common_catalog::table_context::TableContext;
-use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
 use databend_common_expression::filter_helper::FilterHelpers;
 use databend_common_expression::types::BooleanType;
@@ -217,8 +215,6 @@ pub struct NativeDeserializeDataTransform {
     // Structures for the bloom runtime filter:
     ctx: Arc<dyn TableContext>,
     bloom_runtime_filter: Option<Vec<BloomRuntimeFilterRef>>,
-    need_wait_runtime_filter: bool,
-    runtime_filter_ready: Vec<Arc<RuntimeFilterReady>>,
 
     // Structures for aggregating index:
     index_reader: Arc<Option<AggIndexReader>>,
@@ -311,9 +307,6 @@ impl NativeDeserializeDataTransform {
         let mut output_schema = plan.schema().as_ref().clone();
         output_schema.remove_internal_fields();
         let output_schema: DataSchema = (&output_schema).into();
-        let need_wait_runtime_filter =
-            !ctx.get_cluster().is_empty() && ctx.get_wait_runtime_filter(plan.scan_id);
-
         Ok(ProcessorPtr::create(Box::new(
             NativeDeserializeDataTransform {
                 ctx,
@@ -339,8 +332,6 @@ impl NativeDeserializeDataTransform {
                 bloom_runtime_filter: None,
                 read_state: ReadPartState::new(),
                 need_reserve_block_info,
-                need_wait_runtime_filter,
-                runtime_filter_ready: Vec::new(),
             },
         )))
     }
@@ -781,20 +772,6 @@ impl NativeDeserializeDataTransform {
         }
     }
 
-    fn prepare_runtime_filter_wait(&mut self) -> bool {
-        if !self.need_wait_runtime_filter {
-            return false;
-        }
-        self.need_wait_runtime_filter = false;
-        let runtime_filter_ready = self.ctx.get_runtime_filter_ready(self.scan_id);
-        if !runtime_filter_ready.is_empty() {
-            self.runtime_filter_ready = runtime_filter_ready;
-            true
-        } else {
-            false
-        }
-    }
-
     /// Pre-process the partition before reading it.
     fn pre_process_partition(&mut self) -> Result<()> {
         debug_assert!(!self.columns.is_empty());
@@ -880,10 +857,6 @@ impl Processor for NativeDeserializeDataTransform {
     }
 
     fn event(&mut self) -> Result<Event> {
-        if self.prepare_runtime_filter_wait() {
-            return Ok(Event::Async);
-        }
-
         if self.output.is_finished() {
             self.input.finish();
             return Ok(Event::Finished);
@@ -931,33 +904,6 @@ impl Processor for NativeDeserializeDataTransform {
 
     #[async_backtrace::framed]
     async fn async_process(&mut self) -> Result<()> {
-        use std::time::Duration;
-
-        use databend_common_base::base::tokio::time::timeout;
-
-        let timeout_duration = Duration::from_secs(30);
-
-        for runtime_filter_ready in &self.runtime_filter_ready {
-            let mut rx = runtime_filter_ready.runtime_filter_watcher.subscribe();
-            if (*rx.borrow()).is_some() {
-                continue;
-            }
-
-            match timeout(timeout_duration, rx.changed()).await {
-                Ok(Ok(())) => {}
-                Ok(Err(_)) => {
-                    return Err(ErrorCode::TokioError("watcher's sender is dropped"));
-                }
-                Err(_) => {
-                    log::warn!(
-                        "Runtime filter wait timeout after {:?} for scan_id: {}",
-                        timeout_duration,
-                        self.scan_id
-                    );
-                }
-            }
-        }
-        self.runtime_filter_ready.clear();
         Ok(())
     }
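
The wait loop deleted above (its twin in the parquet deserializer is removed in the next file) hinges on a tokio watch channel: handles whose value is already Some are skipped, the rest are awaited with a 30-second budget, and a timeout only warns while a dropped sender is an error. A self-contained sketch of that pattern with illustrative names — ReadyFlag stands in for the real RuntimeFilterReady, and the Option<()> payload is an assumption:

// Cargo.toml (for this sketch only): tokio = { version = "1", features = ["full"] }
use std::sync::Arc;
use std::time::Duration;

use tokio::sync::watch;
use tokio::time::timeout;

// Stand-in for the readiness handle: the producer holds the sender half,
// every waiter subscribes its own receiver.
struct ReadyFlag {
    watcher: watch::Sender<Option<()>>,
}

// Mirrors the removed async_process: already-set flags are skipped, the rest are
// awaited with a 30s budget; a timeout only logs, a dropped sender is an error.
async fn wait_all_ready(flags: &[Arc<ReadyFlag>]) -> Result<(), String> {
    let budget = Duration::from_secs(30);
    for flag in flags {
        let mut rx = flag.watcher.subscribe();
        if rx.borrow().is_some() {
            continue;
        }
        match timeout(budget, rx.changed()).await {
            Ok(Ok(())) => {}
            Ok(Err(_)) => return Err("watcher's sender is dropped".to_string()),
            Err(_) => eprintln!("runtime filter wait timed out after {:?}", budget),
        }
    }
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), String> {
    let (tx, _rx) = watch::channel::<Option<()>>(None);
    let flag = Arc::new(ReadyFlag { watcher: tx });
    let flags = vec![flag.clone()];

    // Simulate the producer (e.g. the join build side) marking the filter ready.
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(100)).await;
        let _ = flag.watcher.send(Some(()));
    });

    wait_all_ready(&flags).await?;
    println!("runtime filters ready, the scan may proceed");
    Ok(())
}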

src/query/storages/fuse/src/operations/read/parquet_data_source_deserializer.rs

Lines changed: 0 additions & 50 deletions
@@ -24,10 +24,8 @@ use databend_common_base::runtime::profile::ProfileStatisticsName;
 use databend_common_catalog::plan::DataSourcePlan;
 use databend_common_catalog::plan::PartInfoPtr;
 use databend_common_catalog::runtime_filter_info::RuntimeFilterEntry;
-use databend_common_catalog::runtime_filter_info::RuntimeFilterReady;
 use databend_common_catalog::runtime_filter_info::RuntimeFilterStats;
 use databend_common_catalog::table_context::TableContext;
-use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
 use databend_common_expression::types::Bitmap;
 use databend_common_expression::types::DataType;
@@ -44,7 +42,6 @@ use databend_common_pipeline_core::processors::InputPort;
 use databend_common_pipeline_core::processors::OutputPort;
 use databend_common_pipeline_core::processors::Processor;
 use databend_common_pipeline_core::processors::ProcessorPtr;
-use databend_common_sql::IndexType;
 use xorf::BinaryFuse16;
 
 use super::parquet_data_source::ParquetDataSource;
@@ -77,8 +74,6 @@ pub struct DeserializeDataTransform {
     base_block_ids: Option<Scalar>,
     cached_runtime_filter: Option<Vec<BloomRuntimeFilterRef>>,
     need_reserve_block_info: bool,
-    already_waited_runtime_filter: bool,
-    runtime_filter_ready: Vec<Arc<RuntimeFilterReady>>,
 }
 
 #[derive(Clone)]
@@ -136,8 +131,6 @@ impl DeserializeDataTransform {
             base_block_ids: plan.base_block_ids.clone(),
             cached_runtime_filter: None,
             need_reserve_block_info,
-            already_waited_runtime_filter: false,
-            runtime_filter_ready: ctx.get_runtime_filter_ready(plan.scan_id),
         })))
     }
 
@@ -192,14 +185,6 @@ impl DeserializeDataTransform {
             Ok(None)
         }
     }
-
-    fn need_wait_runtime_filter(&mut self) -> bool {
-        if self.already_waited_runtime_filter {
-            return false;
-        }
-        self.already_waited_runtime_filter = true;
-        !self.runtime_filter_ready.is_empty()
-    }
 }
 
 #[async_trait::async_trait]
@@ -213,10 +198,6 @@ impl Processor for DeserializeDataTransform {
     }
 
     fn event(&mut self) -> Result<Event> {
-        if self.need_wait_runtime_filter() {
-            return Ok(Event::Async);
-        }
-
         if self.output.is_finished() {
             self.input.finish();
             return Ok(Event::Finished);
@@ -374,35 +355,4 @@ impl Processor for DeserializeDataTransform {
 
         Ok(())
     }
-
-    #[async_backtrace::framed]
-    async fn async_process(&mut self) -> Result<()> {
-        use std::time::Duration;
-
-        use databend_common_base::base::tokio::time::timeout;
-
-        let timeout_duration = Duration::from_secs(30);
-
-        for runtime_filter_ready in &self.runtime_filter_ready {
-            let mut rx = runtime_filter_ready.runtime_filter_watcher.subscribe();
-            if (*rx.borrow()).is_some() {
-                continue;
-            }
-
-            match timeout(timeout_duration, rx.changed()).await {
-                Ok(Ok(())) => {}
-                Ok(Err(_)) => {
-                    return Err(ErrorCode::TokioError("watcher's sender is dropped"));
-                }
-                Err(_) => {
-                    log::warn!(
-                        "Runtime filter wait timeout after {:?} for scan_id: {}",
-                        timeout_duration,
-                        self.scan_id
-                    );
-                }
-            }
-        }
-        Ok(())
-    }
 }
