Skip to content

Commit e67ef35

Browse files
committed
[ICRDMA] Handle allocation failure in case of event serialization (#29029)
1 parent 1ade07e commit e67ef35

File tree

5 files changed

+44
-43
lines changed

5 files changed

+44
-43
lines changed

ydb/library/actors/core/event.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,10 @@
1212
#include <util/system/hp_timer.h>
1313
#include <util/generic/maybe.h>
1414

15+
namespace NInterconnect::NRdma {
16+
class IMemPool;
17+
}
18+
1519
namespace NActors {
1620
class TChunkSerializer;
1721
class IActor;
@@ -39,7 +43,7 @@ namespace NActors {
3943
}
4044
virtual ui32 Type() const = 0;
4145
virtual bool SerializeToArcadiaStream(TChunkSerializer*) const = 0;
42-
virtual std::optional<TRope> SerializeToRope(std::function<TRcBuf(ui32 size)>) const {
46+
virtual std::optional<TRope> SerializeToRope(NInterconnect::NRdma::IMemPool*) const {
4347
return std::nullopt;
4448
}
4549
virtual bool IsSerializable() const = 0;

ydb/library/actors/core/event_pb.cpp

Lines changed: 30 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -333,30 +333,40 @@ namespace NActors {
333333
return true;
334334
}
335335

336-
std::optional<TRope> SerializeToRopeImpl(std::function<TRcBuf(ui32 size)> alloc, const TVector<TRope> &payload) {
336+
std::optional<TRope> SerializeToRopeImpl(const google::protobuf::MessageLite& msg, const TVector<TRope>& payload, NInterconnect::NRdma::IMemPool* pool) {
337337
TRope result;
338338
auto sz = CalculateSerializedHeaderSizeImpl(payload);
339-
if (!sz) {
340-
return result;
341-
}
342-
TRcBuf headerBuf = alloc(sz);
343-
if (!headerBuf) {
344-
return {};
339+
if (sz) {
340+
std::optional<TRcBuf> headerBuf = pool->AllocRcBuf(sz, NInterconnect::NRdma::IMemPool::EMPTY);
341+
if (!headerBuf) {
342+
return {};
343+
}
344+
char* data = headerBuf->GetDataMut();
345+
auto append = [&data](const char *p, size_t len) {
346+
std::memcpy(data, p, len);
347+
data += len;
348+
return true;
349+
};
350+
SerializeHeaderCommon(payload, append);
351+
result.Insert(result.End(), std::move(headerBuf.value()));
352+
353+
auto appendRope = [&](TRope rope) {
354+
result.Insert(result.End(), std::move(rope));
355+
return true;
356+
};
357+
SerializePayloadCommon(payload, appendRope);
345358
}
346-
char* data = headerBuf.GetDataMut();
347-
auto append = [&data](const char *p, size_t len) {
348-
std::memcpy(data, p, len);
349-
data += len;
350-
return true;
351-
};
352-
SerializeHeaderCommon(payload, append);
353-
result.Insert(result.End(), headerBuf);
354359

355-
auto appendRope = [&](TRope rope) {
356-
result.Insert(result.End(), std::move(rope));
357-
return true;
358-
};
359-
SerializePayloadCommon(payload, appendRope);
360+
{
361+
ui32 size = msg.ByteSizeLong();
362+
std::optional<TRcBuf> recordsSerializedBuf = pool->AllocRcBuf(size, NInterconnect::NRdma::IMemPool::EMPTY);
363+
if (!recordsSerializedBuf) {
364+
return {};
365+
}
366+
bool serializationDone = msg.SerializePartialToArray(recordsSerializedBuf->GetDataMut(), size);
367+
Y_ABORT_UNLESS(serializationDone);
368+
result.Insert(result.End(), std::move(recordsSerializedBuf.value()));
369+
}
360370

361371
return result;
362372
}

ydb/library/actors/core/event_pb.h

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ namespace NActors {
152152
void ParseExtendedFormatPayload(TRope::TConstIterator &iter, size_t &size, TVector<TRope> &payload, size_t &totalPayloadSize);
153153
bool SerializeToArcadiaStreamImpl(TChunkSerializer* chunker, const TVector<TRope> &payload);
154154
ui32 CalculateSerializedHeaderSizeImpl(const TVector<TRope> &payload);
155-
std::optional<TRope> SerializeToRopeImpl(std::function<TRcBuf(ui32 size)> alloc, const TVector<TRope> &payload);
155+
std::optional<TRope> SerializeToRopeImpl(const google::protobuf::MessageLite& msg, const TVector<TRope>& payload, NInterconnect::NRdma::IMemPool* pool);
156156
ui32 CalculateSerializedSizeImpl(const TVector<TRope> &payload, ssize_t recordSize);
157157
TEventSerializationInfo CreateSerializationInfoImpl(size_t preserializedSize, bool allowExternalDataChannel, const TVector<TRope> &payload, ssize_t recordSize);
158158

@@ -205,20 +205,8 @@ namespace NActors {
205205
return CalculateSerializedSizeImpl(Payload, Record.ByteSize());
206206
}
207207

208-
std::optional<TRope> SerializeToRope(std::function<TRcBuf(ui32 size)> alloc) const override {
209-
std::optional<TRope> result = SerializeToRopeImpl(alloc, Payload);
210-
if (!result) {
211-
return {};
212-
}
213-
ui32 size = Record.ByteSizeLong();
214-
TRcBuf recordsSerializedBuf = alloc(size);
215-
if (!recordsSerializedBuf) {
216-
return {};
217-
}
218-
bool serializationDone = Record.SerializePartialToArray(recordsSerializedBuf.GetDataMut(), size);
219-
Y_ABORT_UNLESS(serializationDone);
220-
result->Insert(result->End(), std::move(recordsSerializedBuf));
221-
return result;
208+
std::optional<TRope> SerializeToRope(NInterconnect::NRdma::IMemPool* pool) const override {
209+
return NActors::SerializeToRopeImpl(Record, Payload, pool);
222210
}
223211

224212
static TEv* Load(const TEventSerializedData *input) {

ydb/library/actors/interconnect/interconnect_channel.cpp

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -328,9 +328,7 @@ namespace NActors {
328328
ui32* checksum, ssize_t rdmaDeviceIndex)
329329
{
330330
if (!event.Buffer && event.Event) {
331-
std::optional<TRope> rope = event.Event->SerializeToRope(
332-
[&](ui32 size) -> TRcBuf { return RdmaMemPool->AllocRcBuf(size, NInterconnect::NRdma::IMemPool::EMPTY).value(); }
333-
);
331+
std::optional<TRope> rope = event.Event->SerializeToRope(RdmaMemPool.get());
334332
if (!rope) {
335333
return false; // serialization failed
336334
}

ydb/library/actors/interconnect/ut_rdma/rdma_xdc_ut.cpp

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#include <ydb/library/actors/core/event_pb.h>
22
#include <ydb/library/actors/interconnect/rdma/ut/utils/utils.h>
3+
#include <ydb/library/actors/interconnect/rdma/mem_pool.h>
34

45
#include <library/cpp/testing/gtest/gtest.h>
56

@@ -254,10 +255,10 @@ TEST_F(XdcRdmaTest, SerializeToRope) {
254255
totalXdcSize += len;
255256
}
256257

257-
auto allocRcBuf = [](ui32 size) {
258-
return TRcBuf::Uninitialized(size);
259-
};
260-
auto serializedRope = ev->SerializeToRope(allocRcBuf);
258+
auto mempool = NInterconnect::NRdma::CreateSlotMemPool(nullptr);
259+
260+
auto serializedRope = ev->SerializeToRope(mempool.get());
261+
261262
ASSERT_TRUE(serializedRope.has_value());
262263
auto rope = serializedRope->ConvertToString();
263264
// 6 1 -120 39 88x5000 8 123 18 11 104 101 108 108 111 32 119 111 114 108 100

0 commit comments

Comments
 (0)