From 9bacd9b1acd547cb507c9808e76f9d6a1359e205 Mon Sep 17 00:00:00 2001 From: elianddb Date: Mon, 8 Sep 2025 22:20:34 +0000 Subject: [PATCH 1/2] add fix dup row --- enginetest/queries/script_queries.go | 20 ++++++++++++++++++++ sql/analyzer/indexed_joins.go | 6 +++--- sql/analyzer/unnest_exists_subqueries.go | 1 + sql/plan/join.go | 2 +- sql/plan/subquery.go | 2 +- sql/rowexec/join_iters.go | 4 ++++ sql/rowexec/merge_join.go | 1 + sql/rowexec/rel.go | 2 +- 8 files changed, 32 insertions(+), 6 deletions(-) diff --git a/enginetest/queries/script_queries.go b/enginetest/queries/script_queries.go index 275c914271..2895f4630a 100644 --- a/enginetest/queries/script_queries.go +++ b/enginetest/queries/script_queries.go @@ -120,6 +120,26 @@ type ScriptTestAssertion struct { // Unlike other engine tests, ScriptTests must be self-contained. No other tables are created outside the definition of // the tests. var ScriptTests = []ScriptTest{ + { + // https://github.com/dolthub/dolt/issues/9797 + Name: "EXISTS subquery returns duplicate rows with PRIMARY KEY", + SetUpScript: []string{ + "CREATE TABLE t(c0 INT, c1 INT, PRIMARY KEY(c0, c1));", + "INSERT INTO t VALUES (1, 1);", + "INSERT INTO t VALUES (2, 2);", + "INSERT INTO t VALUES (2, 3);", + }, + Assertions: []ScriptTestAssertion{ + { + Query: "SELECT * FROM t WHERE EXISTS (SELECT 1 FROM t AS x WHERE x.c0 = t.c0);", + Expected: []sql.Row{ + {1, 1}, + {2, 2}, + {2, 3}, + }, + }, + }, + }, { // https://github.com/dolthub/dolt/issues/9794 Name: "UPDATE with TRIM function on TEXT column", diff --git a/sql/analyzer/indexed_joins.go b/sql/analyzer/indexed_joins.go index 5d4ade4fbb..f1c8e77af2 100644 --- a/sql/analyzer/indexed_joins.go +++ b/sql/analyzer/indexed_joins.go @@ -472,9 +472,9 @@ func convertSemiToInnerJoin(m *memo.Memo) error { } // join and its commute are a new group - joinGrp := m.MemoizeInnerJoin(nil, semi.Left, rightGrp, plan.JoinTypeInner, semi.Filter) + joinGrp := m.MemoizeInnerJoin(nil, semi.Left, rightGrp, plan.JoinTypeSemi, semi.Filter) // TODO: can't commute if right SubqueryAlias references outside scope (OuterScopeVisibility/IsLateral) - m.MemoizeInnerJoin(joinGrp, rightGrp, semi.Left, plan.JoinTypeInner, semi.Filter) + m.MemoizeInnerJoin(joinGrp, rightGrp, semi.Left, plan.JoinTypeSemi, semi.Filter) // project belongs to the original group leftCols := semi.Left.RelProps.OutputCols() @@ -1105,7 +1105,7 @@ func addMergeJoins(ctx *sql.Context, m *memo.Memo) error { } if rightIndexMatchesFilters(rIndex, join.Left.RelProps.FuncDeps().Constants(), matchedEqFilters) { jb := join.Copy() - if d, ok := jb.Left.First.(*memo.Distinct); ok && lIndex.SqlIdx().IsUnique() { + if d, ok := jb.Left.First.(*memo.Distinct); ok && lIndex.SqlIdx().IsUnique() { jb.Left = d.Child } if d, ok := jb.Right.First.(*memo.Distinct); ok && rIndex.SqlIdx().IsUnique() { diff --git a/sql/analyzer/unnest_exists_subqueries.go b/sql/analyzer/unnest_exists_subqueries.go index 166de18260..965934d83c 100644 --- a/sql/analyzer/unnest_exists_subqueries.go +++ b/sql/analyzer/unnest_exists_subqueries.go @@ -151,6 +151,7 @@ func unnestExistSubqueries(ctx *sql.Context, scope *plan.Scope, a *Analyzer, fil retFilters = append(retFilters, f) continue } + // recurse if s.inner != nil { diff --git a/sql/plan/join.go b/sql/plan/join.go index 8709b45cbe..a996007a21 100644 --- a/sql/plan/join.go +++ b/sql/plan/join.go @@ -177,7 +177,7 @@ func (i JoinType) IsPartial() bool { switch i { case JoinTypeSemi, JoinTypeAnti, JoinTypeAntiIncludeNulls, JoinTypeSemiHash, JoinTypeAntiHash, JoinTypeAntiHashIncludeNulls, JoinTypeAntiLookup, JoinTypeAntiLookupIncludeNulls, - JoinTypeSemiLookup: + JoinTypeSemiLookup, JoinTypeSemiMerge, JoinTypeAntiMerge, JoinTypeAntiMergeIncludeNulls: return true default: return false diff --git a/sql/plan/subquery.go b/sql/plan/subquery.go index 296ef0ed9f..4903b27e2e 100644 --- a/sql/plan/subquery.go +++ b/sql/plan/subquery.go @@ -18,7 +18,6 @@ import ( "fmt" "io" "sync" - "github.com/dolthub/go-mysql-server/sql" "github.com/dolthub/go-mysql-server/sql/hash" "github.com/dolthub/go-mysql-server/sql/transform" @@ -431,6 +430,7 @@ func (s *Subquery) HashMultiple(ctx *sql.Context, row sql.Row) (sql.KeyValueCach // HasResultRow returns whether the subquery has a result set > 0. func (s *Subquery) HasResultRow(ctx *sql.Context, row sql.Row) (bool, error) { + // First check if the query was cached. s.cacheMu.Lock() cached := s.resultsCached diff --git a/sql/rowexec/join_iters.go b/sql/rowexec/join_iters.go index fbfcca13cc..c64494733e 100644 --- a/sql/rowexec/join_iters.go +++ b/sql/rowexec/join_iters.go @@ -367,6 +367,10 @@ func (i *existsIter) Next(ctx *sql.Context) (sql.Row, error) { nextState = esIncRight } case esRet: + if i.typ.IsSemi() { + // For semi-joins, after returning a match, move to next left row + nextState = esIncLeft + } return i.removeParentRow(i.primaryRow.Copy()), nil default: return nil, fmt.Errorf("invalid exists join state") diff --git a/sql/rowexec/merge_join.go b/sql/rowexec/merge_join.go index 1926697dda..0ba0fe1bdf 100644 --- a/sql/rowexec/merge_join.go +++ b/sql/rowexec/merge_join.go @@ -588,3 +588,4 @@ func (i *mergeJoinIter) Close(ctx *sql.Context) (err error) { return err } + diff --git a/sql/rowexec/rel.go b/sql/rowexec/rel.go index 8c60e6e023..6917d37e45 100644 --- a/sql/rowexec/rel.go +++ b/sql/rowexec/rel.go @@ -282,7 +282,7 @@ func (b *BaseBuilder) buildJoinNode(ctx *sql.Context, n *plan.JoinNode, row sql. case n.Op.IsRange(): return newRangeHeapJoinIter(ctx, b, n, row) default: - return newJoinIter(ctx, b, n, row) + return newJoinIter(ctx, b, n, row) } } From b855b92f14636d1212ab87109e7902df68b4f047 Mon Sep 17 00:00:00 2001 From: elianddb Date: Mon, 8 Sep 2025 22:22:24 +0000 Subject: [PATCH 2/2] [ga-format-pr] Run ./format_repo.sh to fix formatting --- sql/analyzer/indexed_joins.go | 2 +- sql/analyzer/unnest_exists_subqueries.go | 1 - sql/plan/subquery.go | 3 ++- sql/rowexec/merge_join.go | 1 - sql/rowexec/rel.go | 2 +- 5 files changed, 4 insertions(+), 5 deletions(-) diff --git a/sql/analyzer/indexed_joins.go b/sql/analyzer/indexed_joins.go index f1c8e77af2..fe1aedb5a8 100644 --- a/sql/analyzer/indexed_joins.go +++ b/sql/analyzer/indexed_joins.go @@ -1105,7 +1105,7 @@ func addMergeJoins(ctx *sql.Context, m *memo.Memo) error { } if rightIndexMatchesFilters(rIndex, join.Left.RelProps.FuncDeps().Constants(), matchedEqFilters) { jb := join.Copy() - if d, ok := jb.Left.First.(*memo.Distinct); ok && lIndex.SqlIdx().IsUnique() { + if d, ok := jb.Left.First.(*memo.Distinct); ok && lIndex.SqlIdx().IsUnique() { jb.Left = d.Child } if d, ok := jb.Right.First.(*memo.Distinct); ok && rIndex.SqlIdx().IsUnique() { diff --git a/sql/analyzer/unnest_exists_subqueries.go b/sql/analyzer/unnest_exists_subqueries.go index 965934d83c..166de18260 100644 --- a/sql/analyzer/unnest_exists_subqueries.go +++ b/sql/analyzer/unnest_exists_subqueries.go @@ -151,7 +151,6 @@ func unnestExistSubqueries(ctx *sql.Context, scope *plan.Scope, a *Analyzer, fil retFilters = append(retFilters, f) continue } - // recurse if s.inner != nil { diff --git a/sql/plan/subquery.go b/sql/plan/subquery.go index 4903b27e2e..289fcf4e41 100644 --- a/sql/plan/subquery.go +++ b/sql/plan/subquery.go @@ -18,6 +18,7 @@ import ( "fmt" "io" "sync" + "github.com/dolthub/go-mysql-server/sql" "github.com/dolthub/go-mysql-server/sql/hash" "github.com/dolthub/go-mysql-server/sql/transform" @@ -430,7 +431,7 @@ func (s *Subquery) HashMultiple(ctx *sql.Context, row sql.Row) (sql.KeyValueCach // HasResultRow returns whether the subquery has a result set > 0. func (s *Subquery) HasResultRow(ctx *sql.Context, row sql.Row) (bool, error) { - + // First check if the query was cached. s.cacheMu.Lock() cached := s.resultsCached diff --git a/sql/rowexec/merge_join.go b/sql/rowexec/merge_join.go index 0ba0fe1bdf..1926697dda 100644 --- a/sql/rowexec/merge_join.go +++ b/sql/rowexec/merge_join.go @@ -588,4 +588,3 @@ func (i *mergeJoinIter) Close(ctx *sql.Context) (err error) { return err } - diff --git a/sql/rowexec/rel.go b/sql/rowexec/rel.go index 6917d37e45..8c60e6e023 100644 --- a/sql/rowexec/rel.go +++ b/sql/rowexec/rel.go @@ -282,7 +282,7 @@ func (b *BaseBuilder) buildJoinNode(ctx *sql.Context, n *plan.JoinNode, row sql. case n.Op.IsRange(): return newRangeHeapJoinIter(ctx, b, n, row) default: - return newJoinIter(ctx, b, n, row) + return newJoinIter(ctx, b, n, row) } }