Skip to content

Commit 4579632

Browse files
authored
YQ-4593 supported table writing in streaming queries (#28220)
1 parent 83b0980 commit 4579632

File tree

12 files changed

+277
-61
lines changed

12 files changed

+277
-61
lines changed

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1933,11 +1933,6 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
19331933
const auto& stage = tx.Body->GetStages(stageIdx);
19341934
const auto& stageInfo = TasksGraph.GetStageInfo(TStageId(txIdx, stageIdx));
19351935

1936-
if (graphRestored && (stageInfo.Meta.ShardOperations || stageInfo.Meta.ShardKind != NSchemeCache::ETableKind::KindUnknown)) {
1937-
ReplyErrorAndDie(Ydb::StatusIds::INTERNAL_ERROR, YqlIssue({}, NYql::TIssuesIds::KIKIMR_INTERNAL_ERROR, "Restore is not supported for table operations"));
1938-
return;
1939-
}
1940-
19411936
if (stageInfo.Meta.ShardKind == NSchemeCache::ETableKind::KindAsyncIndexTable) {
19421937
TMaybe<TString> error;
19431938

ydb/core/kqp/gateway/behaviour/streaming_query/optimization.cpp

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ struct TStreamingExploreCtx {
1919
TExprContext& Ctx;
2020
std::unordered_set<const TExprNode*> Visited;
2121
ui64 StreamingReads = 0;
22-
ui64 StreamingWrites = 0;
22+
ui64 Writes = 0;
2323
};
2424

2525
bool ExploreStreamingQueryNode(TExprNode::TPtr node, TStreamingExploreCtx& res) {
@@ -58,24 +58,31 @@ bool ExploreStreamingQueryNode(TExprNode::TPtr node, TStreamingExploreCtx& res)
5858
}
5959

6060
if (const auto maybeDataSink = TMaybeNode<TCoDataSink>(providerArg)) {
61+
++res.Writes;
62+
6163
const auto dataSinkCategory = maybeDataSink.Cast().Category().Value();
62-
if (dataSinkCategory == NYql::PqProviderName) {
63-
++res.StreamingWrites;
64+
if (IsIn({NYql::PqProviderName, NYql::SolomonProviderName}, dataSinkCategory)) {
6465
return true;
6566
}
6667

67-
if (dataSinkCategory == NYql::SolomonProviderName) {
68+
if (dataSinkCategory == NYql::KikimrProviderName) {
69+
const auto maybeYdbWrite = TMaybeNode<TKiWriteTable>(node);
70+
if (!maybeYdbWrite) {
71+
res.Ctx.AddError(NYql::TIssue(res.Ctx.GetPosition(node->Pos()), "Operations with YDB objects is not allowed inside streaming queries"));
72+
return false;
73+
}
74+
75+
const auto ydbWrite = maybeYdbWrite.Cast();
76+
if (const TString mode(ydbWrite.Mode()); mode != "upsert") {
77+
res.Ctx.AddError(NYql::TIssue(res.Ctx.GetPosition(node->Pos()), TStringBuilder() << "Only UPSERT writing mode is supported for YDB writes inside streaming queries, got mode: " << to_upper(mode)));
78+
return false;
79+
}
80+
6881
return true;
6982
}
7083

7184
if (dataSinkCategory == NYql::ResultProviderName) {
7285
res.Ctx.AddError(NYql::TIssue(res.Ctx.GetPosition(node->Pos()), "Results is not allowed for streaming queries, please use INSERT to record the query result"));
73-
} else if (dataSinkCategory == NYql::KikimrProviderName) {
74-
if (TMaybeNode<TKiWriteTable>(node)) {
75-
res.Ctx.AddError(NYql::TIssue(res.Ctx.GetPosition(node->Pos()), "Writing into YDB tables is not supported now for streaming queries"));
76-
} else {
77-
res.Ctx.AddError(NYql::TIssue(res.Ctx.GetPosition(node->Pos()), "Operations with YDB objects is not allowed inside streaming queries"));
78-
}
7986
} else {
8087
res.Ctx.AddError(NYql::TIssue(res.Ctx.GetPosition(node->Pos()), TStringBuilder() << "Writing into data sink " << dataSinkCategory << " is not supported now for streaming queries"));
8188
}
@@ -111,8 +118,8 @@ bool CheckStreamingQueryAst(TExprNode::TPtr ast, TExprContext& ctx) {
111118
return false;
112119
}
113120

114-
if (res.StreamingWrites > 1) {
115-
ctx.AddError(NYql::TIssue(ctx.GetPosition(ast->Pos()), "Streaming query with more than one streaming write to topic is not supported now"));
121+
if (res.Writes > 1) {
122+
ctx.AddError(NYql::TIssue(ctx.GetPosition(ast->Pos()), "Streaming query with more than one write is not supported now"));
116123
return false;
117124
}
118125

ydb/core/kqp/gateway/behaviour/streaming_query/queries.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2182,7 +2182,7 @@ class TRequestHandlerBase : public TActionActorBase<TDerived> {
21822182
{}
21832183

21842184
void Bootstrap() {
2185-
LOG_D("Bootstrap. Fetch config");
2185+
LOG_D("Bootstrap");
21862186

21872187
TBase::Become(&TDerived::StateFunc);
21882188
DescribeQuery("start handling");

ydb/core/kqp/host/kqp_runner.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ class TCompilePhysicalQueryTransformer : public TSyncTransformerBase {
8282
TKqpPhysicalQuery physicalQuery(input);
8383

8484
YQL_ENSURE(TransformCtx.DataQueryBlocks);
85-
auto compiler = CreateKqpQueryCompiler(Cluster, Database, OptimizeCtx.Tables, FuncRegistry, TypesCtx, Config);
85+
auto compiler = CreateKqpQueryCompiler(Cluster, Database, FuncRegistry, TypesCtx, OptimizeCtx, Config);
8686
auto ret = compiler->CompilePhysicalQuery(physicalQuery, *TransformCtx.DataQueryBlocks, *preparedQuery.MutablePhysicalQuery(), ctx);
8787
if (!ret) {
8888
ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), "Failed to compile physical query."));

ydb/core/kqp/opt/kqp_opt_kql.cpp

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,7 @@ TExprBase BuildFillTable(const TKiWriteTable& write, TExprContext& ctx)
318318

319319
TExprBase BuildUpsertTable(const TKiWriteTable& write, const TCoAtomList& inputColumns,
320320
const TCoAtomList& autoincrement, const bool isSink,
321-
const TKikimrTableDescription& table, TExprContext& ctx)
321+
const TKikimrTableDescription& table, TExprContext& ctx, TKqpOptimizeContext& kqpCtx)
322322
{
323323
auto generateColumnsIfInsertNode = GetSetting(write.Settings().Ref(), "generate_columns_if_insert");
324324
YQL_ENSURE(generateColumnsIfInsertNode);
@@ -328,6 +328,11 @@ TExprBase BuildUpsertTable(const TKiWriteTable& write, const TCoAtomList& inputC
328328
generateColumnsIfInsert = ExtendGenerateOnInsertColumnsList(write, generateColumnsIfInsert, inputColumns, autoincrement, ctx);
329329

330330
settings = AddSetting(*settings, write.Pos(), "Mode", Build<TCoAtom>(ctx, write.Pos()).Value("upsert").Done().Ptr(), ctx);
331+
332+
if (const auto requestContext = kqpCtx.UserRequestContext; requestContext && requestContext->IsStreamingQuery) {
333+
settings = AddSetting(*settings, write.Pos(), "AllowInconsistentWrites", nullptr, ctx);
334+
}
335+
331336
const auto [input, columns] = BuildWriteInput(write, table, inputColumns, autoincrement, isSink, write.Pos(), ctx);
332337
if (generateColumnsIfInsert.Ref().ChildrenSize() > 0) {
333338
return Build<TKqlInsertOnConflictUpdateRows>(ctx, write.Pos())
@@ -858,13 +863,13 @@ TExprNode::TPtr HandleReadTable(const TKiReadTable& read, TExprContext& ctx, con
858863

859864
TExprBase WriteTableSimple(const TKiWriteTable& write, const TCoAtomList& inputColumns,
860865
const TCoAtomList& autoincrement,
861-
const TKikimrTableDescription& tableData, TExprContext& ctx, const bool isSink)
866+
const TKikimrTableDescription& tableData, TExprContext& ctx, TKqpOptimizeContext& kqpCtx, const bool isSink)
862867
{
863868
Y_UNUSED(isSink);
864869
auto op = GetTableOp(write);
865870
switch (op) {
866871
case TYdbOperation::Upsert:
867-
return BuildUpsertTable(write, inputColumns, autoincrement, isSink, tableData, ctx);
872+
return BuildUpsertTable(write, inputColumns, autoincrement, isSink, tableData, ctx, kqpCtx);
868873
case TYdbOperation::Replace:
869874
return BuildReplaceTable(write, inputColumns, autoincrement, isSink, tableData, ctx);
870875
case TYdbOperation::InsertAbort:
@@ -957,7 +962,7 @@ TExprNode::TPtr HandleWriteTable(const TKiWriteTable& write, TExprContext& ctx,
957962
if (HasIndexesToWrite(tableData)) {
958963
return WriteTableWithIndexUpdate(write, inputColumns, defaultConstraintColumns, tableData, ctx, isSink).Ptr();
959964
} else {
960-
return WriteTableSimple(write, inputColumns, defaultConstraintColumns, tableData, ctx, isSink).Ptr();
965+
return WriteTableSimple(write, inputColumns, defaultConstraintColumns, tableData, ctx, kqpCtx, isSink).Ptr();
961966
}
962967
}
963968

ydb/core/kqp/query_compiler/kqp_query_compiler.cpp

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -609,17 +609,18 @@ TStringBuf RemoveJoinAliases(TStringBuf keyName) {
609609

610610
class TKqpQueryCompiler : public IKqpQueryCompiler {
611611
public:
612-
TKqpQueryCompiler(const TString& cluster, const TString& database, const TIntrusivePtr<TKikimrTablesData> tablesData,
613-
const NMiniKQL::IFunctionRegistry& funcRegistry, TTypeAnnotationContext& typesCtx, NYql::TKikimrConfiguration::TPtr config)
612+
TKqpQueryCompiler(const TString& cluster, const TString& database, const NMiniKQL::IFunctionRegistry& funcRegistry,
613+
TTypeAnnotationContext& typesCtx, NOpt::TKqpOptimizeContext& optimizeCtx, NYql::TKikimrConfiguration::TPtr config)
614614
: Cluster(cluster)
615615
, Database(database)
616-
, TablesData(tablesData)
616+
, TablesData(optimizeCtx.Tables)
617617
, FuncRegistry(funcRegistry)
618618
, Alloc(__LOCATION__, TAlignedPagePoolCounters(), funcRegistry.SupportsSizedAllocators())
619619
, TypeEnv(Alloc)
620-
, KqlCtx(cluster, tablesData, TypeEnv, FuncRegistry)
620+
, KqlCtx(cluster, optimizeCtx.Tables, TypeEnv, FuncRegistry)
621621
, KqlCompiler(CreateKqlCompiler(KqlCtx, typesCtx))
622622
, TypesCtx(typesCtx)
623+
, OptimizeCtx(optimizeCtx)
623624
, Config(config)
624625
{
625626
Alloc.Release();
@@ -1346,8 +1347,9 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
13461347

13471348
settingsProto.SetIsOlap(tableMeta->Kind == EKikimrTableKind::Olap);
13481349

1349-
AFL_ENSURE(settings.InconsistentWrite().Cast().StringValue() == "false");
1350-
settingsProto.SetInconsistentTx(false);
1350+
const bool inconsistentWrite = settings.InconsistentWrite().Cast().Value() == "true"sv;
1351+
AFL_ENSURE(!inconsistentWrite || (OptimizeCtx.UserRequestContext && OptimizeCtx.UserRequestContext->IsStreamingQuery));
1352+
settingsProto.SetInconsistentTx(inconsistentWrite);
13511353

13521354
if (Config->EnableIndexStreamWrite && settingsProto.GetType() == NKikimrKqp::TKqpTableSinkSettings::MODE_INSERT) {
13531355
AFL_ENSURE(tableMeta->Indexes.size() == tableMeta->ImplTables.size());
@@ -1926,17 +1928,18 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
19261928
TKqlCompileContext KqlCtx;
19271929
TIntrusivePtr<NCommon::IMkqlCallableCompiler> KqlCompiler;
19281930
TTypeAnnotationContext& TypesCtx;
1931+
NOpt::TKqpOptimizeContext& OptimizeCtx;
19291932
TKikimrConfiguration::TPtr Config;
19301933
TSet<TString> SecretNames;
19311934
};
19321935

19331936
} // namespace
19341937

19351938
TIntrusivePtr<IKqpQueryCompiler> CreateKqpQueryCompiler(const TString& cluster, const TString& database,
1936-
const TIntrusivePtr<TKikimrTablesData> tablesData, const IFunctionRegistry& funcRegistry,
1937-
TTypeAnnotationContext& typesCtx, NYql::TKikimrConfiguration::TPtr config)
1939+
const IFunctionRegistry& funcRegistry, TTypeAnnotationContext& typesCtx,
1940+
NOpt::TKqpOptimizeContext& optimizeCtx, NYql::TKikimrConfiguration::TPtr config)
19381941
{
1939-
return MakeIntrusive<TKqpQueryCompiler>(cluster, database, tablesData, funcRegistry, typesCtx, config);
1942+
return MakeIntrusive<TKqpQueryCompiler>(cluster, database, funcRegistry, typesCtx, optimizeCtx, config);
19401943
}
19411944

19421945
} // namespace NKqp
Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
#pragma once
22

33
#include <ydb/core/kqp/expr_nodes/kqp_expr_nodes.h>
4+
#include <ydb/core/kqp/opt/kqp_opt.h>
45
#include <ydb/core/kqp/provider/yql_kikimr_expr_nodes.h>
6+
#include <ydb/core/kqp/provider/yql_kikimr_provider.h>
57
#include <ydb/core/kqp/provider/yql_kikimr_settings.h>
68
#include <ydb/core/protos/kqp_physical.pb.h>
79

8-
#include <ydb/core/kqp/provider/yql_kikimr_provider.h>
9-
1010
namespace NKikimr {
1111
namespace NKqp {
1212

@@ -18,8 +18,8 @@ class IKqpQueryCompiler : public TThrRefBase {
1818
};
1919

2020
TIntrusivePtr<IKqpQueryCompiler> CreateKqpQueryCompiler(const TString& cluster, const TString& database,
21-
const TIntrusivePtr<NYql::TKikimrTablesData> tablesData, const NMiniKQL::IFunctionRegistry& funcRegistry,
22-
NYql::TTypeAnnotationContext& typesCtx, NYql::TKikimrConfiguration::TPtr config);
21+
const NMiniKQL::IFunctionRegistry& funcRegistry, NYql::TTypeAnnotationContext& typesCtx,
22+
NOpt::TKqpOptimizeContext& optimizeCtx, NYql::TKikimrConfiguration::TPtr config);
2323

2424
} // namespace NKqp
2525
} // namespace NKikimr

ydb/core/kqp/runtime/kqp_write_actor.cpp

Lines changed: 44 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1870,21 +1870,21 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
18701870
return result;
18711871
}
18721872

1873-
void SendData(NMiniKQL::TUnboxedValueBatch&& data, i64 size, const TMaybe<NYql::NDqProto::TCheckpoint>&, bool finished) final {
1873+
void SendData(NMiniKQL::TUnboxedValueBatch&& data, i64 size, const TMaybe<NYql::NDqProto::TCheckpoint>& checkpoint, bool finished) final {
18741874
YQL_ENSURE(!data.IsWide(), "Wide stream is not supported yet");
1875-
YQL_ENSURE(!Closed);
1875+
YQL_ENSURE(!Closed || data.empty());
18761876
Closed = finished;
18771877
EgressStats.Resume();
18781878
Y_UNUSED(size);
18791879

18801880
try {
1881-
Batcher->AddData(data);
1882-
YQL_ENSURE(WriteTableActor);
1883-
WriteTableActor->Write(WriteToken, GetOperation(Settings.GetType()), Batcher->Build());
1884-
if (Closed) {
1885-
WriteTableActor->Close(WriteToken);
1886-
WriteTableActor->FlushBuffers();
1887-
WriteTableActor->Close();
1881+
if (!data.empty()) {
1882+
Batcher->AddData(data);
1883+
DataBuffer.emplace(Batcher->Build());
1884+
}
1885+
1886+
if (checkpoint) {
1887+
DataBuffer.emplace(*checkpoint);
18881888
}
18891889
} catch (const TMemoryLimitExceededException&) {
18901890
RuntimeError(
@@ -1908,6 +1908,32 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
19081908

19091909
void Process() {
19101910
try {
1911+
YQL_ENSURE(WriteTableActor);
1912+
if (CheckpointInProgress && WriteTableActor->IsEmpty()) {
1913+
DoCheckpoint();
1914+
}
1915+
1916+
while (!DataBuffer.empty() && !CheckpointInProgress) {
1917+
auto variant = std::move(DataBuffer.front());
1918+
DataBuffer.pop();
1919+
1920+
if (std::holds_alternative<IDataBatchPtr>(variant)) {
1921+
WriteTableActor->Write(WriteToken, GetOperation(Settings.GetType()), std::get<IDataBatchPtr>(std::move(variant)));
1922+
} else if (std::holds_alternative<NYql::NDqProto::TCheckpoint>(variant)) {
1923+
CheckpointInProgress = std::get<NYql::NDqProto::TCheckpoint>(std::move(variant));
1924+
WriteTableActor->FlushBuffers();
1925+
if (WriteTableActor->IsEmpty()) {
1926+
DoCheckpoint();
1927+
}
1928+
}
1929+
}
1930+
1931+
if (Closed && DataBuffer.empty() && !WriteTableActor->IsClosed()) {
1932+
WriteTableActor->Close(WriteToken);
1933+
WriteTableActor->FlushBuffers();
1934+
WriteTableActor->Close();
1935+
}
1936+
19111937
const bool outOfMemory = GetFreeSpace() <= 0;
19121938
if (outOfMemory) {
19131939
WaitingForTableActor = true;
@@ -1929,7 +1955,7 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
19291955
WriteTableActor->FlushBuffers();
19301956
}
19311957

1932-
if (Closed || outOfMemory) {
1958+
if (Closed || outOfMemory || CheckpointInProgress) {
19331959
if (!WriteTableActor->FlushToShards()) {
19341960
return;
19351961
}
@@ -2025,6 +2051,12 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
20252051
}
20262052
}
20272053

2054+
void DoCheckpoint() {
2055+
YQL_ENSURE(CheckpointInProgress);
2056+
Callbacks->OnAsyncOutputStateSaved({}, OutputIndex, *CheckpointInProgress);
2057+
CheckpointInProgress = std::nullopt;
2058+
}
2059+
20282060
TString LogPrefix;
20292061
const NKikimrKqp::TKqpTableSinkSettings Settings;
20302062
TWriteActorSettings MessageSettings;
@@ -2041,6 +2073,8 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
20412073
TActorId WriteTableActorId;
20422074

20432075
TKqpTableWriteActor::TWriteToken WriteToken = 0;
2076+
std::queue<std::variant<IDataBatchPtr, NYql::NDqProto::TCheckpoint>> DataBuffer;
2077+
std::optional<NYql::NDqProto::TCheckpoint> CheckpointInProgress;
20442078

20452079
bool Closed = false;
20462080
bool WaitingForTableActor = false;

ydb/core/kqp/runtime/kqp_write_table.cpp

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -707,7 +707,7 @@ class TRowsBatcher {
707707
return res;
708708
}
709709

710-
auto poolAlloc = CreateOffloadedPoolAllocator(std::move(Alloc));
710+
auto poolAlloc = CreateOffloadedPoolAllocator(Alloc);
711711
return MakeIntrusive<TRowBatch>(TOwnedCellVecBatch(poolAlloc->CreateMemoryPool()), poolAlloc);
712712
}
713713

@@ -719,7 +719,7 @@ class TRowsBatcher {
719719
if (Batches.empty() || (MaxBytesPerBatch && newMemorySerialized + Batches.back()->GetMemorySerialized() > *MaxBytesPerBatch)) {
720720
Batches.emplace_back(std::make_unique<TBatch>(Alloc));
721721
}
722-
722+
723723
AFL_ENSURE(newMemory == Batches.back()->AddRow(std::move(row)));
724724
Memory += newMemory;
725725
}
@@ -1182,8 +1182,9 @@ class TShardsInfo {
11821182
public:
11831183
class TShardInfo {
11841184
friend class TShardsInfo;
1185-
TShardInfo(i64& memory, ui64& nextCookie, bool& closed)
1185+
TShardInfo(i64& memory, ui64& pendingBatches, ui64& nextCookie, bool& closed)
11861186
: Memory(memory)
1187+
, PendingBatches(pendingBatches)
11871188
, NextCookie(nextCookie)
11881189
, Cookie(NextCookie++)
11891190
, Closed(closed) {
@@ -1238,6 +1239,7 @@ class TShardsInfo {
12381239
const i64 batchMemory = Batches.front().GetMemory();
12391240
result.DataSize += batchMemory;
12401241
Memory -= batchMemory;
1242+
PendingBatches--;
12411243
Batches.pop_front();
12421244
}
12431245

@@ -1254,6 +1256,7 @@ class TShardsInfo {
12541256
AFL_ENSURE(!IsClosed());
12551257
Batches.emplace_back(std::move(batch));
12561258
Memory += Batches.back().GetMemory();
1259+
PendingBatches++;
12571260
HasReadInBatch |= Batches.back().HasRead;
12581261
}
12591262

@@ -1292,6 +1295,7 @@ class TShardsInfo {
12921295
private:
12931296
std::deque<TBatchWithMetadata> Batches;
12941297
i64& Memory;
1298+
ui64& PendingBatches;
12951299
bool HasReadInBatch = false;
12961300

12971301
ui64& NextCookie;
@@ -1310,7 +1314,7 @@ class TShardsInfo {
13101314
return it->second;
13111315
}
13121316

1313-
auto [insertIt, _] = ShardsInfo.emplace(shard, TShardInfo(Memory, NextCookie, Closed));
1317+
auto [insertIt, _] = ShardsInfo.emplace(shard, TShardInfo(Memory, PendingBatches, NextCookie, Closed));
13141318
return insertIt->second;
13151319
}
13161320

@@ -1330,12 +1334,7 @@ class TShardsInfo {
13301334
}
13311335

13321336
bool IsEmpty() const {
1333-
for (const auto& [_, shard] : ShardsInfo) {
1334-
if (!shard.IsEmpty()) {
1335-
return false;
1336-
}
1337-
}
1338-
return true;
1337+
return PendingBatches == 0;
13391338
}
13401339

13411340
bool IsFinished() const {
@@ -1373,6 +1372,7 @@ class TShardsInfo {
13731372
THashMap<ui64, TShardInfo> ShardsInfo;
13741373
i64 Memory = 0;
13751374
ui64 NextCookie = 1;
1375+
ui64 PendingBatches = 0;
13761376
bool Closed = false;
13771377
};
13781378

0 commit comments

Comments
 (0)