Skip to content

Commit 0c7cda2

Browse files
authored
feat: Support EXCEPT (DISTINCT / ALL) (#280)
* feat: Support EXCEPT (DISTINCT / ALL) * Optimize operators * fmt
1 parent b710db4 commit 0c7cda2

File tree

12 files changed

+243
-69
lines changed

12 files changed

+243
-69
lines changed

src/binder/select.rs

Lines changed: 74 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use crate::execution::dql::join::joins_nullable;
2424
use crate::expression::agg::AggKind;
2525
use crate::expression::{AliasType, BinaryOperator};
2626
use crate::planner::operator::aggregate::AggregateOperator;
27+
use crate::planner::operator::except::ExceptOperator;
2728
use crate::planner::operator::function_scan::FunctionScanOperator;
2829
use crate::planner::operator::insert::InsertOperator;
2930
use crate::planner::operator::join::JoinCondition;
@@ -181,56 +182,83 @@ impl<'a: 'b, 'b, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'
181182
}
182183
true
183184
};
184-
match (op, is_all) {
185-
(SetOperator::Union, true) => {
186-
let left_schema = left_plan.output_schema();
187-
let right_schema = right_plan.output_schema();
188-
189-
if !fn_eq(left_schema, right_schema) {
190-
return Err(DatabaseError::MisMatch(
191-
"the output types on the left",
192-
"the output types on the right",
193-
));
185+
186+
let left_schema = left_plan.output_schema();
187+
let right_schema = right_plan.output_schema();
188+
189+
if !fn_eq(left_schema, right_schema) {
190+
return Err(DatabaseError::MisMatch(
191+
"the output types on the left",
192+
"the output types on the right",
193+
));
194+
}
195+
196+
match op {
197+
SetOperator::Union => {
198+
if is_all {
199+
Ok(UnionOperator::build(
200+
left_schema.clone(),
201+
right_schema.clone(),
202+
left_plan,
203+
right_plan,
204+
))
205+
} else {
206+
let distinct_exprs = left_schema
207+
.iter()
208+
.cloned()
209+
.map(ScalarExpression::ColumnRef)
210+
.collect_vec();
211+
212+
let union_op = Operator::Union(UnionOperator {
213+
left_schema_ref: left_schema.clone(),
214+
_right_schema_ref: right_schema.clone(),
215+
});
216+
217+
Ok(self.bind_distinct(
218+
LogicalPlan::new(
219+
union_op,
220+
Childrens::Twins {
221+
left: left_plan,
222+
right: right_plan,
223+
},
224+
),
225+
distinct_exprs,
226+
))
194227
}
195-
Ok(UnionOperator::build(
196-
left_schema.clone(),
197-
right_schema.clone(),
198-
left_plan,
199-
right_plan,
200-
))
201228
}
202-
(SetOperator::Union, false) => {
203-
let left_schema = left_plan.output_schema();
204-
let right_schema = right_plan.output_schema();
205-
206-
if !fn_eq(left_schema, right_schema) {
207-
return Err(DatabaseError::MisMatch(
208-
"the output types on the left",
209-
"the output types on the right",
210-
));
229+
SetOperator::Except => {
230+
if is_all {
231+
Ok(ExceptOperator::build(
232+
left_schema.clone(),
233+
right_schema.clone(),
234+
left_plan,
235+
right_plan,
236+
))
237+
} else {
238+
let distinct_exprs = left_schema
239+
.iter()
240+
.cloned()
241+
.map(ScalarExpression::ColumnRef)
242+
.collect_vec();
243+
244+
let except_op = Operator::Except(ExceptOperator {
245+
left_schema_ref: left_schema.clone(),
246+
_right_schema_ref: right_schema.clone(),
247+
});
248+
249+
Ok(self.bind_distinct(
250+
LogicalPlan::new(
251+
except_op,
252+
Childrens::Twins {
253+
left: left_plan,
254+
right: right_plan,
255+
},
256+
),
257+
distinct_exprs,
258+
))
211259
}
212-
let union_op = Operator::Union(UnionOperator {
213-
left_schema_ref: left_schema.clone(),
214-
_right_schema_ref: right_schema.clone(),
215-
});
216-
let distinct_exprs = left_schema
217-
.iter()
218-
.cloned()
219-
.map(ScalarExpression::ColumnRef)
220-
.collect_vec();
221-
222-
Ok(self.bind_distinct(
223-
LogicalPlan::new(
224-
union_op,
225-
Childrens::Twins {
226-
left: left_plan,
227-
right: right_plan,
228-
},
229-
),
230-
distinct_exprs,
231-
))
232260
}
233-
(set_operator, _) => Err(DatabaseError::UnsupportedStmt(format!(
261+
set_operator => Err(DatabaseError::UnsupportedStmt(format!(
234262
"set operator: {:?}",
235263
set_operator
236264
))),

src/execution/dql/except.rs

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
use crate::execution::{build_read, Executor, ReadExecutor};
2+
use crate::planner::LogicalPlan;
3+
use crate::storage::{StatisticsMetaCache, TableCache, Transaction, ViewCache};
4+
use crate::throw;
5+
use ahash::{HashSet, HashSetExt};
6+
use std::ops::Coroutine;
7+
use std::ops::CoroutineState;
8+
use std::pin::Pin;
9+
10+
pub struct Except {
11+
left_input: LogicalPlan,
12+
right_input: LogicalPlan,
13+
}
14+
15+
impl From<(LogicalPlan, LogicalPlan)> for Except {
16+
fn from((left_input, right_input): (LogicalPlan, LogicalPlan)) -> Self {
17+
Except {
18+
left_input,
19+
right_input,
20+
}
21+
}
22+
}
23+
24+
impl<'a, T: Transaction + 'a> ReadExecutor<'a, T> for Except {
25+
fn execute(
26+
self,
27+
cache: (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache),
28+
transaction: *mut T,
29+
) -> Executor<'a> {
30+
Box::new(
31+
#[coroutine]
32+
move || {
33+
let Except {
34+
left_input,
35+
right_input,
36+
} = self;
37+
38+
let mut coroutine = build_read(right_input, cache, transaction);
39+
40+
let mut except_col = HashSet::new();
41+
42+
while let CoroutineState::Yielded(tuple) = Pin::new(&mut coroutine).resume(()) {
43+
let tuple = throw!(tuple);
44+
except_col.insert(tuple);
45+
}
46+
47+
let mut coroutine = build_read(left_input, cache, transaction);
48+
49+
while let CoroutineState::Yielded(tuple) = Pin::new(&mut coroutine).resume(()) {
50+
let tuple = throw!(tuple);
51+
if !except_col.contains(&tuple) {
52+
yield Ok(tuple);
53+
}
54+
}
55+
},
56+
)
57+
}
58+
}

src/execution/dql/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
pub(crate) mod aggregate;
22
pub(crate) mod describe;
33
pub(crate) mod dummy;
4+
pub(crate) mod except;
45
pub(crate) mod explain;
56
pub(crate) mod filter;
67
pub(crate) mod function_scan;

src/execution/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use crate::execution::dql::aggregate::hash_agg::HashAggExecutor;
2424
use crate::execution::dql::aggregate::simple_agg::SimpleAggExecutor;
2525
use crate::execution::dql::describe::Describe;
2626
use crate::execution::dql::dummy::Dummy;
27+
use crate::execution::dql::except::Except;
2728
use crate::execution::dql::explain::Explain;
2829
use crate::execution::dql::filter::Filter;
2930
use crate::execution::dql::function_scan::FunctionScan;
@@ -146,6 +147,11 @@ pub fn build_read<'a, T: Transaction + 'a>(
146147

147148
Union::from((left_input, right_input)).execute(cache, transaction)
148149
}
150+
Operator::Except(_) => {
151+
let (left_input, right_input) = childrens.pop_twins();
152+
153+
Except::from((left_input, right_input)).execute(cache, transaction)
154+
}
149155
_ => unreachable!(),
150156
}
151157
}

src/optimizer/rule/normalization/column_pruning.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,8 @@ impl ColumnPruning {
110110
| Operator::Limit(_)
111111
| Operator::Join(_)
112112
| Operator::Filter(_)
113-
| Operator::Union(_) => {
113+
| Operator::Union(_)
114+
| Operator::Except(_) => {
114115
let temp_columns = operator.referenced_columns(false);
115116
// why?
116117
let mut column_references = column_references;

src/optimizer/rule/normalization/compilation_in_advance.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,8 @@ impl ExpressionRemapper {
108108
| Operator::Truncate(_)
109109
| Operator::CopyFromFile(_)
110110
| Operator::CopyToFile(_)
111-
| Operator::Union(_) => (),
111+
| Operator::Union(_)
112+
| Operator::Except(_) => (),
112113
}
113114
if let Some(exprs) = operator.output_exprs() {
114115
*output_exprs = exprs;
@@ -217,7 +218,8 @@ impl EvaluatorBind {
217218
| Operator::Truncate(_)
218219
| Operator::CopyFromFile(_)
219220
| Operator::CopyToFile(_)
220-
| Operator::Union(_) => (),
221+
| Operator::Union(_)
222+
| Operator::Except(_) => (),
221223
}
222224

223225
Ok(())

src/planner/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
pub mod operator;
22

33
use crate::catalog::{ColumnCatalog, ColumnRef, TableName};
4+
use crate::planner::operator::except::ExceptOperator;
45
use crate::planner::operator::join::JoinType;
56
use crate::planner::operator::union::UnionOperator;
67
use crate::planner::operator::values::ValuesOperator;
@@ -169,6 +170,10 @@ impl LogicalPlan {
169170
| Operator::Union(UnionOperator {
170171
left_schema_ref: schema_ref,
171172
..
173+
})
174+
| Operator::Except(ExceptOperator {
175+
left_schema_ref: schema_ref,
176+
..
172177
}) => SchemaOutput::SchemaRef(schema_ref.clone()),
173178
Operator::Dummy => SchemaOutput::Schema(vec![]),
174179
Operator::ShowTable => SchemaOutput::Schema(vec![ColumnRef::from(

src/planner/operator/except.rs

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
use crate::planner::operator::Operator;
2+
use crate::planner::{Childrens, LogicalPlan};
3+
use crate::types::tuple::SchemaRef;
4+
use itertools::Itertools;
5+
use kite_sql_serde_macros::ReferenceSerialization;
6+
use std::fmt;
7+
use std::fmt::Formatter;
8+
9+
#[derive(Debug, PartialEq, Eq, Clone, Hash, ReferenceSerialization)]
10+
pub struct ExceptOperator {
11+
pub left_schema_ref: SchemaRef,
12+
// mainly use `left_schema` as output and `right_schema` for `column pruning`
13+
pub _right_schema_ref: SchemaRef,
14+
}
15+
16+
impl ExceptOperator {
17+
pub fn build(
18+
left_schema_ref: SchemaRef,
19+
right_schema_ref: SchemaRef,
20+
left_plan: LogicalPlan,
21+
right_plan: LogicalPlan,
22+
) -> LogicalPlan {
23+
LogicalPlan::new(
24+
Operator::Except(ExceptOperator {
25+
left_schema_ref,
26+
_right_schema_ref: right_schema_ref,
27+
}),
28+
Childrens::Twins {
29+
left: left_plan,
30+
right: right_plan,
31+
},
32+
)
33+
}
34+
}
35+
36+
impl fmt::Display for ExceptOperator {
37+
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
38+
let schema = self
39+
.left_schema_ref
40+
.iter()
41+
.map(|column| column.name().to_string())
42+
.join(", ");
43+
44+
write!(f, "Except: [{}]", schema)?;
45+
46+
Ok(())
47+
}
48+
}

src/planner/operator/mod.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ pub mod describe;
1111
pub mod drop_index;
1212
pub mod drop_table;
1313
pub mod drop_view;
14+
pub mod except;
1415
pub mod filter;
1516
pub mod function_scan;
1617
pub mod insert;
@@ -43,6 +44,7 @@ use crate::planner::operator::describe::DescribeOperator;
4344
use crate::planner::operator::drop_index::DropIndexOperator;
4445
use crate::planner::operator::drop_table::DropTableOperator;
4546
use crate::planner::operator::drop_view::DropViewOperator;
47+
use crate::planner::operator::except::ExceptOperator;
4648
use crate::planner::operator::function_scan::FunctionScanOperator;
4749
use crate::planner::operator::insert::InsertOperator;
4850
use crate::planner::operator::join::JoinCondition;
@@ -73,6 +75,7 @@ pub enum Operator {
7375
ShowView,
7476
Explain,
7577
Describe(DescribeOperator),
78+
Except(ExceptOperator),
7679
Union(UnionOperator),
7780
// DML
7881
Insert(InsertOperator),
@@ -147,6 +150,10 @@ impl Operator {
147150
| Operator::Union(UnionOperator {
148151
left_schema_ref: schema_ref,
149152
..
153+
})
154+
| Operator::Except(ExceptOperator {
155+
left_schema_ref: schema_ref,
156+
..
150157
}) => Some(
151158
schema_ref
152159
.iter()
@@ -230,6 +237,10 @@ impl Operator {
230237
Operator::Union(UnionOperator {
231238
left_schema_ref,
232239
_right_schema_ref,
240+
})
241+
| Operator::Except(ExceptOperator {
242+
left_schema_ref,
243+
_right_schema_ref,
233244
}) => left_schema_ref
234245
.iter()
235246
.chain(_right_schema_ref.iter())
@@ -293,6 +304,7 @@ impl fmt::Display for Operator {
293304
Operator::CopyFromFile(op) => write!(f, "{}", op),
294305
Operator::CopyToFile(op) => write!(f, "{}", op),
295306
Operator::Union(op) => write!(f, "{}", op),
307+
Operator::Except(op) => write!(f, "{}", op),
296308
}
297309
}
298310
}

src/types/tuple.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ pub fn types(schema: &Schema) -> Vec<LogicalType> {
2323
.collect_vec()
2424
}
2525

26-
#[derive(Clone, Debug, PartialEq)]
26+
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
2727
pub struct Tuple {
2828
pub pk: Option<TupleId>,
2929
pub values: Vec<DataValue>,

0 commit comments

Comments
 (0)