Skip to content

Commit 64652e1

Browse files
author
Pavel Velikhov
committed
[NEW RBO] Added contant folding (intermediate commit)
1 parent b796797 commit 64652e1

File tree

6 files changed

+373
-253
lines changed

6 files changed

+373
-253
lines changed
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
#include "kqp_rbo_rules.h"
2+
#include <ydb/core/kqp/common/kqp_yql.h>
3+
#include <yql/essentials/core/yql_expr_optimize.h>
4+
#include <yql/essentials/utils/log/log.h>
5+
#include <typeinfo>
6+
7+
using namespace NYql::NNodes;
8+
9+
namespace {
10+
11+
TNodeOnNodeOwnedMap ExtractConstantExprs(const TExprNode::TPtr& input, TExprContext& ctx, bool foldUdfs = true) {
12+
TNodeOnNodeOwnedMap result;
13+
ExtractConstantExprs(lambda.Body().Ptr(), result, ctx.ExprCtx, foldUdfs);
14+
return result;
15+
}
16+
17+
}
18+
19+
namespace NKikimr {
20+
namespace NKqp {
21+
22+
void TConstantFoldingStage::RunStage(TOpRoot &root, TRBOContext &ctx) {
23+
TVector<TExprNode::TPtr> lambdasWithConstExpr;
24+
bool foldUdfs = ctx.KqpCtx.Config->EnableFoldUdfs();
25+
26+
// Iterate through all operators that contain lambdas with potential constant expression
27+
TNodeOnNodeOwnedMap replaces;
28+
TVector<std::shared_ptr<IOperator>> affectedOps;
29+
30+
for (auto it : root) {
31+
if (!it.Current->GetLambdas().empty()) {
32+
auto lambdas = it.Current->GetLambdas();
33+
bool affected = false;
34+
for (auto l : lambdas) {
35+
auto lambda = TCoLambda(l);
36+
auto extractedMap = ExtractConstantExprs(lambda.Body().Ptr(), ctx.ExprCtx, foldUdfs);
37+
if (!extracted.empty()) {
38+
affected = true;
39+
}
40+
}
41+
42+
if (affected) {
43+
affectedOps.push_back(it.Current);
44+
}
45+
}
46+
}
47+
48+
if (replaces.empty()) {
49+
return;
50+
}
51+
52+
// Build a list of eval expressions
53+
54+
TExprNode::TListType lambdaList;
55+
TExprNode::TListType evalElements;
56+
for (auto & [k, v] : replaces) {
57+
lambdaList.push_back(k);
58+
evalElements.push_back(v);
59+
}
60+
61+
auto evalList = ctx.ExprCtx.NewList(root.Pos, std::move(evalList));
62+
63+
// Run optimizer with eval on the evalList
64+
TOptimizeExprSettings settings(&ctx.TypeCtx);
65+
settings.VisitTuples = false;
66+
ctx.ExprCtx.Step.Repeat(TExprStep::ExprEval);
67+
auto status = RemapExpr(evalList, evalList, replaces, ctx.ExprCtx, settings);
68+
69+
// Iterate over affected operators and modify their expressions with folded expressions
70+
replaces.clear();
71+
for (size_t i=0; i<lambdaList.size(); i++) {
72+
replaces[lambdaList[i]] = evalList->Child(i);
73+
}
74+
75+
for (auto op : affectedOps) {
76+
op->ApplyReplaceMap(replaces);
77+
}
78+
}
79+
}
80+
}

ydb/core/kqp/opt/rbo/kqp_operator.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,10 @@ class IOperator {
267267

268268
const TTypeAnnotationNode* GetIUType(TInfoUnit iu);
269269

270+
virtual TVector<TExpNode::TPtr> GetLambdas() { return {}; }
271+
272+
virtual void ApplyReplaceMap(TNodeOnNodeOwnedMap map, TRBOContext & ctx) { Y_UNUSED(map); Y_UNUSED_(ctx); }
273+
270274
/***
271275
* Rename information units of this operator using a specified mapping
272276
*/
@@ -341,6 +345,8 @@ class TOpMap : public IUnaryOperator {
341345
bool project);
342346
virtual TVector<TInfoUnit> GetOutputIUs() override;
343347
virtual TVector<TInfoUnit> GetScalarSubplanIUs(TPlanProps& props) override;
348+
virtual TVector<TExpNode::TPtr> GetLambdas() override;
349+
virtual void ApplyReplaceMap(TNodeOnNodeOwnedMap map, TRBOContext & ctx) override;
344350

345351
bool HasRenames() const;
346352
bool HasLambdas() const;
@@ -394,6 +400,8 @@ class TOpFilter : public IUnaryOperator {
394400
virtual TVector<TInfoUnit> GetOutputIUs() override;
395401
virtual TVector<TInfoUnit> GetScalarSubplanIUs(TPlanProps& props) override;
396402
virtual TString ToString(TExprContext& ctx) override;
403+
virtual TVector<TExpNode::TPtr> GetLambdas() override;
404+
virtual void ApplyReplaceMap(TNodeOnNodeOwnedMap map, TRBOContext & ctx) override;
397405

398406
TVector<TInfoUnit> GetFilterIUs(TPlanProps& props) const;
399407
TConjunctInfo GetConjunctInfo(TPlanProps& props) const;
@@ -429,6 +437,8 @@ class TOpLimit : public IUnaryOperator {
429437
virtual TVector<TInfoUnit> GetOutputIUs() override;
430438
void RenameIUs(const THashMap<TInfoUnit, TInfoUnit, TInfoUnit::THashFunction> &renameMap, TExprContext &ctx) override;
431439
virtual TString ToString(TExprContext& ctx) override;
440+
virtual TVector<TExpNode::TPtr> GetLambdas() override;
441+
virtual void ApplyReplaceMap(TNodeOnNodeOwnedMap map, TRBOContext & ctx) override;
432442

433443
TExprNode::TPtr LimitCond;
434444
};

ydb/core/kqp/opt/rbo/kqp_rbo_rules.cpp

Lines changed: 0 additions & 253 deletions
Original file line numberDiff line numberDiff line change
@@ -662,259 +662,6 @@ bool TAssignStagesRule::TestAndApply(std::shared_ptr<IOperator> &input, TRBOCont
662662
return true;
663663
}
664664

665-
struct Scope {
666-
Scope() {}
667-
668-
TVector<int> ParentScopes;
669-
bool TopScope = false;
670-
bool IdentityMap = true;
671-
THashSet<TInfoUnit, TInfoUnit::THashFunction> Unrenameable;
672-
TVector<TInfoUnit> OutputIUs;
673-
TVector<std::shared_ptr<IOperator>> Operators;
674-
675-
TString ToString(TExprContext &ctx) {
676-
auto res = TStringBuilder() << "{parents: [";
677-
for (int p : ParentScopes) {
678-
res << p << ",";
679-
}
680-
res << "], Identity: " << IdentityMap << ", TopScope: " << TopScope << ", Unrenameable: {";
681-
for (auto &iu : Unrenameable) {
682-
res << iu.GetFullName() << ",";
683-
}
684-
res << "}, Output: {";
685-
for (auto &iu : OutputIUs) {
686-
res << iu.GetFullName() << ",";
687-
}
688-
res << "}, Operators: [";
689-
for (auto &op : Operators) {
690-
res << op->ToString(ctx) << ",";
691-
}
692-
res << "]}";
693-
return res;
694-
}
695-
};
696-
697-
struct TIOperatorSharedPtrHash {
698-
size_t operator()(const std::shared_ptr<IOperator> &p) const { return p ? THash<int64_t>{}((int64_t)p.get()) : 0; }
699-
};
700-
701-
class Scopes {
702-
public:
703-
void ComputeScopesRec(std::shared_ptr<IOperator> &op, int &currScope);
704-
void ComputeScopes(std::shared_ptr<IOperator> &op);
705-
706-
THashMap<int, Scope> ScopeMap;
707-
THashMap<std::shared_ptr<IOperator>, int, TIOperatorSharedPtrHash> RevScopeMap;
708-
};
709-
710-
void Scopes::ComputeScopesRec(std::shared_ptr<IOperator> &op, int &currScope) {
711-
if (RevScopeMap.contains(op)) {
712-
return;
713-
}
714-
bool makeNewScope =
715-
(op->Kind == EOperator::Map && CastOperator<TOpMap>(op)->Project) || (op->Kind == EOperator::Project) || (op->Parents.size() >= 2);
716-
717-
//YQL_CLOG(TRACE, CoreDq) << "Op: " << op->ToString() << ", nparents = " << op->Parents.size();
718-
719-
if (makeNewScope) {
720-
currScope++;
721-
auto newScope = Scope();
722-
// FIXME: The top scope is a scope with id=1
723-
if (currScope == 1) {
724-
newScope.TopScope = true;
725-
}
726-
727-
if (op->Kind == EOperator::Map && CastOperator<TOpMap>(op)->Project) {
728-
auto map = CastOperator<TOpMap>(op);
729-
newScope.OutputIUs = map->GetOutputIUs();
730-
newScope.IdentityMap = false;
731-
} else if (op->Kind == EOperator::Project) {
732-
auto project = CastOperator<TOpProject>(op);
733-
newScope.OutputIUs = project->GetOutputIUs();
734-
newScope.IdentityMap = false;
735-
}
736-
ScopeMap[currScope] = newScope;
737-
}
738-
739-
if (op->Kind == EOperator::Source) {
740-
for (auto iu : op->GetOutputIUs()) {
741-
ScopeMap.at(currScope).Unrenameable.insert(iu);
742-
}
743-
}
744-
745-
ScopeMap.at(currScope).Operators.push_back(op);
746-
RevScopeMap[op] = currScope;
747-
for (auto c : op->Children) {
748-
ComputeScopesRec(c, currScope);
749-
}
750-
}
751-
752-
void Scopes::ComputeScopes(std::shared_ptr<IOperator> &op) {
753-
int currScope = 0;
754-
ScopeMap[0] = Scope();
755-
ComputeScopesRec(op, currScope);
756-
for (auto &[id, sc] : ScopeMap) {
757-
auto topOp = sc.Operators[0];
758-
for (auto &p : topOp->Parents) {
759-
auto parentScopeId = RevScopeMap.at(p.lock());
760-
sc.ParentScopes.push_back(parentScopeId);
761-
if (topOp->Parents.size() >= 2) {
762-
auto &parentScope = ScopeMap.at(parentScopeId);
763-
for (auto iu : sc.OutputIUs) {
764-
parentScope.Unrenameable.insert(iu);
765-
}
766-
}
767-
}
768-
}
769-
}
770-
771-
struct TIntTUnitPairHash {
772-
size_t operator()(const std::pair<int, TInfoUnit> &p) const { return THash<int>{}(p.first) ^ TInfoUnit::THashFunction{}(p.second); }
773-
};
774-
775-
void TRenameStage::RunStage(TOpRoot &root, TRBOContext &ctx) {
776-
777-
YQL_CLOG(TRACE, CoreDq) << "Before compute parents";
778-
779-
for (auto it : root) {
780-
YQL_CLOG(TRACE, CoreDq) << "Iterator: " << it.Current->ToString(ctx.ExprCtx);
781-
for (auto c : it.Current->Children) {
782-
YQL_CLOG(TRACE, CoreDq) << "Child: " << c->ToString(ctx.ExprCtx);
783-
}
784-
}
785-
786-
root.ComputeParents();
787-
788-
// We need to build scopes for the plan, because same aliases and variable names may be
789-
// used multiple times in different scopes
790-
auto scopes = Scopes();
791-
scopes.ComputeScopes(root.GetInput());
792-
793-
for (auto &[id, sc] : scopes.ScopeMap) {
794-
YQL_CLOG(TRACE, CoreDq) << "Scope map: " << id << ": " << sc.ToString(ctx.ExprCtx);
795-
}
796-
797-
// Build a rename map by startingg at maps that rename variables and project
798-
// Follow the parent scopes as far as possible and pick the top-most mapping
799-
// If at any point there are multiple parent scopes - stop
800-
801-
THashMap<std::pair<int, TInfoUnit>, TVector<std::pair<int, TInfoUnit>>, TIntTUnitPairHash> renameMap;
802-
803-
int newAliasId = 1;
804-
805-
for (auto iter : root) {
806-
if (iter.Current->Kind == EOperator::Map && CastOperator<TOpMap>(iter.Current)->Project) {
807-
auto map = CastOperator<TOpMap>(iter.Current);
808-
809-
for (auto [to, body] : map->MapElements) {
810-
811-
auto scopeId = scopes.RevScopeMap.at(map);
812-
auto scope = scopes.ScopeMap.at(scopeId);
813-
auto parentScopes = scope.ParentScopes;
814-
815-
// If we're not in the final scope that exports variables to the user,
816-
// generate a unique new alias for the variable to avoid collisions
817-
auto exportTo = to;
818-
if (!scope.TopScope) {
819-
TString newAlias = "#" + std::to_string(newAliasId++);
820-
exportTo = TInfoUnit(newAlias, to.ColumnName);
821-
}
822-
823-
// "Export" the result of map output to the upper scope, but only if there is one
824-
// parent scope only
825-
auto source = std::make_pair(scopeId, to);
826-
auto target = std::make_pair(parentScopes[0], exportTo);
827-
renameMap[source].push_back(target);
828-
829-
// if (parentScopes.size()==1) {
830-
// renameMap[source].push_back(target);
831-
// }
832-
833-
// If the map element is a rename, record the rename in the map within the same scope
834-
// However skip all unrenamable uis
835-
if (std::holds_alternative<TInfoUnit>(body)) {
836-
auto sourceIU = std::get<TInfoUnit>(body);
837-
if (!scope.Unrenameable.contains(sourceIU)) {
838-
source = std::make_pair(scopeId, sourceIU);
839-
target = std::make_pair(scopeId, to);
840-
renameMap[source].push_back(target);
841-
}
842-
}
843-
}
844-
}
845-
}
846-
847-
for (auto &[key, value] : renameMap) {
848-
if (value.size() == 1) {
849-
YQL_CLOG(TRACE, CoreDq) << "Rename map: " << key.second.GetFullName() << "," << key.first << " -> "
850-
<< value[0].second.GetFullName() << "," << value[0].first;
851-
} else {
852-
YQL_CLOG(TRACE, CoreDq) << "Rename map: " << key.second.GetFullName() << "," << key.first << " -> ";
853-
for (auto v : value) {
854-
YQL_CLOG(TRACE, CoreDq) << v.second.GetFullName() << "," << v.first;
855-
}
856-
}
857-
}
858-
859-
// Make a transitive closure of rename map
860-
THashMap<std::pair<int, TInfoUnit>, std::pair<int, TInfoUnit>, TIntTUnitPairHash> closedMap;
861-
for (auto &[k, v] : renameMap) {
862-
if (v.size() == 1) {
863-
closedMap[k] = v[0];
864-
}
865-
}
866-
867-
bool fixpointReached = false;
868-
while (!fixpointReached) {
869-
870-
fixpointReached = true;
871-
for (auto &[k, v] : closedMap) {
872-
if (closedMap.contains(v)) {
873-
fixpointReached = false;
874-
}
875-
876-
while (closedMap.contains(v)) {
877-
v = closedMap.at(v);
878-
}
879-
closedMap[k] = v;
880-
}
881-
}
882-
883-
// Add unique aliases
884-
885-
// Iterate through the plan, applying renames to one operator at a time
886-
887-
for (auto it : root) {
888-
// Build a subset of the map for the current scope only
889-
auto scopeId = scopes.RevScopeMap.at(it.Current);
890-
891-
// Exclude all IUs from OpReads in this scope
892-
// THashSet<TInfoUnit, TInfoUnit::THashFunction> exclude;
893-
// for (auto & op : scopes.ScopeMap.at(scopeId).Operators) {
894-
// if (op->Kind == EOperator::Source) {
895-
// for (auto iu : op->GetOutputIUs()) {
896-
// exclude.insert(iu);
897-
// }
898-
// }
899-
//}
900-
901-
auto scopedRenameMap = THashMap<TInfoUnit, TInfoUnit, TInfoUnit::THashFunction>();
902-
for (auto &[k, v] : closedMap) {
903-
// if (k.first == scopeId && !exclude.contains(k.second)) {
904-
if (k.first == scopeId) {
905-
scopedRenameMap.emplace(k.second, v.second);
906-
}
907-
}
908-
909-
YQL_CLOG(TRACE, CoreDq) << "Applying renames to operator: " << scopeId << ":" << it.Current->ToString(ctx.ExprCtx);
910-
for (auto &[k, v] : scopedRenameMap) {
911-
YQL_CLOG(TRACE, CoreDq) << "From " << k.GetFullName() << ", To " << v.GetFullName();
912-
}
913-
914-
it.Current->RenameIUs(scopedRenameMap, ctx.ExprCtx);
915-
}
916-
}
917-
918665
TRuleBasedStage RuleStage1 = TRuleBasedStage(
919666
{
920667
std::make_shared<TInlineScalarSubplanRule>(),

0 commit comments

Comments
 (0)