@@ -78,7 +78,7 @@ pub struct DeserializeDataTransform {
7878 base_block_ids : Option < Scalar > ,
7979 cached_runtime_filter : Option < Vec < BloomRuntimeFilterRef > > ,
8080 need_reserve_block_info : bool ,
81- need_wait_runtime_filter : bool ,
81+ already_waited_runtime_filter : bool ,
8282 runtime_filter_ready : Vec < Arc < RuntimeFilterReady > > ,
8383}
8484
@@ -102,7 +102,6 @@ impl DeserializeDataTransform {
102102 virtual_reader : Arc < Option < VirtualColumnReader > > ,
103103 ) -> Result < ProcessorPtr > {
104104 let scan_progress = ctx. get_scan_progress ( ) ;
105- let need_wait_runtime_filter = !ctx. get_cluster ( ) . is_empty ( ) ;
106105
107106 let mut src_schema: DataSchema = ( block_reader. schema ( ) . as_ref ( ) ) . into ( ) ;
108107 if let Some ( virtual_reader) = virtual_reader. as_ref ( ) {
@@ -122,7 +121,7 @@ impl DeserializeDataTransform {
122121 let output_schema: DataSchema = ( & output_schema) . into ( ) ;
123122 let ( need_reserve_block_info, _) = need_reserve_block_info ( ctx. clone ( ) , plan. table_index ) ;
124123 Ok ( ProcessorPtr :: create ( Box :: new ( DeserializeDataTransform {
125- ctx,
124+ ctx : ctx . clone ( ) ,
126125 table_index : plan. table_index ,
127126 scan_id : plan. scan_id ,
128127 scan_progress,
@@ -139,8 +138,8 @@ impl DeserializeDataTransform {
139138 base_block_ids : plan. base_block_ids . clone ( ) ,
140139 cached_runtime_filter : None ,
141140 need_reserve_block_info,
142- need_wait_runtime_filter ,
143- runtime_filter_ready : vec ! [ ] ,
141+ already_waited_runtime_filter : false ,
142+ runtime_filter_ready : ctx . get_runtime_filter_ready ( plan . scan_id ) ,
144143 } ) ) )
145144 }
146145
@@ -197,17 +196,11 @@ impl DeserializeDataTransform {
197196 }
198197
199198 fn need_wait_runtime_filter ( & mut self ) -> bool {
200- if ! self . need_wait_runtime_filter {
199+ if self . already_waited_runtime_filter {
201200 return false ;
202201 }
203- self . need_wait_runtime_filter = false ;
204- let runtime_filter_ready = self . ctx . get_runtime_filter_ready ( self . scan_id ) ;
205- if !runtime_filter_ready. is_empty ( ) {
206- self . runtime_filter_ready = runtime_filter_ready;
207- true
208- } else {
209- false
210- }
202+ self . already_waited_runtime_filter = true ;
203+ !self . runtime_filter_ready . is_empty ( )
211204 }
212205}
213206
@@ -308,24 +301,23 @@ impl Processor for DeserializeDataTransform {
308301 let origin_num_rows = data_block. num_rows ( ) ;
309302
310303 let mut filter = None ;
311- if self . ctx . has_bloom_runtime_filters ( self . table_index ) {
312- let start = Instant :: now ( ) ;
313- let rows_before = data_block. num_rows ( ) ;
314- if let Some ( bitmap) = self . runtime_filter ( data_block. clone ( ) ) ? {
315- data_block = data_block. filter_with_bitmap ( & bitmap) ?;
316- filter = Some ( bitmap) ;
317- let rows_after = data_block. num_rows ( ) ;
318- let bloom_duration = start. elapsed ( ) ;
304+ let bloom_start = Instant :: now ( ) ;
305+
306+ let rows_before = data_block. num_rows ( ) ;
307+ if let Some ( bitmap) = self . runtime_filter ( data_block. clone ( ) ) ? {
308+ data_block = data_block. filter_with_bitmap ( & bitmap) ?;
309+ filter = Some ( bitmap) ;
310+ let rows_after = data_block. num_rows ( ) ;
311+ let bloom_duration = bloom_start. elapsed ( ) ;
312+ Profile :: record_usize_profile (
313+ ProfileStatisticsName :: RuntimeFilterBloomTime ,
314+ bloom_duration. as_nanos ( ) as usize ,
315+ ) ;
316+ if rows_before > rows_after {
319317 Profile :: record_usize_profile (
320- ProfileStatisticsName :: RuntimeFilterBloomTime ,
321- bloom_duration . as_nanos ( ) as usize ,
318+ ProfileStatisticsName :: RuntimeFilterBloomRowsFiltered ,
319+ rows_before - rows_after ,
322320 ) ;
323- if rows_before > rows_after {
324- Profile :: record_usize_profile (
325- ProfileStatisticsName :: RuntimeFilterBloomRowsFiltered ,
326- rows_before - rows_after,
327- ) ;
328- }
329321 }
330322 }
331323
0 commit comments