Changes from all commits (43 commits)
13b266e  feat: propagate runtime filters through equivalence classes (SkyFan2002, Oct 26, 2025)
775740e  make lint (SkyFan2002, Oct 26, 2025)
6b115af  refactor: improve profile of runtime filter (SkyFan2002, Oct 27, 2025)
a2203f0  feat: scan wait for runtime filter build ready (SkyFan2002, Oct 27, 2025)
b548edc  fix (SkyFan2002, Oct 28, 2025)
29f3f65  remove builder.contain_sink_processor (SkyFan2002, Oct 28, 2025)
8972202  fix (SkyFan2002, Oct 28, 2025)
733275e  fix (SkyFan2002, Oct 28, 2025)
6366c16  fix (SkyFan2002, Oct 28, 2025)
04c6ce3  fix (SkyFan2002, Oct 28, 2025)
a8bd6cb  add TransformRuntimeFilterWait (SkyFan2002, Oct 30, 2025)
b74baf6  add log (SkyFan2002, Oct 30, 2025)
ecd3d81  add log (SkyFan2002, Oct 30, 2025)
71dd1bc  fix (SkyFan2002, Oct 31, 2025)
5858f47  enable bloom (SkyFan2002, Nov 2, 2025)
5038e07  determine at runtime whether to apply the bloom runtime filter (SkyFan2002, Nov 2, 2025)
0c0edbb  fix (SkyFan2002, Nov 3, 2025)
2f88feb  fix (SkyFan2002, Nov 3, 2025)
7ad9b19  Merge branch 'main' into improve_runtime_filter (SkyFan2002, Nov 3, 2025)
226ead5  fix (SkyFan2002, Nov 3, 2025)
fba2866  rm log (SkyFan2002, Nov 4, 2025)
c0f49fa  improve (SkyFan2002, Nov 4, 2025)
2efc816  fix (SkyFan2002, Nov 4, 2025)
86944c6  update test (SkyFan2002, Nov 5, 2025)
1ed7d52  update test (SkyFan2002, Nov 5, 2025)
143237f  update test (SkyFan2002, Nov 5, 2025)
ce4cad3  update test (SkyFan2002, Nov 5, 2025)
09f6a49  update test (SkyFan2002, Nov 5, 2025)
baa7a92  Merge upstream/main into improve_runtime_filter (SkyFan2002, Nov 5, 2025)
c4bf687  update test (SkyFan2002, Nov 5, 2025)
17ab76f  update test (SkyFan2002, Nov 5, 2025)
7330930  update test (SkyFan2002, Nov 5, 2025)
9f846ef  update test (SkyFan2002, Nov 5, 2025)
dab4374  fix comment and rm unused code (SkyFan2002, Nov 5, 2025)
4b633c6  fix (SkyFan2002, Nov 5, 2025)
ccec703  Merge branch 'main' into improve_runtime_filter (zhang2014, Nov 6, 2025)
f72b86d  try fix (SkyFan2002, Nov 7, 2025)
c4abb96  Merge branch 'improve_runtime_filter' of https://github.com/SkyFan200… (SkyFan2002, Nov 7, 2025)
c8016ff  Merge remote-tracking branch 'upstream/main' into improve_runtime_filter (SkyFan2002, Nov 7, 2025)
c2cf429  fix merge (SkyFan2002, Nov 7, 2025)
e8013f2  Merge remote-tracking branch 'upstream/main' into improve_runtime_filter (SkyFan2002, Nov 9, 2025)
952b44c  fix merge (SkyFan2002, Nov 9, 2025)
2418d81  fix (SkyFan2002, Nov 9, 2025)
105 changes: 87 additions & 18 deletions src/query/catalog/src/runtime_filter_info.rs
@@ -14,6 +14,9 @@

use std::fmt::Debug;
use std::fmt::Formatter;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;

use databend_common_base::base::tokio::sync::watch;
use databend_common_base::base::tokio::sync::watch::Receiver;
@@ -23,42 +26,108 @@ use xorf::BinaryFuse16;

#[derive(Clone, Default)]
pub struct RuntimeFilterInfo {
pub inlist: Vec<Expr<String>>,
pub min_max: Vec<Expr<String>>,
pub bloom: Vec<(String, BinaryFuse16)>,
pub filters: Vec<RuntimeFilterEntry>,
}

impl Debug for RuntimeFilterInfo {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"RuntimeFilterInfo {{ inlist: {}, min_max: {}, bloom: {:?} }}",
self.inlist
"RuntimeFilterInfo {{ filters: [{}] }}",
self.filters
.iter()
.map(|e| e.sql_display())
.map(|entry| format!("#{}(probe:{})", entry.id, entry.probe_expr.sql_display()))
.collect::<Vec<String>>()
.join(","),
self.min_max
.iter()
.map(|e| e.sql_display())
.collect::<Vec<String>>()
.join(","),
self.bloom
.iter()
.map(|(name, _)| name)
.collect::<Vec<&String>>()
.join(",")
)
}
}

impl RuntimeFilterInfo {
pub fn is_empty(&self) -> bool {
self.inlist.is_empty() && self.bloom.is_empty() && self.min_max.is_empty()
self.filters.is_empty()
}

pub fn is_blooms_empty(&self) -> bool {
self.bloom.is_empty()
self.filters.iter().all(|entry| entry.bloom.is_none())
}
}

#[derive(Clone)]
pub struct RuntimeFilterEntry {
pub id: usize,
pub probe_expr: Expr<String>,
pub bloom: Option<RuntimeFilterBloom>,
pub inlist: Option<Expr<String>>,
pub min_max: Option<Expr<String>>,
pub stats: Arc<RuntimeFilterStats>,
pub build_rows: usize,
pub build_table_rows: Option<u64>,
pub enabled: bool,
}

#[derive(Clone)]
pub struct RuntimeFilterBloom {
pub column_name: String,
pub filter: BinaryFuse16,
}

#[derive(Default)]
pub struct RuntimeFilterStats {
bloom_time_ns: AtomicU64,
bloom_rows_filtered: AtomicU64,
inlist_min_max_time_ns: AtomicU64,
min_max_rows_filtered: AtomicU64,
min_max_partitions_pruned: AtomicU64,
}

impl RuntimeFilterStats {
pub fn new() -> Self {
Self::default()
}

pub fn record_bloom(&self, time_ns: u64, rows_filtered: u64) {
self.bloom_time_ns.fetch_add(time_ns, Ordering::Relaxed);
self.bloom_rows_filtered
.fetch_add(rows_filtered, Ordering::Relaxed);
}

pub fn record_inlist_min_max(&self, time_ns: u64, rows_filtered: u64, partitions_pruned: u64) {
self.inlist_min_max_time_ns
.fetch_add(time_ns, Ordering::Relaxed);
self.min_max_rows_filtered
.fetch_add(rows_filtered, Ordering::Relaxed);
self.min_max_partitions_pruned
.fetch_add(partitions_pruned, Ordering::Relaxed);
}

pub fn snapshot(&self) -> RuntimeFilterStatsSnapshot {
RuntimeFilterStatsSnapshot {
bloom_time_ns: self.bloom_time_ns.load(Ordering::Relaxed),
bloom_rows_filtered: self.bloom_rows_filtered.load(Ordering::Relaxed),
inlist_min_max_time_ns: self.inlist_min_max_time_ns.load(Ordering::Relaxed),
min_max_rows_filtered: self.min_max_rows_filtered.load(Ordering::Relaxed),
min_max_partitions_pruned: self.min_max_partitions_pruned.load(Ordering::Relaxed),
}
}
}

#[derive(Default, Clone, Debug)]
pub struct RuntimeFilterStatsSnapshot {
pub bloom_time_ns: u64,
pub bloom_rows_filtered: u64,
pub inlist_min_max_time_ns: u64,
pub min_max_rows_filtered: u64,
pub min_max_partitions_pruned: u64,
}

#[derive(Clone, Debug)]
pub struct RuntimeFilterReport {
pub filter_id: usize,
pub has_bloom: bool,
pub has_inlist: bool,
pub has_min_max: bool,
pub stats: RuntimeFilterStatsSnapshot,
}

pub struct RuntimeFilterReady {
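Note for reviewers: the new RuntimeFilterStats is a set of relaxed atomic counters shared through an Arc between the probe-side operators and the reporting path, and RuntimeFilterStatsSnapshot is its point-in-time copy. The short sketch below shows the intended record/snapshot flow; the helper names and the placeholder row counts are illustrative, not code from this PR.

// --- illustrative sketch (not part of the diff) ---
// Assumes the types above are in scope, e.g.
// use databend_common_catalog::runtime_filter_info::{RuntimeFilterStats, RuntimeFilterStatsSnapshot};
use std::sync::Arc;
use std::time::Instant;

/// Hypothetical probe-side helper: time one bloom pruning pass and record it
/// on the shared stats object (relaxed atomics, so no lock is taken).
fn apply_bloom_and_record(stats: &Arc<RuntimeFilterStats>, input_rows: u64) -> u64 {
    let start = Instant::now();
    // ... run the bloom filter over the block here ...
    let kept_rows = input_rows / 2; // placeholder result for illustration
    stats.record_bloom(start.elapsed().as_nanos() as u64, input_rows - kept_rows);
    kept_rows
}

/// Hypothetical reporting helper: take a point-in-time copy for EXPLAIN output.
fn summarize(stats: &RuntimeFilterStats) -> String {
    let snap: RuntimeFilterStatsSnapshot = stats.snapshot();
    format!(
        "bloom filtered {} rows in {} ns; min/max filtered {} rows, pruned {} partitions",
        snap.bloom_rows_filtered,
        snap.bloom_time_ns,
        snap.min_max_rows_filtered,
        snap.min_max_partitions_pruned
    )
}
// --- end of sketch ---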
13 changes: 9 additions & 4 deletions src/query/catalog/src/table_context.rs
@@ -78,8 +78,10 @@ use crate::plan::PartInfoPtr;
use crate::plan::PartStatistics;
use crate::plan::Partitions;
use crate::query_kind::QueryKind;
use crate::runtime_filter_info::RuntimeFilterEntry;
use crate::runtime_filter_info::RuntimeFilterInfo;
use crate::runtime_filter_info::RuntimeFilterReady;
use crate::runtime_filter_info::RuntimeFilterReport;
use crate::session_type::SessionType;
use crate::statistics::data_cache_statistics::DataCacheMetrics;
use crate::table::Table;
@@ -357,22 +359,25 @@ pub trait TableContext: Send + Sync {

fn get_runtime_filter_ready(&self, table_index: usize) -> Vec<Arc<RuntimeFilterReady>>;

fn set_wait_runtime_filter(&self, table_index: usize, need_to_wait: bool);

fn get_wait_runtime_filter(&self, table_index: usize) -> bool;

fn clear_runtime_filter(&self);
fn assert_no_runtime_filter_state(&self) -> Result<()> {
unimplemented!()
}

fn set_merge_into_join(&self, join: MergeIntoJoin);

fn get_merge_into_join(&self) -> MergeIntoJoin;

fn get_runtime_filters(&self, id: usize) -> Vec<RuntimeFilterEntry>;

fn get_bloom_runtime_filter_with_id(&self, id: usize) -> Vec<(String, BinaryFuse16)>;

fn get_inlist_runtime_filter_with_id(&self, id: usize) -> Vec<Expr<String>>;

fn get_min_max_runtime_filter_with_id(&self, id: usize) -> Vec<Expr<String>>;

fn runtime_filter_reports(&self) -> HashMap<usize, Vec<RuntimeFilterReport>>;

fn has_bloom_runtime_filters(&self, id: usize) -> bool;
fn txn_mgr(&self) -> TxnManagerRef;
fn get_table_meta_timestamps(
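The trait now exposes per-scan runtime-filter entries and reports alongside the per-kind getters. A minimal sketch of how a caller might consume runtime_filter_reports(); it assumes TableContext and RuntimeFilterReport from the files above are in scope, and the helper name is hypothetical.

// --- illustrative sketch (not part of the diff) ---
use std::collections::HashMap;

fn log_runtime_filter_reports(ctx: &dyn TableContext) {
    // scan id -> reports collected from the build side of each hash join
    let reports: HashMap<usize, Vec<RuntimeFilterReport>> = ctx.runtime_filter_reports();
    for (scan_id, scan_reports) in reports {
        for report in scan_reports {
            println!(
                "scan {}: filter {} (bloom: {}, inlist: {}, min_max: {}), {} rows bloom-filtered",
                scan_id,
                report.filter_id,
                report.has_bloom,
                report.has_inlist,
                report.has_min_max,
                report.stats.bloom_rows_filtered,
            );
        }
    }
}
// --- end of sketch ---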
15 changes: 13 additions & 2 deletions src/query/service/src/interpreters/interpreter_explain.rs
@@ -180,6 +180,7 @@ impl Interpreter for ExplainInterpreter {
profs: HashMap::new(),
metadata: &metadata,
scan_id_to_runtime_filters: HashMap::new(),
runtime_filter_reports: HashMap::new(),
};

let formatter = plan.formatter()?;
@@ -483,13 +484,16 @@
plan.set_pruning_stats(&mut pruned_partitions_stats);
}

let runtime_filter_reports = self.ctx.runtime_filter_reports();

let result = match self.partial {
true => {
let metadata = metadata.read();
let mut context = FormatContext {
profs: query_profiles.clone(),
metadata: &metadata,
scan_id_to_runtime_filters: HashMap::new(),
runtime_filter_reports: runtime_filter_reports.clone(),
};

let formatter = plan.formatter()?;
@@ -498,8 +502,15 @@
}
false => {
let metadata = metadata.read();
plan.format(&metadata, query_profiles.clone())?
.format_pretty()?
let mut context = FormatContext {
profs: query_profiles.clone(),
metadata: &metadata,
scan_id_to_runtime_filters: HashMap::new(),
runtime_filter_reports: runtime_filter_reports.clone(),
};
let formatter = plan.formatter()?;
let format_node = formatter.format(&mut context)?;
format_node.format_pretty()?
}
};

2 changes: 2 additions & 0 deletions src/query/service/src/physical_plans/format/common.rs
@@ -18,6 +18,7 @@ use databend_common_ast::ast::FormatTreeNode;
use databend_common_base::base::format_byte_size;
use databend_common_base::runtime::profile::get_statistics_desc;
use databend_common_catalog::plan::PartStatistics;
use databend_common_catalog::runtime_filter_info::RuntimeFilterReport;
use databend_common_expression::DataSchemaRef;
use databend_common_sql::executor::physical_plans::AggregateFunctionDesc;
use databend_common_sql::IndexType;
@@ -32,6 +33,7 @@ pub struct FormatContext<'a> {
pub metadata: &'a Metadata,
pub profs: HashMap<u32, PlanProfile>,
pub scan_id_to_runtime_filters: HashMap<IndexType, Vec<PhysicalRuntimeFilter>>,
pub runtime_filter_reports: HashMap<IndexType, Vec<RuntimeFilterReport>>,
}

pub fn pretty_display_agg_desc(desc: &AggregateFunctionDesc, metadata: &Metadata) -> String {
29 changes: 23 additions & 6 deletions src/query/service/src/physical_plans/format/format_hash_join.rs
@@ -43,11 +43,14 @@ impl<'a> PhysicalFormat for HashJoinFormatter<'a> {

#[recursive::recursive]
fn format(&self, ctx: &mut FormatContext<'_>) -> Result<FormatTreeNode<String>> {
// Register runtime filters for all probe targets
for rf in self.inner.runtime_filter.filters.iter() {
ctx.scan_id_to_runtime_filters
.entry(rf.scan_id)
.or_default()
.push(rf.clone());
for (_probe_key, scan_id) in &rf.probe_targets {
ctx.scan_id_to_runtime_filters
.entry(*scan_id)
.or_default()
.push(rf.clone());
}
}

let build_keys = self
@@ -83,11 +86,25 @@

let mut build_runtime_filters = vec![];
for rf in self.inner.runtime_filter.filters.iter() {
// Format all probe targets
let probe_targets_str = rf
.probe_targets
.iter()
.map(|(probe_key, scan_id)| {
format!(
"{}@scan{}",
probe_key.as_expr(&BUILTIN_FUNCTIONS).sql_display(),
scan_id
)
})
.collect::<Vec<_>>()
.join(", ");

let mut s = format!(
"filter id:{}, build key:{}, probe key:{}, filter type:",
"filter id:{}, build key:{}, probe targets:[{}], filter type:",
rf.id,
rf.build_key.as_expr(&BUILTIN_FUNCTIONS).sql_display(),
rf.probe_key.as_expr(&BUILTIN_FUNCTIONS).sql_display(),
probe_targets_str,
);
if rf.enable_bloom_runtime_filter {
s += "bloom,";
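For the new EXPLAIN output: a runtime filter may now have several probe targets, and the formatter renders each (probe key, scan id) pair as key@scanN, joined with commas. A standalone approximation of that formatting, with plain strings standing in for RemoteExpr and sql_display():

// --- illustrative sketch (not part of the diff) ---
fn format_probe_targets(probe_targets: &[(String, usize)]) -> String {
    probe_targets
        .iter()
        .map(|(probe_key, scan_id)| format!("{probe_key}@scan{scan_id}"))
        .collect::<Vec<_>>()
        .join(", ")
}

fn main() {
    let targets = vec![("t1.a".to_string(), 0), ("t2.b".to_string(), 3)];
    // Prints: t1.a@scan0, t2.b@scan3
    println!("{}", format_probe_targets(&targets));
}
// --- end of sketch ---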
@@ -105,6 +105,7 @@ impl<'a> PhysicalFormat for TableScanFormatter<'a> {

let mut children = vec![
FormatTreeNode::new(format!("table: {table_name}")),
FormatTreeNode::new(format!("scan id: {}", self.inner.scan_id)),
FormatTreeNode::new(format!(
"output columns: [{}]",
format_output_columns(self.inner.output_schema()?, ctx.metadata, false)
7 changes: 3 additions & 4 deletions src/query/service/src/physical_plans/mod.rs
@@ -35,7 +35,6 @@ mod physical_expression_scan;
mod physical_filter;
mod physical_hash_join;
mod physical_join;
mod physical_join_filter;
mod physical_limit;
mod physical_multi_table_insert;
mod physical_mutation;
@@ -59,6 +58,7 @@ mod physical_udf;
mod physical_union_all;
mod physical_window;
mod physical_window_partition;
mod runtime_filter;

pub use physical_add_stream_column::AddStreamColumn;
pub use physical_aggregate_expand::AggregateExpand;
@@ -82,9 +82,6 @@ pub use physical_exchange_sink::ExchangeSink;
pub use physical_exchange_source::ExchangeSource;
pub use physical_filter::Filter;
pub use physical_hash_join::HashJoin;
pub use physical_join_filter::JoinRuntimeFilter;
pub use physical_join_filter::PhysicalRuntimeFilter;
pub use physical_join_filter::PhysicalRuntimeFilters;
pub use physical_limit::Limit;
pub use physical_materialized_cte::*;
pub use physical_multi_table_insert::*;
@@ -110,6 +107,8 @@ pub use physical_udf::UdfFunctionDesc;
pub use physical_union_all::UnionAll;
pub use physical_window::*;
pub use physical_window_partition::*;
pub use runtime_filter::PhysicalRuntimeFilter;
pub use runtime_filter::PhysicalRuntimeFilters;

pub mod explain;
mod format;
@@ -167,7 +167,6 @@ impl IPhysicalPlan for AggregatePartial {
}

fn build_pipeline2(&self, builder: &mut PipelineBuilder) -> Result<()> {
builder.contain_sink_processor = true;
self.input.build_pipeline(builder)?;

let max_block_size = builder.settings.get_max_block_size()?;