Skip to content

Commit 87094f8

Browse files
kirillvasilenko and Kirill Vasilenko authored
Make transaction not to see aborted changes (#27293)
Co-authored-by: Kirill Vasilenko <kir-vasilenko@ybd.tech>
1 parent e452dca commit 87094f8

File tree

8 files changed

+89
-18
lines changed

8 files changed

+89
-18
lines changed

ydb/core/tx/columnshard/columnshard_impl.cpp

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -176,15 +176,14 @@ NOlap::TSnapshot TColumnShard::GetMaxReadVersion() const {
176176
auto plannedTx = ProgressTxController->GetPlannedTx();
177177
if (plannedTx) {
178178
// We may only read just before the first transaction in the queue
179-
auto maxReadVersion = TRowVersion(plannedTx->Step, plannedTx->TxId).Prev();
180-
return NOlap::TSnapshot(maxReadVersion.Step, maxReadVersion.TxId);
181-
}
182-
ui64 step = LastPlannedStep;
183-
if (MediatorTimeCastEntry) {
184-
ui64 mediatorStep = MediatorTimeCastEntry->Get(TabletID());
185-
step = Max(step, mediatorStep);
179+
auto firstPlannedSnapshot = NOlap::TSnapshot(plannedTx->Step, plannedTx->TxId);
180+
return firstPlannedSnapshot.GetPreviousSnapshot();
181+
} else {
182+
// the same snapshot is used by bulk upsert and aborts
183+
// aborts are fine, but be careful with bulk upsert,
184+
// it must correctly break conflicting serializable txs
185+
return GetCurrentSnapshotForInternalModification();
186186
}
187-
return NOlap::TSnapshot(step, Max<ui64>());
188187
}
189188

190189
ui64 TColumnShard::GetOutdatedStep() const {

ydb/core/tx/columnshard/common/snapshot.h

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ class TSnapshot {
5050
}
5151

5252
constexpr bool Valid() const noexcept {
53-
return PlanStep && TxId;
53+
return PlanStep != 0 && TxId != 0;
5454
}
5555

5656
static constexpr TSnapshot Zero() noexcept {
@@ -106,6 +106,15 @@ class TSnapshot {
106106
explicit operator size_t() const {
107107
return CombineHashes(PlanStep, TxId);
108108
}
109+
110+
// Returns the snapshot that immediately precedes this one within the same plan step.
//
// Precondition (enforced by the verify below): the snapshot is Valid(), i.e. both
// PlanStep and TxId are non-zero.
//
// Since TxId is guaranteed to be non-zero past the verify, TxId - 1 cannot wrap around,
// and stepping back to the previous plan step is never required: a snapshot with
// TxId == 0 can never pass Valid(), so such a branch would be unreachable.
//
// Note: for TxId == 1 the returned snapshot has TxId == 0 and is therefore not Valid() itself.
TSnapshot GetPreviousSnapshot() const {
    AFL_VERIFY(Valid());
    return TSnapshot(PlanStep, TxId - 1);
}
109118
};
110119

111120
} // namespace NKikimr::NOlap

ydb/core/tx/columnshard/engines/reader/common_reader/iterator/source.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -447,6 +447,7 @@ class IDataSource: public ICursorEntity, public NArrow::NSSA::IDataSource {
447447
return false;
448448
}
449449
if (DoAddTxConflict()) {
450+
StageData->Clear();
450451
StageData->Abort();
451452
return true;
452453
}

ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.cpp

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ std::shared_ptr<TFetchingScript> TSpecialReadContext::DoGetColumnsFetchingPlan(
4040
portionState = GetPortionStateAtScanStart(portion->GetPortionInfo());
4141
}
4242

43-
const bool needSnapshots = GetReadMetadata()->GetRequestSnapshot() < portionState.MaxRecordSnapshot || portionState.Conflicting;
43+
const bool needConflictDetector = portionState.Conflicting;
4444

4545
const bool useIndexes = false;
4646
const bool hasDeletions = source->GetHasDeletions();
@@ -54,21 +54,21 @@ std::shared_ptr<TFetchingScript> TSpecialReadContext::DoGetColumnsFetchingPlan(
5454

5555
const bool preventDuplicates = NeedDuplicateFiltering() && !portionState.Conflicting;
5656
{
57-
auto& result = CacheFetchingScripts[needSnapshots ? 1 : 0][partialUsageByPK ? 1 : 0][useIndexes ? 1 : 0][needShardingFilter ? 1 : 0]
57+
auto& result = CacheFetchingScripts[needConflictDetector ? 1 : 0][partialUsageByPK ? 1 : 0][useIndexes ? 1 : 0][needShardingFilter ? 1 : 0]
5858
[hasDeletions ? 1 : 0][preventDuplicates ? 1 : 0];
5959
if (result.NeedInitialization()) {
6060
TGuard<TMutex> g(Mutex);
6161
if (auto gInit = result.StartInitialization()) {
6262
gInit->InitializationFinished(BuildColumnsFetchingPlan(
63-
needSnapshots, partialUsageByPK, useIndexes, needShardingFilter, hasDeletions, preventDuplicates, isFinalSyncPoint));
63+
needConflictDetector, partialUsageByPK, useIndexes, needShardingFilter, hasDeletions, preventDuplicates, isFinalSyncPoint));
6464
}
6565
AFL_VERIFY(!result.NeedInitialization());
6666
}
6767
return result.GetScriptVerified();
6868
}
6969
}
7070

71-
std::shared_ptr<TFetchingScript> TSpecialReadContext::BuildColumnsFetchingPlan(const bool needSnapshots, const bool partialUsageByPredicateExt,
71+
std::shared_ptr<TFetchingScript> TSpecialReadContext::BuildColumnsFetchingPlan(const bool needConflictDetector, const bool partialUsageByPredicateExt,
7272
const bool /*useIndexes*/, const bool needFilterSharding, const bool needFilterDeletion, const bool preventDuplicates,
7373
const bool isFinalSyncPoint) const {
7474
const bool partialUsageByPredicate = partialUsageByPredicateExt && GetPredicateColumns()->GetColumnsCount();
@@ -90,7 +90,7 @@ std::shared_ptr<TFetchingScript> TSpecialReadContext::BuildColumnsFetchingPlan(c
9090
if (partialUsageByPredicate) {
9191
acc.AddFetchingStep(*GetPredicateColumns(), NArrow::NSSA::IMemoryCalculationPolicy::EStage::Filter);
9292
}
93-
if (needSnapshots || GetFFColumns()->Cross(*GetSpecColumns())) {
93+
if (needConflictDetector || GetFFColumns()->Cross(*GetSpecColumns())) {
9494
acc.AddFetchingStep(*GetSpecColumns(), NArrow::NSSA::IMemoryCalculationPolicy::EStage::Filter);
9595
}
9696
if (needFilterDeletion) {
@@ -101,9 +101,9 @@ std::shared_ptr<TFetchingScript> TSpecialReadContext::BuildColumnsFetchingPlan(c
101101
acc.AddAssembleStep(*GetPredicateColumns(), "PREDICATE", NArrow::NSSA::IMemoryCalculationPolicy::EStage::Filter, false);
102102
acc.AddStep(std::make_shared<TPredicateFilter>());
103103
}
104-
if (needSnapshots || GetFFColumns()->Cross(*GetSpecColumns())) {
104+
if (needConflictDetector || GetFFColumns()->Cross(*GetSpecColumns())) {
105105
acc.AddAssembleStep(*GetSpecColumns(), "SPEC", NArrow::NSSA::IMemoryCalculationPolicy::EStage::Filter, false);
106-
acc.AddStep(std::make_shared<TSnapshotFilter>());
106+
acc.AddStep(std::make_shared<TConflictDetector>());
107107
}
108108
if (preventDuplicates) {
109109
acc.AddStep(std::make_shared<TDuplicateFilter>());

ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.cpp

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include "source.h"
44

55
#include <ydb/core/tx/columnshard/engines/filter.h>
6+
#include <ydb/core/tx/columnshard/engines/portions/written.h>
67
#include <ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/events.h>
78
#include <ydb/core/tx/conveyor_composite/usage/service.h>
89
#include <ydb/core/tx/limiter/grouped_memory/usage/service.h>
@@ -21,6 +22,48 @@ TConclusion<bool> TPredicateFilter::DoExecuteInplace(const std::shared_ptr<NComm
2122
return true;
2223
}
2324

25+
// Checks the invariants expected of a data source that reaches the conflict-detection step.
// Every violated invariant aborts through AFL_VERIFY; on success the function has no effect.
//
// Invariants checked, in order:
//   * the source is a simple portion source (the downcast below relies on it);
//   * the underlying portion is a written (not compacted) portion;
//   * the portion state taken at the scan start is marked as conflicting;
//   * the commit snapshot of the portion is consistent with the request snapshot
//     (the exact rule depends on whether the portion was committed at the scan start);
//   * the source still contains at least one record.
void VerifyConflictingPortion(const std::shared_ptr<NCommon::IDataSource>& source) {
    // The portion must be a simple portion.
    AFL_VERIFY(source->GetType() == IDataSource::EType::SimplePortion);
    auto* portionSource = static_cast<TPortionDataSource*>(source.get());
    auto& info = portionSource->GetPortionInfo();
    auto status = portionSource->GetContext()->GetPortionStateAtScanStart(info);

    // Only written (not compacted) portions may get here.
    AFL_VERIFY(info.GetPortionType() == EPortionType::Written);
    const auto& wPortionInfo = static_cast<const TWrittenPortionInfo&>(info);

    // Only conflicting portions may get here.
    AFL_VERIFY(status.Conflicting);
    const auto& requestSnapshot = source->GetContext()->GetReadMetadata()->GetRequestSnapshot();

    if (status.Committed) {
        // Already committed at the scan start: the commit snapshot must be greater than the request snapshot.
        AFL_VERIFY(wPortionInfo.GetCommitSnapshotVerified() > requestSnapshot)("error", "portion was committed and conflicting at the scan start, but has commit snapshot not greater than the request snapshot")("portion_info", wPortionInfo.DebugString())("request_snapshot", requestSnapshot.DebugString());
    } else if (!wPortionInfo.IsCommitted()) {
        // Not committed at the scan start and still not committed: nothing to check.
    } else if (wPortionInfo.HasRemoveSnapshot()) {
        // Not committed at the scan start, now committed and removed:
        // the commit snapshot must be greater than or equal to the request snapshot.
        AFL_VERIFY(wPortionInfo.GetCommitSnapshotVerified() >= requestSnapshot)("error", "portion was conflicting and not committed at the scan start, but now it is committed and removed and has commit snapshot less than the request snapshot")("portion_info", wPortionInfo.DebugString())("request_snapshot", requestSnapshot.DebugString());
    } else {
        // Not committed at the scan start, now committed and not removed:
        // the commit snapshot must be greater than the request snapshot.
        AFL_VERIFY(wPortionInfo.GetCommitSnapshotVerified() > requestSnapshot)("error", "portion was conflicting and not committed at the scan start, but now it is committed and has commit snapshot not greater than the request snapshot")("portion_info", wPortionInfo.DebugString())("request_snapshot", requestSnapshot.DebugString());
    }

    // The source must not be empty: the caller is about to mark it as conflicting.
    AFL_VERIFY(source->GetRecordsCount() > 0)("error", "source has no records");
}
58+
59+
// Conflict-detection step of a fetching script.
//
// First checks (via VerifyConflictingPortion) that the source really is a non-empty,
// conflicting written portion, then registers a transaction conflict on it.
// Both actions abort through AFL_VERIFY on failure; otherwise the step always returns true.
TConclusion<bool> TConflictDetector::DoExecuteInplace(const std::shared_ptr<NCommon::IDataSource>& source,
    const TFetchingScriptCursor& /*step*/) const {
    // Sanity checks: abort if the source does not look like a conflicting portion.
    VerifyConflictingPortion(source);

    // The source survived all previous filters (it is not empty) and it is conflicting,
    // so the conflict has to be recorded here; a refusal to record it is a hard error.
    AFL_VERIFY(source->AddTxConflict());

    return true;
}
66+
2467
TConclusion<bool> TSnapshotFilter::DoExecuteInplace(
2568
const std::shared_ptr<NCommon::IDataSource>& source, const TFetchingScriptCursor& /*step*/) const {
2669
auto filter =

ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,18 @@ class TPredicateFilter: public IFetchingStep {
212212
}
213213
};
214214

215+
class TConflictDetector: public IFetchingStep {
216+
private:
217+
using TBase = IFetchingStep;
218+
219+
public:
220+
virtual TConclusion<bool> DoExecuteInplace(
221+
const std::shared_ptr<NCommon::IDataSource>& source, const TFetchingScriptCursor& step) const override;
222+
TConflictDetector()
223+
: TBase("CONFLICT_DETECTOR") {
224+
}
225+
};
226+
215227
class TSnapshotFilter: public IFetchingStep {
216228
private:
217229
using TBase = IFetchingStep;

ydb/core/tx/columnshard/engines/storage/granule/granule.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,8 +225,14 @@ class TGranuleMeta: TNonCopyable {
225225
auto it = InsertedPortions.find(insertWriteId);
226226
AFL_VERIFY(it != InsertedPortions.end());
227227
AFL_VERIFY(InsertedPortionsById.contains(it->second->GetPortionId()));
228-
it->second->SetCommitSnapshot(ssRemove);
228+
// it is better to set remove snapshot before the commit snapshot
229+
// because otherwise concurrent readers may see the portion as just committed while
230+
// the commit snapshot is already set, but the remove snapshot is not set yet.
231+
// this problem should be addressed properly by a synchronized (or atomic) access
232+
// to this part of the portion info state https://github.com/ydb-platform/ydb/issues/27205.
233+
// until then, this workaround is better than nothing.
229234
it->second->SetRemoveSnapshot(ssRemove);
235+
it->second->SetCommitSnapshot(ssRemove);
230236
TDbWrapper wrapper(txc.DB, nullptr);
231237
it->second->CommitToDatabase(wrapper);
232238
}

ydb/core/tx/columnshard/operations/write.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,9 +126,10 @@ void TWriteOperation::AbortOnExecute(TColumnShard& owner, NTabletFlatExecutor::T
126126
TBlobGroupSelector dsGroupSelector(owner.Info());
127127
NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector);
128128

129+
auto abortSnapshot = owner.GetCurrentSnapshotForInternalModification();
129130
for (auto&& i : InsertWriteIds) {
130131
owner.MutableIndexAs<NOlap::TColumnEngineForLogs>().MutableGranuleVerified(PathId.InternalPathId).AbortPortionOnExecute(
131-
txc, i, owner.GetCurrentSnapshotForInternalModification());
132+
txc, i, abortSnapshot);
132133
}
133134
}
134135

0 commit comments

Comments
 (0)