11#include " event_pb.h"
22
3+ #include < ydb/library/actors/interconnect/rdma/mem_pool.h>
4+ #include < ydb/library/actors/protos/interconnect.pb.h>
5+
36namespace NActors {
47 TString EventPBBaseToString (const TString& header, const TString& dbgStr) {
58 TString res;
@@ -259,26 +262,9 @@ namespace NActors {
259262 return res;
260263 }
261264
262- bool SerializeToArcadiaStreamImpl (TChunkSerializer* chunker, const TVector<TRope> &payload) {
263- // serialize payload first
265+ template < typename TCb>
266+ bool SerializeHeaderCommon ( const TVector<TRope>& payload, TCb& append) {
264267 if (payload) {
265- void *data;
266- int size = 0 ;
267- auto append = [&](const char *p, size_t len) {
268- while (len) {
269- if (size) {
270- const size_t numBytesToCopy = std::min<size_t >(size, len);
271- memcpy (data, p, numBytesToCopy);
272- data = static_cast <char *>(data) + numBytesToCopy;
273- size -= numBytesToCopy;
274- p += numBytesToCopy;
275- len -= numBytesToCopy;
276- } else if (!chunker->Next (&data, &size)) {
277- return false ;
278- }
279- }
280- return true ;
281- };
282268 auto appendNumber = [&](size_t number) {
283269 char buf[MaxNumberBytes];
284270 return append (buf, SerializeNumber (number, buf));
@@ -294,19 +280,86 @@ namespace NActors {
294280 return false ;
295281 }
296282 }
297- if (size) {
298- chunker->BackUp (std::exchange (size, 0 ));
283+ }
284+
285+ return true ;
286+ }
287+
288+ bool SerializePayloadCommon (const TVector<TRope> &payload, std::function<bool (TRope)> append) {
289+ for (const TRope& rope : payload) {
290+ if (!append (rope)) {
291+ return false ;
299292 }
300- for (const TRope& rope : payload) {
301- if (!chunker->WriteRope (&rope)) {
293+ }
294+ return true ;
295+ }
296+
297+ bool SerializeToArcadiaStreamImpl (TChunkSerializer* chunker, const TVector<TRope> &payload) {
298+ // serialize payload first
299+ void *data;
300+ int size = 0 ;
301+ auto append = [&](const char *p, size_t len) {
302+ while (len) {
303+ if (size) {
304+ const size_t numBytesToCopy = std::min<size_t >(size, len);
305+ memcpy (data, p, numBytesToCopy);
306+ data = static_cast <char *>(data) + numBytesToCopy;
307+ size -= numBytesToCopy;
308+ p += numBytesToCopy;
309+ len -= numBytesToCopy;
310+ } else if (!chunker->Next (&data, &size)) {
302311 return false ;
303312 }
304313 }
314+ return true ;
315+ };
316+ if (!SerializeHeaderCommon (payload, append)) {
317+ return false ;
318+ }
319+ if (size) {
320+ chunker->BackUp (std::exchange (size, 0 ));
321+ }
322+
323+ auto appendRope = [&](TRope rope) {
324+ if (!chunker->WriteRope (&rope)) {
325+ return false ;
326+ }
327+ return true ;
328+ };
329+ if (!SerializePayloadCommon (payload, appendRope)) {
330+ return false ;
305331 }
306332
307333 return true ;
308334 }
309335
336+ std::optional<TRope> SerializeToRopeImpl (std::function<TRcBuf(ui32 size)> alloc, const TVector<TRope> &payload) {
337+ TRope result;
338+ auto sz = CalculateSerializedHeaderSizeImpl (payload);
339+ if (!sz) {
340+ return result;
341+ }
342+ TRcBuf headerBuf = alloc (sz);
343+ if (!headerBuf) {
344+ return {};
345+ }
346+ char * data = headerBuf.GetDataMut ();
347+ auto append = [&data](const char *p, size_t len) {
348+ std::memcpy (data, p, len);
349+ data += len;
350+ return true ;
351+ };
352+ SerializeHeaderCommon (payload, append);
353+ result.Insert (result.End (), headerBuf);
354+
355+ auto appendRope = [&](TRope rope) {
356+ result.Insert (result.End (), std::move (rope));
357+ return true ;
358+ };
359+ SerializePayloadCommon (payload, appendRope);
360+
361+ return result;
362+ }
310363
311364 void ParseExtendedFormatPayload (TRope::TConstIterator &iter, size_t &size, TVector<TRope> &payload, size_t &totalPayloadSize)
312365 {
@@ -356,23 +409,41 @@ namespace NActors {
356409 }
357410 }
358411
359- ui32 CalculateSerializedSizeImpl (const TVector<TRope> &payload, ssize_t recordSize ) {
360- ssize_t result = recordSize ;
361- if (result >= 0 && payload) {
412+ ui32 CalculateSerializedHeaderSizeImpl (const TVector<TRope> &payload) {
413+ ui32 result = 0 ;
414+ if (payload) {
362415 ++result; // marker
363416 char buf[MaxNumberBytes];
364417 result += SerializeNumber (payload.size (), buf);
365- size_t totalPayloadSize = 0 ;
366418 for (const TRope& rope : payload) {
367419 size_t ropeSize = rope.GetSize ();
368- totalPayloadSize += ropeSize;
369420 result += SerializeNumber (ropeSize, buf);
370421 }
371- result += totalPayloadSize;
372422 }
373423 return result;
374424 }
375425
426+ ui32 CalculateSerializedSizeImpl (const TVector<TRope> &payload, ssize_t recordSize) {
427+ ssize_t result = recordSize;
428+ if (result >= 0 && payload) {
429+ result += CalculateSerializedHeaderSizeImpl (payload);
430+ for (const TRope& rope : payload) {
431+ result += rope.GetSize ();
432+ }
433+ }
434+ return result;
435+ }
436+
437+ bool IsRdma (const TRope &rope) {
438+ for (auto it = rope.Begin (); it != rope.End (); ++it) {
439+ const TRcBuf& chunk = it.GetChunk ();
440+ if (NInterconnect::NRdma::TryExtractFromRcBuf (chunk).Empty ()) {
441+ return false ;
442+ }
443+ }
444+ return true ;
445+ }
446+
376447 TEventSerializationInfo CreateSerializationInfoImpl (size_t preserializedSize, bool allowExternalDataChannel, const TVector<TRope> &payload, ssize_t recordSize) {
377448 TEventSerializationInfo info;
378449 info.IsExtendedFormat = static_cast <bool >(payload);
@@ -384,14 +455,14 @@ namespace NActors {
384455 for (const TRope& rope : payload) {
385456 headerLen += SerializeNumber (rope.size (), temp);
386457 }
387- info.Sections .push_back (TEventSectionInfo{0 , headerLen, 0 , 0 , true });
458+ info.Sections .push_back (TEventSectionInfo{0 , headerLen, 0 , 0 , true , true });
388459 for (const TRope& rope : payload) {
389- info.Sections .push_back (TEventSectionInfo{0 , rope.size (), 0 , 0 , false });
460+ info.Sections .push_back (TEventSectionInfo{0 , rope.size (), 0 , 0 , false , IsRdma (rope) });
390461 }
391462 }
392463
393464 const size_t byteSize = Max<ssize_t >(0 , recordSize) + preserializedSize;
394- info.Sections .push_back (TEventSectionInfo{0 , byteSize, 0 , 0 , true }); // protobuf itself
465+ info.Sections .push_back (TEventSectionInfo{0 , byteSize, 0 , 0 , true , true }); // protobuf itself
395466
396467#ifndef NDEBUG
397468 size_t total = 0 ;
0 commit comments