diff --git a/ydb/core/kqp/opt/rbo/kqp_constant_folding_stage.cpp b/ydb/core/kqp/opt/rbo/kqp_constant_folding_stage.cpp new file mode 100644 index 000000000000..d9fbe888ff86 --- /dev/null +++ b/ydb/core/kqp/opt/rbo/kqp_constant_folding_stage.cpp @@ -0,0 +1,80 @@ +#include "kqp_rbo_rules.h" +#include +#include +#include +#include + +using namespace NYql::NNodes; + +namespace { + +TNodeOnNodeOwnedMap ExtractConstantExprs(const TExprNode::TPtr& input, TExprContext& ctx, bool foldUdfs = true) { + TNodeOnNodeOwnedMap result; + ExtractConstantExprs(lambda.Body().Ptr(), result, ctx.ExprCtx, foldUdfs); + return result; +} + +} + +namespace NKikimr { +namespace NKqp { + +void TConstantFoldingStage::RunStage(TOpRoot &root, TRBOContext &ctx) { + TVector lambdasWithConstExpr; + bool foldUdfs = ctx.KqpCtx.Config->EnableFoldUdfs(); + + // Iterate through all operators that contain lambdas with potential constant expression + TNodeOnNodeOwnedMap replaces; + TVector> affectedOps; + + for (auto it : root) { + if (!it.Current->GetLambdas().empty()) { + auto lambdas = it.Current->GetLambdas(); + bool affected = false; + for (auto l : lambdas) { + auto lambda = TCoLambda(l); + auto extractedMap = ExtractConstantExprs(lambda.Body().Ptr(), ctx.ExprCtx, foldUdfs); + if (!extracted.empty()) { + affected = true; + } + } + + if (affected) { + affectedOps.push_back(it.Current); + } + } + } + + if (replaces.empty()) { + return; + } + + // Build a list of eval expressions + + TExprNode::TListType lambdaList; + TExprNode::TListType evalElements; + for (auto & [k, v] : replaces) { + lambdaList.push_back(k); + evalElements.push_back(v); + } + + auto evalList = ctx.ExprCtx.NewList(root.Pos, std::move(evalList)); + + // Run optimizer with eval on the evalList + TOptimizeExprSettings settings(&ctx.TypeCtx); + settings.VisitTuples = false; + ctx.ExprCtx.Step.Repeat(TExprStep::ExprEval); + auto status = RemapExpr(evalList, evalList, replaces, ctx.ExprCtx, settings); + + // Iterate over affected 
operators and modify their expressions with folded expressions + replaces.clear(); + for (size_t i=0; iChild(i); + } + + for (auto op : affectedOps) { + op->ApplyReplaceMap(replaces); + } +} +} +} \ No newline at end of file diff --git a/ydb/core/kqp/opt/rbo/kqp_operator.h b/ydb/core/kqp/opt/rbo/kqp_operator.h index ce1ce48e272d..969f6d7d3ecd 100644 --- a/ydb/core/kqp/opt/rbo/kqp_operator.h +++ b/ydb/core/kqp/opt/rbo/kqp_operator.h @@ -267,6 +267,10 @@ class IOperator { const TTypeAnnotationNode* GetIUType(TInfoUnit iu); + virtual TVector GetLambdas() { return {}; } + + virtual void ApplyReplaceMap(TNodeOnNodeOwnedMap map, TRBOContext & ctx) { Y_UNUSED(map); Y_UNUSED_(ctx); } + /*** * Rename information units of this operator using a specified mapping */ @@ -341,6 +345,8 @@ class TOpMap : public IUnaryOperator { bool project); virtual TVector GetOutputIUs() override; virtual TVector GetScalarSubplanIUs(TPlanProps& props) override; + virtual TVector GetLambdas() override; + virtual void ApplyReplaceMap(TNodeOnNodeOwnedMap map, TRBOContext & ctx) override; bool HasRenames() const; bool HasLambdas() const; @@ -394,6 +400,8 @@ class TOpFilter : public IUnaryOperator { virtual TVector GetOutputIUs() override; virtual TVector GetScalarSubplanIUs(TPlanProps& props) override; virtual TString ToString(TExprContext& ctx) override; + virtual TVector GetLambdas() override; + virtual void ApplyReplaceMap(TNodeOnNodeOwnedMap map, TRBOContext & ctx) override; TVector GetFilterIUs(TPlanProps& props) const; TConjunctInfo GetConjunctInfo(TPlanProps& props) const; @@ -429,6 +437,8 @@ class TOpLimit : public IUnaryOperator { virtual TVector GetOutputIUs() override; void RenameIUs(const THashMap &renameMap, TExprContext &ctx) override; virtual TString ToString(TExprContext& ctx) override; + virtual TVector GetLambdas() override; + virtual void ApplyReplaceMap(TNodeOnNodeOwnedMap map, TRBOContext & ctx) override; TExprNode::TPtr LimitCond; }; diff --git 
a/ydb/core/kqp/opt/rbo/kqp_rbo_rules.cpp b/ydb/core/kqp/opt/rbo/kqp_rbo_rules.cpp index a3ebbc91cc85..b829fe414726 100644 --- a/ydb/core/kqp/opt/rbo/kqp_rbo_rules.cpp +++ b/ydb/core/kqp/opt/rbo/kqp_rbo_rules.cpp @@ -662,259 +662,6 @@ bool TAssignStagesRule::TestAndApply(std::shared_ptr &input, TRBOCont return true; } -struct Scope { - Scope() {} - - TVector ParentScopes; - bool TopScope = false; - bool IdentityMap = true; - THashSet Unrenameable; - TVector OutputIUs; - TVector> Operators; - - TString ToString(TExprContext &ctx) { - auto res = TStringBuilder() << "{parents: ["; - for (int p : ParentScopes) { - res << p << ","; - } - res << "], Identity: " << IdentityMap << ", TopScope: " << TopScope << ", Unrenameable: {"; - for (auto &iu : Unrenameable) { - res << iu.GetFullName() << ","; - } - res << "}, Output: {"; - for (auto &iu : OutputIUs) { - res << iu.GetFullName() << ","; - } - res << "}, Operators: ["; - for (auto &op : Operators) { - res << op->ToString(ctx) << ","; - } - res << "]}"; - return res; - } -}; - -struct TIOperatorSharedPtrHash { - size_t operator()(const std::shared_ptr &p) const { return p ? 
THash{}((int64_t)p.get()) : 0; } -}; - -class Scopes { - public: - void ComputeScopesRec(std::shared_ptr &op, int &currScope); - void ComputeScopes(std::shared_ptr &op); - - THashMap ScopeMap; - THashMap, int, TIOperatorSharedPtrHash> RevScopeMap; -}; - -void Scopes::ComputeScopesRec(std::shared_ptr &op, int &currScope) { - if (RevScopeMap.contains(op)) { - return; - } - bool makeNewScope = - (op->Kind == EOperator::Map && CastOperator(op)->Project) || (op->Kind == EOperator::Project) || (op->Parents.size() >= 2); - - //YQL_CLOG(TRACE, CoreDq) << "Op: " << op->ToString() << ", nparents = " << op->Parents.size(); - - if (makeNewScope) { - currScope++; - auto newScope = Scope(); - // FIXME: The top scope is a scope with id=1 - if (currScope == 1) { - newScope.TopScope = true; - } - - if (op->Kind == EOperator::Map && CastOperator(op)->Project) { - auto map = CastOperator(op); - newScope.OutputIUs = map->GetOutputIUs(); - newScope.IdentityMap = false; - } else if (op->Kind == EOperator::Project) { - auto project = CastOperator(op); - newScope.OutputIUs = project->GetOutputIUs(); - newScope.IdentityMap = false; - } - ScopeMap[currScope] = newScope; - } - - if (op->Kind == EOperator::Source) { - for (auto iu : op->GetOutputIUs()) { - ScopeMap.at(currScope).Unrenameable.insert(iu); - } - } - - ScopeMap.at(currScope).Operators.push_back(op); - RevScopeMap[op] = currScope; - for (auto c : op->Children) { - ComputeScopesRec(c, currScope); - } -} - -void Scopes::ComputeScopes(std::shared_ptr &op) { - int currScope = 0; - ScopeMap[0] = Scope(); - ComputeScopesRec(op, currScope); - for (auto &[id, sc] : ScopeMap) { - auto topOp = sc.Operators[0]; - for (auto &p : topOp->Parents) { - auto parentScopeId = RevScopeMap.at(p.lock()); - sc.ParentScopes.push_back(parentScopeId); - if (topOp->Parents.size() >= 2) { - auto &parentScope = ScopeMap.at(parentScopeId); - for (auto iu : sc.OutputIUs) { - parentScope.Unrenameable.insert(iu); - } - } - } - } -} - -struct TIntTUnitPairHash { - 
size_t operator()(const std::pair &p) const { return THash{}(p.first) ^ TInfoUnit::THashFunction{}(p.second); } -}; - -void TRenameStage::RunStage(TOpRoot &root, TRBOContext &ctx) { - - YQL_CLOG(TRACE, CoreDq) << "Before compute parents"; - - for (auto it : root) { - YQL_CLOG(TRACE, CoreDq) << "Iterator: " << it.Current->ToString(ctx.ExprCtx); - for (auto c : it.Current->Children) { - YQL_CLOG(TRACE, CoreDq) << "Child: " << c->ToString(ctx.ExprCtx); - } - } - - root.ComputeParents(); - - // We need to build scopes for the plan, because same aliases and variable names may be - // used multiple times in different scopes - auto scopes = Scopes(); - scopes.ComputeScopes(root.GetInput()); - - for (auto &[id, sc] : scopes.ScopeMap) { - YQL_CLOG(TRACE, CoreDq) << "Scope map: " << id << ": " << sc.ToString(ctx.ExprCtx); - } - - // Build a rename map by startingg at maps that rename variables and project - // Follow the parent scopes as far as possible and pick the top-most mapping - // If at any point there are multiple parent scopes - stop - - THashMap, TVector>, TIntTUnitPairHash> renameMap; - - int newAliasId = 1; - - for (auto iter : root) { - if (iter.Current->Kind == EOperator::Map && CastOperator(iter.Current)->Project) { - auto map = CastOperator(iter.Current); - - for (auto [to, body] : map->MapElements) { - - auto scopeId = scopes.RevScopeMap.at(map); - auto scope = scopes.ScopeMap.at(scopeId); - auto parentScopes = scope.ParentScopes; - - // If we're not in the final scope that exports variables to the user, - // generate a unique new alias for the variable to avoid collisions - auto exportTo = to; - if (!scope.TopScope) { - TString newAlias = "#" + std::to_string(newAliasId++); - exportTo = TInfoUnit(newAlias, to.ColumnName); - } - - // "Export" the result of map output to the upper scope, but only if there is one - // parent scope only - auto source = std::make_pair(scopeId, to); - auto target = std::make_pair(parentScopes[0], exportTo); - 
renameMap[source].push_back(target); - - // if (parentScopes.size()==1) { - // renameMap[source].push_back(target); - // } - - // If the map element is a rename, record the rename in the map within the same scope - // However skip all unrenamable uis - if (std::holds_alternative(body)) { - auto sourceIU = std::get(body); - if (!scope.Unrenameable.contains(sourceIU)) { - source = std::make_pair(scopeId, sourceIU); - target = std::make_pair(scopeId, to); - renameMap[source].push_back(target); - } - } - } - } - } - - for (auto &[key, value] : renameMap) { - if (value.size() == 1) { - YQL_CLOG(TRACE, CoreDq) << "Rename map: " << key.second.GetFullName() << "," << key.first << " -> " - << value[0].second.GetFullName() << "," << value[0].first; - } else { - YQL_CLOG(TRACE, CoreDq) << "Rename map: " << key.second.GetFullName() << "," << key.first << " -> "; - for (auto v : value) { - YQL_CLOG(TRACE, CoreDq) << v.second.GetFullName() << "," << v.first; - } - } - } - - // Make a transitive closure of rename map - THashMap, std::pair, TIntTUnitPairHash> closedMap; - for (auto &[k, v] : renameMap) { - if (v.size() == 1) { - closedMap[k] = v[0]; - } - } - - bool fixpointReached = false; - while (!fixpointReached) { - - fixpointReached = true; - for (auto &[k, v] : closedMap) { - if (closedMap.contains(v)) { - fixpointReached = false; - } - - while (closedMap.contains(v)) { - v = closedMap.at(v); - } - closedMap[k] = v; - } - } - - // Add unique aliases - - // Iterate through the plan, applying renames to one operator at a time - - for (auto it : root) { - // Build a subset of the map for the current scope only - auto scopeId = scopes.RevScopeMap.at(it.Current); - - // Exclude all IUs from OpReads in this scope - // THashSet exclude; - // for (auto & op : scopes.ScopeMap.at(scopeId).Operators) { - // if (op->Kind == EOperator::Source) { - // for (auto iu : op->GetOutputIUs()) { - // exclude.insert(iu); - // } - // } - //} - - auto scopedRenameMap = THashMap(); - for (auto &[k, 
v] : closedMap) { - // if (k.first == scopeId && !exclude.contains(k.second)) { - if (k.first == scopeId) { - scopedRenameMap.emplace(k.second, v.second); - } - } - - YQL_CLOG(TRACE, CoreDq) << "Applying renames to operator: " << scopeId << ":" << it.Current->ToString(ctx.ExprCtx); - for (auto &[k, v] : scopedRenameMap) { - YQL_CLOG(TRACE, CoreDq) << "From " << k.GetFullName() << ", To " << v.GetFullName(); - } - - it.Current->RenameIUs(scopedRenameMap, ctx.ExprCtx); - } -} - TRuleBasedStage RuleStage1 = TRuleBasedStage( { std::make_shared(), diff --git a/ydb/core/kqp/opt/rbo/kqp_rbo_rules.h b/ydb/core/kqp/opt/rbo/kqp_rbo_rules.h index 8dc9d472675f..1879f17f7c5e 100644 --- a/ydb/core/kqp/opt/rbo/kqp_rbo_rules.h +++ b/ydb/core/kqp/opt/rbo/kqp_rbo_rules.h @@ -67,10 +67,21 @@ class TAssignStagesRule : public IRule { extern TRuleBasedStage RuleStage1; extern TRuleBasedStage RuleStage2; +/** + * Separate global stage to remove extra renames and project out unneeded columns + */ class TRenameStage : public ISinglePassStage { public: virtual void RunStage(TOpRoot &root, TRBOContext &ctx) override; }; +/** + * Separate global constant folding stage + */ +class TConstantFoldingStage : public ISinglePassStage { + public: + virtual void RunStage(TOpRoot &root, TRBOContext &ctx) override; +}; + } // namespace NKqp } // namespace NKikimr \ No newline at end of file diff --git a/ydb/core/kqp/opt/rbo/kqp_rename_unused_stage.cpp b/ydb/core/kqp/opt/rbo/kqp_rename_unused_stage.cpp new file mode 100644 index 000000000000..61a21b6f801e --- /dev/null +++ b/ydb/core/kqp/opt/rbo/kqp_rename_unused_stage.cpp @@ -0,0 +1,270 @@ +#include "kqp_rbo_rules.h" +#include +#include +#include +#include + +using namespace NYql::NNodes; + +namespace NKikimr { +namespace NKqp { + +struct Scope { + Scope() {} + + TVector ParentScopes; + bool TopScope = false; + bool IdentityMap = true; + THashSet Unrenameable; + TVector OutputIUs; + TVector> Operators; + + TString ToString(TExprContext &ctx) { + auto 
res = TStringBuilder() << "{parents: ["; + for (int p : ParentScopes) { + res << p << ","; + } + res << "], Identity: " << IdentityMap << ", TopScope: " << TopScope << ", Unrenameable: {"; + for (auto &iu : Unrenameable) { + res << iu.GetFullName() << ","; + } + res << "}, Output: {"; + for (auto &iu : OutputIUs) { + res << iu.GetFullName() << ","; + } + res << "}, Operators: ["; + for (auto &op : Operators) { + res << op->ToString(ctx) << ","; + } + res << "]}"; + return res; + } +}; + +struct TIOperatorSharedPtrHash { + size_t operator()(const std::shared_ptr &p) const { return p ? THash{}((int64_t)p.get()) : 0; } +}; + +class Scopes { + public: + void ComputeScopesRec(std::shared_ptr &op, int &currScope); + void ComputeScopes(std::shared_ptr &op); + + THashMap ScopeMap; + THashMap, int, TIOperatorSharedPtrHash> RevScopeMap; +}; + +void Scopes::ComputeScopesRec(std::shared_ptr &op, int &currScope) { + if (RevScopeMap.contains(op)) { + return; + } + bool makeNewScope = + (op->Kind == EOperator::Map && CastOperator(op)->Project) || (op->Kind == EOperator::Project) || (op->Parents.size() >= 2); + + //YQL_CLOG(TRACE, CoreDq) << "Op: " << op->ToString() << ", nparents = " << op->Parents.size(); + + if (makeNewScope) { + currScope++; + auto newScope = Scope(); + // FIXME: The top scope is a scope with id=1 + if (currScope == 1) { + newScope.TopScope = true; + } + + if (op->Kind == EOperator::Map && CastOperator(op)->Project) { + auto map = CastOperator(op); + newScope.OutputIUs = map->GetOutputIUs(); + newScope.IdentityMap = false; + } else if (op->Kind == EOperator::Project) { + auto project = CastOperator(op); + newScope.OutputIUs = project->GetOutputIUs(); + newScope.IdentityMap = false; + } + ScopeMap[currScope] = newScope; + } + + if (op->Kind == EOperator::Source) { + for (auto iu : op->GetOutputIUs()) { + ScopeMap.at(currScope).Unrenameable.insert(iu); + } + } + + ScopeMap.at(currScope).Operators.push_back(op); + RevScopeMap[op] = currScope; + for (auto c : 
op->Children) { + ComputeScopesRec(c, currScope); + } +} + +void Scopes::ComputeScopes(std::shared_ptr &op) { + int currScope = 0; + ScopeMap[0] = Scope(); + ComputeScopesRec(op, currScope); + for (auto &[id, sc] : ScopeMap) { + auto topOp = sc.Operators[0]; + for (auto &p : topOp->Parents) { + auto parentScopeId = RevScopeMap.at(p.lock()); + sc.ParentScopes.push_back(parentScopeId); + if (topOp->Parents.size() >= 2) { + auto &parentScope = ScopeMap.at(parentScopeId); + for (auto iu : sc.OutputIUs) { + parentScope.Unrenameable.insert(iu); + } + } + } + } +} + +struct TIntTUnitPairHash { + size_t operator()(const std::pair &p) const { return THash{}(p.first) ^ TInfoUnit::THashFunction{}(p.second); } +}; + + +/** + * Global stage that removes unnecessary renames and unused columns + */ +void TRenameStage::RunStage(TOpRoot &root, TRBOContext &ctx) { + + YQL_CLOG(TRACE, CoreDq) << "Before compute parents"; + + for (auto it : root) { + YQL_CLOG(TRACE, CoreDq) << "Iterator: " << it.Current->ToString(ctx.ExprCtx); + for (auto c : it.Current->Children) { + YQL_CLOG(TRACE, CoreDq) << "Child: " << c->ToString(ctx.ExprCtx); + } + } + + root.ComputeParents(); + + // We need to build scopes for the plan, because same aliases and variable names may be + // used multiple times in different scopes + auto scopes = Scopes(); + scopes.ComputeScopes(root.GetInput()); + + for (auto &[id, sc] : scopes.ScopeMap) { + YQL_CLOG(TRACE, CoreDq) << "Scope map: " << id << ": " << sc.ToString(ctx.ExprCtx); + } + + // Build a rename map by starting at maps that rename variables and project + // Follow the parent scopes as far as possible and pick the top-most mapping + // If at any point there are multiple parent scopes - stop + + THashMap, TVector>, TIntTUnitPairHash> renameMap; + + int newAliasId = 1; + + for (auto iter : root) { + if (iter.Current->Kind == EOperator::Map && CastOperator(iter.Current)->Project) { + auto map = CastOperator(iter.Current); + + for (auto [to, body] : 
map->MapElements) { + + auto scopeId = scopes.RevScopeMap.at(map); + auto scope = scopes.ScopeMap.at(scopeId); + auto parentScopes = scope.ParentScopes; + + // If we're not in the final scope that exports variables to the user, + // generate a unique new alias for the variable to avoid collisions + auto exportTo = to; + if (!scope.TopScope) { + TString newAlias = "#" + std::to_string(newAliasId++); + exportTo = TInfoUnit(newAlias, to.ColumnName); + } + + // "Export" the result of map output to the upper scope, but only if there is one + // parent scope only + auto source = std::make_pair(scopeId, to); + auto target = std::make_pair(parentScopes[0], exportTo); + renameMap[source].push_back(target); + + // if (parentScopes.size()==1) { + // renameMap[source].push_back(target); + // } + + // If the map element is a rename, record the rename in the map within the same scope + // However skip all unrenameable IUs + if (std::holds_alternative(body)) { + auto sourceIU = std::get(body); + if (!scope.Unrenameable.contains(sourceIU)) { + source = std::make_pair(scopeId, sourceIU); + target = std::make_pair(scopeId, to); + renameMap[source].push_back(target); + } + } + } + } + } + + for (auto &[key, value] : renameMap) { + if (value.size() == 1) { + YQL_CLOG(TRACE, CoreDq) << "Rename map: " << key.second.GetFullName() << "," << key.first << " -> " + << value[0].second.GetFullName() << "," << value[0].first; + } else { + YQL_CLOG(TRACE, CoreDq) << "Rename map: " << key.second.GetFullName() << "," << key.first << " -> "; + for (auto v : value) { + YQL_CLOG(TRACE, CoreDq) << v.second.GetFullName() << "," << v.first; + } + } + } + + // Make a transitive closure of rename map + THashMap, std::pair, TIntTUnitPairHash> closedMap; + for (auto &[k, v] : renameMap) { + if (v.size() == 1) { + closedMap[k] = v[0]; + } + } + + bool fixpointReached = false; + while (!fixpointReached) { + + fixpointReached = true; + for (auto &[k, v] : closedMap) { + if (closedMap.contains(v)) { + 
fixpointReached = false; + } + + while (closedMap.contains(v)) { + v = closedMap.at(v); + } + closedMap[k] = v; + } + } + + // Add unique aliases + + // Iterate through the plan, applying renames to one operator at a time + + for (auto it : root) { + // Build a subset of the map for the current scope only + auto scopeId = scopes.RevScopeMap.at(it.Current); + + // Exclude all IUs from OpReads in this scope + // THashSet exclude; + // for (auto & op : scopes.ScopeMap.at(scopeId).Operators) { + // if (op->Kind == EOperator::Source) { + // for (auto iu : op->GetOutputIUs()) { + // exclude.insert(iu); + // } + // } + //} + + auto scopedRenameMap = THashMap(); + for (auto &[k, v] : closedMap) { + // if (k.first == scopeId && !exclude.contains(k.second)) { + if (k.first == scopeId) { + scopedRenameMap.emplace(k.second, v.second); + } + } + + YQL_CLOG(TRACE, CoreDq) << "Applying renames to operator: " << scopeId << ":" << it.Current->ToString(ctx.ExprCtx); + for (auto &[k, v] : scopedRenameMap) { + YQL_CLOG(TRACE, CoreDq) << "From " << k.GetFullName() << ", To " << v.GetFullName(); + } + + it.Current->RenameIUs(scopedRenameMap, ctx.ExprCtx); + } +} + +} +} \ No newline at end of file diff --git a/ydb/core/kqp/opt/rbo/ya.make b/ydb/core/kqp/opt/rbo/ya.make index 44682613107b..25a41e68bfd5 100644 --- a/ydb/core/kqp/opt/rbo/ya.make +++ b/ydb/core/kqp/opt/rbo/ya.make @@ -8,6 +8,8 @@ SRCS( kqp_convert_to_physical.cpp kqp_plan_conversion_utils.cpp kqp_rbo_type_ann.cpp + kqp_rename_unused_stage.cpp + kqp_constant_folding_stage.cpp ) PEERDIR(