11#include " event_pb.h"
22
3+ #include < ydb/library/actors/interconnect/rdma/mem_pool.h>
4+ #include < ydb/library/actors/protos/interconnect.pb.h>
5+
36namespace NActors {
47 TString EventPBBaseToString (const TString& header, const TString& dbgStr) {
58 TString res;
@@ -264,26 +267,9 @@ namespace NActors {
264267 return res;
265268 }
266269
267- bool SerializeToArcadiaStreamImpl (TChunkSerializer* chunker, const TVector<TRope> &payload) {
268- // serialize payload first
270+ template < typename TCb>
271+ bool SerializeHeaderCommon ( const TVector<TRope>& payload, TCb& append) {
269272 if (payload) {
270- void *data;
271- int size = 0 ;
272- auto append = [&](const char *p, size_t len) {
273- while (len) {
274- if (size) {
275- const size_t numBytesToCopy = std::min<size_t >(size, len);
276- memcpy (data, p, numBytesToCopy);
277- data = static_cast <char *>(data) + numBytesToCopy;
278- size -= numBytesToCopy;
279- p += numBytesToCopy;
280- len -= numBytesToCopy;
281- } else if (!chunker->Next (&data, &size)) {
282- return false ;
283- }
284- }
285- return true ;
286- };
287273 auto appendNumber = [&](size_t number) {
288274 char buf[MaxNumberBytes];
289275 return append (buf, SerializeNumber (number, buf));
@@ -299,19 +285,86 @@ namespace NActors {
299285 return false ;
300286 }
301287 }
302- if (size) {
303- chunker->BackUp (std::exchange (size, 0 ));
288+ }
289+
290+ return true ;
291+ }
292+
293+ bool SerializePayloadCommon (const TVector<TRope> &payload, std::function<bool (TRope)> append) {
294+ for (const TRope& rope : payload) {
295+ if (!append (rope)) {
296+ return false ;
304297 }
305- for (const TRope& rope : payload) {
306- if (!chunker->WriteRope (&rope)) {
298+ }
299+ return true ;
300+ }
301+
302+ bool SerializeToArcadiaStreamImpl (TChunkSerializer* chunker, const TVector<TRope> &payload) {
303+ // serialize payload first
304+ void *data;
305+ int size = 0 ;
306+ auto append = [&](const char *p, size_t len) {
307+ while (len) {
308+ if (size) {
309+ const size_t numBytesToCopy = std::min<size_t >(size, len);
310+ memcpy (data, p, numBytesToCopy);
311+ data = static_cast <char *>(data) + numBytesToCopy;
312+ size -= numBytesToCopy;
313+ p += numBytesToCopy;
314+ len -= numBytesToCopy;
315+ } else if (!chunker->Next (&data, &size)) {
307316 return false ;
308317 }
309318 }
319+ return true ;
320+ };
321+ if (!SerializeHeaderCommon (payload, append)) {
322+ return false ;
323+ }
324+ if (size) {
325+ chunker->BackUp (std::exchange (size, 0 ));
326+ }
327+
328+ auto appendRope = [&](TRope rope) {
329+ if (!chunker->WriteRope (&rope)) {
330+ return false ;
331+ }
332+ return true ;
333+ };
334+ if (!SerializePayloadCommon (payload, appendRope)) {
335+ return false ;
310336 }
311337
312338 return true ;
313339 }
314340
341+ std::optional<TRope> SerializeToRopeImpl (std::function<TRcBuf(ui32 size)> alloc, const TVector<TRope> &payload) {
342+ TRope result;
343+ auto sz = CalculateSerializedHeaderSizeImpl (payload);
344+ if (!sz) {
345+ return result;
346+ }
347+ TRcBuf headerBuf = alloc (sz);
348+ if (!headerBuf) {
349+ return {};
350+ }
351+ char * data = headerBuf.GetDataMut ();
352+ auto append = [&data](const char *p, size_t len) {
353+ std::memcpy (data, p, len);
354+ data += len;
355+ return true ;
356+ };
357+ SerializeHeaderCommon (payload, append);
358+ result.Insert (result.End (), headerBuf);
359+
360+ auto appendRope = [&](TRope rope) {
361+ result.Insert (result.End (), std::move (rope));
362+ return true ;
363+ };
364+ SerializePayloadCommon (payload, appendRope);
365+
366+ return result;
367+ }
315368
316369 void ParseExtendedFormatPayload (TRope::TConstIterator &iter, size_t &size, TVector<TRope> &payload, size_t &totalPayloadSize)
317370 {
@@ -361,23 +414,41 @@ namespace NActors {
361414 }
362415 }
363416
364- ui32 CalculateSerializedSizeImpl (const TVector<TRope> &payload, ssize_t recordSize ) {
365- ssize_t result = recordSize ;
366- if (result >= 0 && payload) {
417+ ui32 CalculateSerializedHeaderSizeImpl (const TVector<TRope> &payload) {
418+ ui32 result = 0 ;
419+ if (payload) {
367420 ++result; // marker
368421 char buf[MaxNumberBytes];
369422 result += SerializeNumber (payload.size (), buf);
370- size_t totalPayloadSize = 0 ;
371423 for (const TRope& rope : payload) {
372424 size_t ropeSize = rope.GetSize ();
373- totalPayloadSize += ropeSize;
374425 result += SerializeNumber (ropeSize, buf);
375426 }
376- result += totalPayloadSize;
377427 }
378428 return result;
379429 }
380430
431+ ui32 CalculateSerializedSizeImpl (const TVector<TRope> &payload, ssize_t recordSize) {
432+ ssize_t result = recordSize;
433+ if (result >= 0 && payload) {
434+ result += CalculateSerializedHeaderSizeImpl (payload);
435+ for (const TRope& rope : payload) {
436+ result += rope.GetSize ();
437+ }
438+ }
439+ return result;
440+ }
441+
442+ bool IsRdma (const TRope &rope) {
443+ for (auto it = rope.Begin (); it != rope.End (); ++it) {
444+ const TRcBuf& chunk = it.GetChunk ();
445+ if (NInterconnect::NRdma::TryExtractFromRcBuf (chunk).Empty ()) {
446+ return false ;
447+ }
448+ }
449+ return true ;
450+ }
451+
381452 TEventSerializationInfo CreateSerializationInfoImpl (size_t preserializedSize, bool allowExternalDataChannel, const TVector<TRope> &payload, ssize_t recordSize) {
382453 TEventSerializationInfo info;
383454 info.IsExtendedFormat = static_cast <bool >(payload);
@@ -389,14 +460,14 @@ namespace NActors {
389460 for (const TRope& rope : payload) {
390461 headerLen += SerializeNumber (rope.size (), temp);
391462 }
392- info.Sections .push_back (TEventSectionInfo{0 , headerLen, 0 , 0 , true });
463+ info.Sections .push_back (TEventSectionInfo{0 , headerLen, 0 , 0 , true , true });
393464 for (const TRope& rope : payload) {
394- info.Sections .push_back (TEventSectionInfo{0 , rope.size (), 0 , 0 , false });
465+ info.Sections .push_back (TEventSectionInfo{0 , rope.size (), 0 , 0 , false , IsRdma (rope) });
395466 }
396467 }
397468
398469 const size_t byteSize = Max<ssize_t >(0 , recordSize) + preserializedSize;
399- info.Sections .push_back (TEventSectionInfo{0 , byteSize, 0 , 0 , true }); // protobuf itself
470+ info.Sections .push_back (TEventSectionInfo{0 , byteSize, 0 , 0 , true , true }); // protobuf itself
400471
401472#ifndef NDEBUG
402473 size_t total = 0 ;
0 commit comments