88
99#include < ydb/core/backup/common/checksum.h>
1010#include < ydb/core/base/appdata.h>
11+ #include < ydb/core/base/counters.h>
1112#include < ydb/core/protos/datashard_config.pb.h>
1213#include < ydb/core/protos/flat_scheme_op.pb.h>
1314#include < ydb/library/services/services.pb.h>
@@ -296,25 +297,80 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
296297 for (ui32 id : tableInfo.GetValueColumnIds (columnNames)) {
297298 rowScheme.AddValueColumnIds (id);
298299 }
300+
301+ CellBytes = 0 ;
299302 }
300303
301304 void AddRow (const TVector<TCell>& keys, const TVector<TCell>& values) {
302305 Y_ABORT_UNLESS (Record);
303306 auto & row = *Record->AddRows ();
304307 row.SetKeyColumns (TSerializedCellVec::Serialize (keys));
305308 row.SetValueColumns (TSerializedCellVec::Serialize (values));
309+
310+ for (const auto & x : keys) {
311+ CellBytes += x.Size ();
312+ }
313+ for (const auto & x : values) {
314+ CellBytes += x.Size ();
315+ }
306316 }
307317
308318 const std::shared_ptr<NKikimrTxDataShard::TEvUploadRowsRequest>& GetRecord () {
309319 Y_ABORT_UNLESS (Record);
310320 return Record;
311321 }
312322
323+ ui64 GetCellBytes () const {
324+ return CellBytes;
325+ }
326+
313327 private:
314328 std::shared_ptr<NKikimrTxDataShard::TEvUploadRowsRequest> Record;
329+ ui64 CellBytes;
315330
316331 }; // TUploadRowsRequestBuilder
317332
333+ struct TCounters {
334+ struct TLatency {
335+ TInstant Begin;
336+ ::NMonitoring::THistogramPtr Counter;
337+
338+ explicit TLatency (::NMonitoring::THistogramPtr counter)
339+ : Counter(counter)
340+ {
341+ }
342+
343+ void Start (TInstant begin) {
344+ Begin = begin;
345+ }
346+
347+ void Finish (TInstant end) {
348+ Counter->Collect ((end - Begin).MilliSeconds ());
349+ Begin = TInstant::Zero ();
350+ }
351+ };
352+
353+ ::NMonitoring::TDynamicCounters::TCounterPtr BytesReceived;
354+ ::NMonitoring::TDynamicCounters::TCounterPtr BytesWritten;
355+ TLatency LatencyRead;
356+ TLatency LatencyProcess;
357+ TLatency LatencyWrite;
358+
359+ explicit TCounters (::NMonitoring::TDynamicCounterPtr counters)
360+ : BytesReceived(counters->GetCounter (" BytesReceived" , true ))
361+ , BytesWritten(counters->GetCounter (" BytesWritten" , true ))
362+ , LatencyRead(counters->GetHistogram (" LatencyReadMs" , ::NMonitoring::ExponentialHistogram(10 , 4 , 1 )))
363+ , LatencyProcess(counters->GetHistogram (" LatencyProcessMs" , ::NMonitoring::ExponentialHistogram(10 , 4 , 1 )))
364+ , LatencyWrite(counters->GetHistogram (" LatencyWriteMs" , ::NMonitoring::ExponentialHistogram(10 , 4 , 1 )))
365+ {
366+ }
367+
368+ }; // TCounters
369+
370+ static TInstant Now () {
371+ return TlsActivationContext->Now ();
372+ }
373+
318374 void AllocateResource () {
319375 IMPORT_LOG_D (" AllocateResource" );
320376
@@ -347,7 +403,6 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
347403 Send (client, new TEvents::TEvPoisonPill ());
348404 }
349405
350-
351406 Client = RegisterWithSameMailbox (CreateS3Wrapper (ExternalStorageConfig->ConstructStorageOperator ()));
352407
353408 HeadObject (Settings.GetDataKey (DataFormat, CompressionCodec));
@@ -490,6 +545,9 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
490545 << " , content-length# " << ContentLength
491546 << " , body-size# " << msg.Body .size ());
492547
548+ *Counters.BytesReceived += msg.Body .size ();
549+ Counters.LatencyRead .Finish (Now ());
550+
493551 Reader->Feed (std::move (msg.Body ));
494552 Process ();
495553 }
@@ -534,6 +592,7 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
534592
535593 case TReadController::NOT_ENOUGH_DATA:
536594 if (SumWithSaturation (ProcessedBytes, Reader->PendingBytes ()) < ContentLength) {
595+ Counters.LatencyRead .Start (Now ());
537596 return GetObject (Settings.GetDataKey (DataFormat, CompressionCodec),
538597 Reader->NextRange (ContentLength, ProcessedBytes));
539598 } else {
@@ -546,6 +605,8 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
546605 << " : " << error);
547606 }
548607
608+ Counters.LatencyProcess .Start (Now ());
609+
549610 if (Checksum) {
550611 Checksum->AddData (data);
551612 }
@@ -615,6 +676,9 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
615676 << " : count# " << record->RowsSize ()
616677 << " , size# " << record->ByteSizeLong ());
617678
679+ Counters.LatencyProcess .Finish (Now ());
680+ Counters.LatencyWrite .Start (Now ());
681+
618682 Send (DataShard, new TEvDataShard::TEvS3UploadRowsRequest (TxId, record, {
619683 ETag, ProcessedBytes, WrittenBytes, WrittenRows, GetChecksumState ()
620684 }));
@@ -623,6 +687,9 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
623687 void Handle (TEvDataShard::TEvS3UploadRowsResponse::TPtr& ev) {
624688 IMPORT_LOG_D (" Handle " << ev->Get ()->ToString ());
625689
690+ *Counters.BytesWritten += RequestBuilder.GetCellBytes ();
691+ Counters.LatencyWrite .Finish (Now ());
692+
626693 const auto & record = ev->Get ()->Record ;
627694 switch (record.GetStatus ()) {
628695 case NKikimrTxDataShard::TError::OK:
@@ -816,6 +883,7 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
816883 , ReadBatchSize(task.GetS3Settings().GetLimits().GetReadBatchSize())
817884 , ReadBufferSizeLimit(AppData()->DataShardConfig.GetRestoreReadBufferSizeLimit())
818885 , Checksum(task.GetValidateChecksums() ? CreateChecksum() : nullptr)
886+ , Counters(GetServiceCounters(AppData()->Counters, "tablets")->GetSubgroup(" subsystem" , " import" ))
819887 {
820888 }
821889
@@ -895,6 +963,9 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
895963
896964 NBackup::IChecksum::TPtr Checksum;
897965 TString ExpectedChecksum;
966+
967+ TCounters Counters;
968+
898969}; // TS3Downloader
899970
900971IActor* CreateS3Downloader (const TActorId& dataShard, ui64 txId, const NKikimrSchemeOp::TRestoreTask& task, const TTableInfo& info) {
0 commit comments