Skip to content

Commit b5faea4

Browse files
authored
Column tables fixes 25 3 (#29007)
2 parents f518d0f + 1b6ea51 commit b5faea4

File tree

48 files changed

+368
-157
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+368
-157
lines changed

ydb/core/formats/arrow/accessor/dictionary/constructor.cpp

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,12 @@ TConclusion<std::shared_ptr<IChunkedArray>> TConstructor::DoDeserializeFromStrin
4545
std::make_shared<arrow::Schema>(arrow::FieldVector({ std::make_shared<arrow::Field>("val", externalInfo.GetColumnType()) }));
4646
auto resultVariants = externalInfo.GetDefaultSerializer()->Deserialize(TString(blobVariants.data(), blobVariants.size()), schemaVariants);
4747
if (!resultVariants.ok()) {
48-
return TConclusionStatus::Fail(
49-
"cannot parse dictionary variants: " + resultVariants.status().ToString() + " as " + externalInfo.GetColumnType()->ToString());
48+
return TConclusionStatus::Fail(TStringBuilder{}
49+
<< "Internal deserialization error. type: dictionary (schema variants), schema: " << schemaVariants->ToString()
50+
<< " records count: " << externalInfo.GetRecordsCount()
51+
<< " not null records count: " << (externalInfo.GetNotNullRecordsCount() ? ToString(*externalInfo.GetNotNullRecordsCount()) : TString{"unknown"})
52+
<< " reason: " << resultVariants.status().ToString()
53+
<< " original data: " << Base64Encode(TString(blobVariants.data(), blobVariants.size())));
5054
}
5155
auto rbVariants = TStatusValidator::GetValid(resultVariants);
5256
AFL_VERIFY(rbVariants->num_columns() == 1);
@@ -55,7 +59,13 @@ TConclusion<std::shared_ptr<IChunkedArray>> TConstructor::DoDeserializeFromStrin
5559
auto schemaRecords = std::make_shared<arrow::Schema>(arrow::FieldVector({ std::make_shared<arrow::Field>("val", type) }));
5660
auto resultRecords = externalInfo.GetDefaultSerializer()->Deserialize(TString(blobRecords.data(), blobRecords.size()), schemaRecords);
5761
if (!resultRecords.ok()) {
58-
return TConclusionStatus::Fail(resultRecords.status().ToString());
62+
return TConclusionStatus::Fail(TStringBuilder{}
63+
<< "Internal deserialization error. type: dictionary (schema records), schema: " << schemaRecords->ToString()
64+
<< " records count: " << externalInfo.GetRecordsCount()
65+
<< " not null records count: " << (externalInfo.GetNotNullRecordsCount() ? ToString(*externalInfo.GetNotNullRecordsCount()) : TString{"unknown"})
66+
<< " variants count: " << rbVariants->num_rows()
67+
<< " reason: " << resultRecords.status().ToString()
68+
<< " original data: " << Base64Encode(TString(blobRecords.data(), blobRecords.size())));
5969
}
6070
auto rbRecords = TStatusValidator::GetValid(resultRecords);
6171
AFL_VERIFY(rbRecords->num_columns() == 1);

ydb/core/formats/arrow/accessor/plain/constructor.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,12 @@ TConclusion<std::shared_ptr<IChunkedArray>> TConstructor::DoDeserializeFromStrin
1717
auto schema = std::make_shared<arrow::Schema>(arrow::FieldVector({ std::make_shared<arrow::Field>("val", externalInfo.GetColumnType()) }));
1818
auto result = externalInfo.GetDefaultSerializer()->Deserialize(originalData, schema);
1919
if (!result.ok()) {
20-
return TConclusionStatus::Fail(result.status().ToString());
20+
return TConclusionStatus::Fail(TStringBuilder{}
21+
<< "Internal deserialization error. type: plain, schema: " << schema->ToString()
22+
<< " records count: " << externalInfo.GetRecordsCount()
23+
<< " not null records count: " << (externalInfo.GetNotNullRecordsCount() ? ToString(*externalInfo.GetNotNullRecordsCount()) : TString{"unknown"})
24+
<< " reason: " << result.status().ToString()
25+
<< " original data: " << Base64Encode(originalData));
2126
}
2227
auto rb = TStatusValidator::GetValid(result);
2328
AFL_VERIFY(rb->num_columns() == 1)("count", rb->num_columns())("schema", schema->ToString());

ydb/core/formats/arrow/accessor/sparsed/constructor.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,12 @@ TConclusion<std::shared_ptr<IChunkedArray>> TConstructor::DoDeserializeFromStrin
1616
auto schema = std::make_shared<arrow::Schema>(fields);
1717
auto rbParsed = externalInfo.GetDefaultSerializer()->Deserialize(originalData, schema);
1818
if (!rbParsed.ok()) {
19-
return TConclusionStatus::Fail(rbParsed.status().ToString());
19+
return TConclusionStatus::Fail(TStringBuilder{}
20+
<< "Internal deserialization error. type: sparsed, schema: " << schema->ToString()
21+
<< " records count: " << externalInfo.GetRecordsCount()
22+
<< " not null records count: " << (externalInfo.GetNotNullRecordsCount() ? ToString(*externalInfo.GetNotNullRecordsCount()) : TString{"unknown"})
23+
<< " reason: " << rbParsed.status().ToString()
24+
<< " original data: " << Base64Encode(originalData));
2025
}
2126
auto rb = *rbParsed;
2227
AFL_VERIFY(rb->num_columns() == 2)("count", rb->num_columns())("schema", rb->schema()->ToString());

ydb/core/formats/arrow/program/original.cpp

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ TConclusion<IResourceProcessor::EExecutionResult> TOriginalColumnDataProcessor::
99
if (!source) {
1010
return TConclusionStatus::Fail("source was destroyed before (original fetch start)");
1111
}
12+
THashSet<uint32_t> uniqueEntityIds;
1213
std::vector<std::shared_ptr<IFetchLogic>> logic;
1314
for (auto&& [_, i] : DataAddresses) {
1415
auto acc = context.GetResources().GetAccessorOptional(i.GetColumnId());
@@ -28,6 +29,11 @@ TConclusion<IResourceProcessor::EExecutionResult> TOriginalColumnDataProcessor::
2829
if (conclusion.IsFail()) {
2930
return conclusion;
3031
} else if (!!conclusion.GetResult()) {
32+
auto entityId = conclusion.GetResult()->GetEntityId();
33+
auto [_, emplaced] = uniqueEntityIds.emplace(entityId);
34+
if (!emplaced) {
35+
return TConclusionStatus::Fail(TStringBuilder{} << "Try to process the same entity id (data) " << entityId << " twice");
36+
}
3137
logic.emplace_back(conclusion.DetachResult());
3238
} else {
3339
continue;
@@ -39,6 +45,13 @@ TConclusion<IResourceProcessor::EExecutionResult> TOriginalColumnDataProcessor::
3945
if (conclusion.IsFail()) {
4046
return conclusion;
4147
} else {
48+
for (const auto& index: conclusion.GetResult()) {
49+
auto entityId = index->GetEntityId();
50+
auto [_, emplaced] = uniqueEntityIds.emplace(entityId);
51+
if (!emplaced) {
52+
return TConclusionStatus::Fail(TStringBuilder{} << "Try to process the same entity id (index) " << entityId << " twice");
53+
}
54+
}
4255
logic.insert(logic.end(), conclusion.GetResult().begin(), conclusion.GetResult().end());
4356
}
4457
}
@@ -50,6 +63,11 @@ TConclusion<IResourceProcessor::EExecutionResult> TOriginalColumnDataProcessor::
5063
if (conclusion.IsFail()) {
5164
return conclusion;
5265
} else if (!!conclusion.GetResult()) {
66+
auto entityId = conclusion.GetResult()->GetEntityId();
67+
auto [_, emplaced] = uniqueEntityIds.emplace(entityId);
68+
if (!emplaced) {
69+
return TConclusionStatus::Fail(TStringBuilder{} << "Try to process the same entity id (header) " << entityId << " twice");
70+
}
5371
logic.emplace_back(conclusion.DetachResult());
5472
} else {
5573
continue;
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
#include <ydb/core/formats/arrow/accessor/plain/accessor.h>
2+
#include <ydb/core/formats/arrow/accessor/plain/constructor.h>
3+
#include <ydb/core/formats/arrow/serializer/native.h>
4+
5+
#include <ydb/library/actors/core/log.h>
6+
7+
#include <library/cpp/testing/unittest/registar.h>
8+
#include <util/string/join.h>
9+
10+
11+
static std::shared_ptr<NKikimr::NArrow::NAccessor::IChunkedArray> BuildArray() {
12+
NKikimr::NArrow::NAccessor::TTrivialArray::TPlainBuilder<arrow::BinaryType> arrBuilder;
13+
arrBuilder.AddRecord(1, "b1");
14+
arrBuilder.AddRecord(3, "b2");
15+
arrBuilder.AddRecord(4, "b3");
16+
return arrBuilder.Finish(5);
17+
}
18+
19+
Y_UNIT_TEST_SUITE(Slicer) {
20+
using namespace NKikimr::NArrow;
21+
22+
Y_UNIT_TEST(SplitBySizes) {
23+
auto arr = BuildArray();
24+
NKikimr::NArrow::NAccessor::TChunkConstructionData info(
25+
arr->GetRecordsCount(), nullptr, arr->GetDataType(), NKikimr::NArrow::NSerialization::TSerializerContainer::GetDefaultSerializer());
26+
auto serialized = NKikimr::NArrow::NAccessor::NPlain::TConstructor().SerializeToString(arr, info);
27+
const auto predSaver = [&](const std::shared_ptr<NKikimr::NArrow::NAccessor::IChunkedArray>& arr) {
28+
return NKikimr::NArrow::NAccessor::NPlain::TConstructor().SerializeToString(arr, info);
29+
};
30+
31+
NKikimr::NArrow::NSerialization::TNativeSerializer serializer;
32+
for (const auto& chunk: arr->SplitBySizes(predSaver, serialized, {1, 1, 1})) {
33+
auto schema = std::make_shared<arrow::Schema>(arrow::FieldVector({ std::make_shared<arrow::Field>("val", arrow::utf8()) }));
34+
TString serializedData = chunk.GetSerializedData();
35+
arrow::Result<std::shared_ptr<arrow::RecordBatch>> result = serializer.Deserialize(serializedData, schema);
36+
UNIT_ASSERT_C(result.status().ok(), result.status().ToString());
37+
}
38+
}
39+
}

ydb/core/formats/arrow/ut/ya.make

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,12 @@ CFLAGS(
2929

3030
SRCS(
3131
ut_arrow.cpp
32-
ut_program_step.cpp
33-
ut_dictionary.cpp
3432
ut_column_filter.cpp
33+
ut_dictionary.cpp
3534
ut_hash.cpp
35+
ut_program_step.cpp
3636
ut_reader.cpp
37+
ut_slicer.cpp
3738
)
3839

3940
END()

ydb/core/protos/config.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2138,6 +2138,7 @@ message TColumnShardConfig {
21382138
optional uint64 BadPortionSizeLimit = 51 [default = 524288];
21392139
optional uint64 BadPortionsLimit = 52;
21402140
optional bool CombineChunksInResult = 54 [default = true];
2141+
optional bool EnableParallelCompaction = 56 [default = true];
21412142
}
21422143

21432144
message TSchemeShardConfig {

ydb/core/protos/flat_scheme_op.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -538,6 +538,7 @@ message TCompactionLevelConstructorContainer {
538538
optional uint64 PortionsCountAvailable = 3;
539539
optional uint64 PortionsCountLimit = 4;
540540
optional uint64 PortionsSizeLimit = 5;
541+
optional uint64 Concurrency = 6;
541542
}
542543

543544
message TOneLayer {

ydb/core/tx/columnshard/background_controller.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,13 @@
44
namespace NKikimr::NColumnShard {
55

66
bool TBackgroundController::StartCompaction(const TInternalPathId pathId, const TString& taskId) {
7-
auto [it, _] = ActiveCompactionInfo.emplace(pathId, NOlap::TPlanCompactionInfo{ pathId, taskId });
7+
auto [it, _] = ActiveCompactionInfo.emplace(std::make_pair(pathId, taskId), NOlap::TPlanCompactionInfo{ pathId, taskId });
88
it->second.Start();
99
return true;
1010
}
1111

12-
void TBackgroundController::FinishCompaction(const TInternalPathId pathId) {
13-
auto it = ActiveCompactionInfo.find(pathId);
12+
void TBackgroundController::FinishCompaction(const TInternalPathId pathId, const TString& taskId) {
13+
auto it = ActiveCompactionInfo.find(std::make_pair(pathId, taskId));
1414
AFL_VERIFY(it != ActiveCompactionInfo.end());
1515
if (it->second.Finish()) {
1616
ActiveCompactionInfo.erase(it);
@@ -21,7 +21,7 @@ void TBackgroundController::FinishCompaction(const TInternalPathId pathId) {
2121
void TBackgroundController::CheckDeadlines() {
2222
for (auto&& i : ActiveCompactionInfo) {
2323
if (TMonotonic::Now() - i.second.GetStartTime() > NOlap::TCompactionLimits::CompactionTimeout) {
24-
AFL_CRIT(NKikimrServices::TX_COLUMNSHARD)("event", "deadline_compaction")("path_id", i.first)("task_id", i.second.GetTaskId());
24+
AFL_CRIT(NKikimrServices::TX_COLUMNSHARD)("event", "deadline_compaction")("path_id", i.first.first)("task_id", i.second.GetTaskId());
2525
// uncomment it for debug purpose
2626
// AFL_VERIFY_DEBUG(false);
2727
}

ydb/core/tx/columnshard/background_controller.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ namespace NKikimr::NColumnShard {
1212

1313
class TBackgroundController {
1414
private:
15-
using TCurrentCompaction = THashMap<TInternalPathId, NOlap::TPlanCompactionInfo>;
15+
using TCurrentCompaction = THashMap<std::pair<TInternalPathId, TString>, NOlap::TPlanCompactionInfo>;
1616
TCurrentCompaction ActiveCompactionInfo;
1717
std::optional<ui64> WaitingCompactionPriority;
1818

@@ -60,7 +60,7 @@ class TBackgroundController {
6060
void CheckDeadlines();
6161

6262
bool StartCompaction(const TInternalPathId pathId, const TString& taskId);
63-
void FinishCompaction(const TInternalPathId pathId);
63+
void FinishCompaction(const TInternalPathId pathId, const TString& taskId);
6464

6565
ui32 GetCompactionsCount() const {
6666
return ActiveCompactionInfo.size();

0 commit comments

Comments
 (0)