From 80a7702981c4e7c327839f15ee3460a1e687d2b6 Mon Sep 17 00:00:00 2001 From: Yuriy Kaminskiy Date: Fri, 7 Nov 2025 15:25:47 +0300 Subject: [PATCH 1/4] http_gateway: wake up poll when we released inflight buffers There can be some spurious/double wakeup, but this should not matter much --- .../common/http_gateway/yql_http_gateway.cpp | 70 +++++++++++++------ .../common/http_gateway/yql_http_gateway.h | 4 +- 2 files changed, 50 insertions(+), 24 deletions(-) diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp index 45b446879f2c..27447a0c2854 100644 --- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp +++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp @@ -480,6 +480,8 @@ class TEasyCurlStream : public TEasyCurl { IHTTPGateway::TOnNewDataPart onNewData, IHTTPGateway::TOnDownloadFinish onFinish, const ::NMonitoring::TDynamicCounters::TCounterPtr& inflightCounter, + std::weak_ptr handle, + size_t threshold, const TCurlInitConfig& config = TCurlInitConfig(), TDNSGateway<>::TDNSConstCurlListPtr dnsCache = nullptr) : TEasyCurl(counter, downloadedBytes, uploadededBytes, url, std::move(headers), EMethod::GET, offset, sizeLimit, 0ULL, std::move(config), std::move(dnsCache)) @@ -488,6 +490,8 @@ class TEasyCurlStream : public TEasyCurl { , OnFinish(std::move(onFinish)) , Counter(std::make_shared(0ULL)) , InflightCounter(inflightCounter) + , Handle(std::move(handle)) + , Threshold(threshold) {} static TPtr Make( @@ -502,10 +506,12 @@ class TEasyCurlStream : public TEasyCurl { IHTTPGateway::TOnNewDataPart onNewData, IHTTPGateway::TOnDownloadFinish onFinish, const ::NMonitoring::TDynamicCounters::TCounterPtr& inflightCounter, + std::weak_ptr handle = {}, + size_t threshold = 0, const TCurlInitConfig& config = TCurlInitConfig(), TDNSGateway<>::TDNSConstCurlListPtr dnsCache = nullptr) { - return std::make_shared(counter, downloadedBytes, uploadededBytes, std::move(url), std::move(headers), offset, sizeLimit, std::move(onStart), std::move(onNewData), std::move(onFinish), inflightCounter, std::move(config), std::move(dnsCache)); + return std::make_shared(counter, downloadedBytes, uploadededBytes, std::move(url), std::move(headers), offset, sizeLimit, std::move(onStart), std::move(onNewData), std::move(onFinish), inflightCounter, handle, threshold, std::move(config), std::move(dnsCache)); } enum class EAction : i8 { @@ -565,8 +571,9 @@ class TEasyCurlStream : public TEasyCurl { size_t Write(void* contents, size_t size, size_t nmemb) final { MaybeStart(CURLE_OK); const auto realsize = size * nmemb; - if (!Cancelled) - OnNewData(IHTTPGateway::TCountedContent(TString(static_cast(contents), realsize), Counter, InflightCounter)); + if (!Cancelled) { + OnNewData(IHTTPGateway::TCountedContent(TString(static_cast(contents), realsize), Counter, InflightCounter, Handle, Threshold)); + } return realsize; } @@ -583,6 +590,8 @@ class TEasyCurlStream : public TEasyCurl { bool Paused = false; bool Cancelled = false; long HttpResponseCode = 0L; + std::weak_ptr Handle; + size_t Threshold; }; using TKeyType = std::tuple; @@ -676,7 +685,7 @@ friend class IHTTPGateway; } ~THTTPMultiGateway() { - curl_multi_wakeup(Handle); + curl_multi_wakeup(Handle.get()); IsStopped = true; if (Thread.joinable()) { Thread.join(); @@ -695,7 +704,12 @@ friend class IHTTPGateway; if (globalInitResult != CURLE_OK) { throw yexception() << "curl_global_init error " << int(globalInitResult) << ": " << curl_easy_strerror(globalInitResult) << Endl; } - Handle = curl_multi_init(); + Handle = std::shared_ptr(curl_multi_init(), [](auto handle) { + const CURLMcode multiCleanupResult = curl_multi_cleanup(handle); + if (multiCleanupResult != CURLM_OK) { + Cerr << "curl_multi_cleanup error " << int(multiCleanupResult) << ": " << curl_multi_strerror(multiCleanupResult) << Endl; + } + }); if (!Handle) { throw yexception() << "curl_multi_init error"; } @@ -703,10 +717,9 @@ friend class IHTTPGateway; void UninitCurl() { Y_ABORT_UNLESS(Handle); - const CURLMcode multiCleanupResult = curl_multi_cleanup(Handle); - if (multiCleanupResult != CURLM_OK) { - Cerr << "curl_multi_cleanup error " << int(multiCleanupResult) << ": " << curl_multi_strerror(multiCleanupResult) << Endl; - } + auto weakHandle = std::weak_ptr(Handle); + Handle.reset(); + while (!weakHandle.expired()) {} // short busy-wait in case of TCountedContent curl_global_cleanup(); } @@ -722,14 +735,14 @@ friend class IHTTPGateway; OutputMemory->Set(OutputSize); int running = 0; - if (const auto c = curl_multi_perform(Handle, &running); CURLM_OK != c) { + if (const auto c = curl_multi_perform(Handle.get(), &running); CURLM_OK != c) { Fail(c); break; } if (running < int(handlers)) { for (int messages = int(handlers) - running; messages;) { - if (const auto msg = curl_multi_info_read(Handle, &messages)) { + if (const auto msg = curl_multi_info_read(Handle.get(), &messages)) { if(msg->msg == CURLMSG_DONE) { Done(msg->easy_handle, msg->data.result); } @@ -737,7 +750,7 @@ friend class IHTTPGateway; } } else { const int timeoutMs = 300; - if (const auto c = curl_multi_poll(Handle, nullptr, 0, timeoutMs, nullptr); CURLM_OK != c) { + if (const auto c = curl_multi_poll(Handle.get(), nullptr, 0, timeoutMs, nullptr); CURLM_OK != c) { Fail(c); break; } @@ -752,7 +765,7 @@ friend class IHTTPGateway; const auto streamHandle = stream->GetHandle(); switch (stream->GetAction(BuffersSizePerStream)) { case TEasyCurlStream::EAction::Init: - curl_multi_add_handle(Handle, streamHandle); + curl_multi_add_handle(Handle.get(), streamHandle); break; case TEasyCurlStream::EAction::Work: curl_easy_pause(streamHandle, CURLPAUSE_RECV_CONT); @@ -761,7 +774,7 @@ friend class IHTTPGateway; curl_easy_pause(streamHandle, CURL_WRITEFUNC_PAUSE); break; case TEasyCurlStream::EAction::Drop: - curl_multi_remove_handle(Handle, streamHandle); + curl_multi_remove_handle(Handle.get(), streamHandle); Allocated.erase(streamHandle); break; case TEasyCurlStream::EAction::None: @@ -784,7 +797,7 @@ friend class IHTTPGateway; const auto handle = Await.front()->GetHandle(); Allocated.emplace(handle, std::move(Await.front())); Await.pop(); - curl_multi_add_handle(Handle, handle); + curl_multi_add_handle(Handle.get(), handle); } AwaitQueue->Set(Await.size()); AllocatedMemory->Set(AllocatedSize); @@ -859,7 +872,7 @@ friend class IHTTPGateway; const TIssue error(curl_multi_strerror(result)); while (!works.empty()) { - curl_multi_remove_handle(Handle, works.top()->GetHandle()); + curl_multi_remove_handle(Handle.get(), works.top()->GetHandle()); works.top()->Fail(CURLE_OK, error); works.pop(); } @@ -914,7 +927,7 @@ friend class IHTTPGateway; TOnDownloadFinish onFinish, const ::NMonitoring::TDynamicCounters::TCounterPtr& inflightCounter) final { - auto stream = TEasyCurlStream::Make(InFlightStreams, DownloadedBytes, UploadedBytes, std::move(url), std::move(headers), offset, sizeLimit, std::move(onStart), std::move(onNewData), std::move(onFinish), inflightCounter, InitConfig, DnsGateway.GetDNSCurlList()); + auto stream = TEasyCurlStream::Make(InFlightStreams, DownloadedBytes, UploadedBytes, std::move(url), std::move(headers), offset, sizeLimit, std::move(onStart), std::move(onNewData), std::move(onFinish), inflightCounter, Handle, BuffersSizePerStream, InitConfig, DnsGateway.GetDNSCurlList()); const std::unique_lock lock(SyncRef()); const auto handle = stream->GetHandle(); TEasyCurlStream::TWeakPtr weak = stream; @@ -942,12 +955,12 @@ friend class IHTTPGateway; void Wakeup(size_t sizeLimit) { AwaitQueue->Set(Await.size()); if (Allocated.size() < MaxHandlers && AllocatedSize + sizeLimit + OutputSize.load() <= MaxSimulatenousDownloadsSize) { - curl_multi_wakeup(Handle); + curl_multi_wakeup(Handle.get()); } } CURLM* GetHandle() const { - return Handle; + return Handle.get(); } private: @@ -955,7 +968,7 @@ friend class IHTTPGateway; return *Sync; } - CURLM* Handle = nullptr; + std::shared_ptr Handle; std::queue Await; std::vector Streams; @@ -1043,8 +1056,9 @@ IHTTPGateway::TContent::TContent(const TString& data, long httpResponseCode, con {} IHTTPGateway::TCountedContent::TCountedContent(TString&& data, const std::shared_ptr& counter, - const ::NMonitoring::TDynamicCounters::TCounterPtr& inflightCounter) + const ::NMonitoring::TDynamicCounters::TCounterPtr& inflightCounter, std::weak_ptr handle, size_t threshold) : TContentBase(std::move(data)), Counter(counter), InflightCounter(inflightCounter) + , Handle(handle), Threshold(threshold) { Counter->fetch_add(size()); if (InflightCounter) { @@ -1054,14 +1068,24 @@ IHTTPGateway::TCountedContent::TCountedContent(TString&& data, const std::shared IHTTPGateway::TCountedContent::~TCountedContent() { - Counter->fetch_sub(size()); + auto oldSize = Counter->fetch_sub(size()); + if (oldSize >= Threshold && oldSize - size() < Threshold) { + if (auto handle = Handle.lock()) { + curl_multi_wakeup(handle.get()); + } + } if (InflightCounter) { InflightCounter->Sub(size()); } } TString IHTTPGateway::TCountedContent::Extract() { - Counter->fetch_sub(size()); + auto oldSize = Counter->fetch_sub(size()); + if (oldSize >= Threshold && oldSize - size() < Threshold) { + if (auto handle = Handle.lock()) { + curl_multi_wakeup(handle.get()); + } + } if (InflightCounter) { InflightCounter->Sub(size()); } diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h index 2095419a6270..d1e7df9d4c1b 100644 --- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h +++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h @@ -101,7 +101,7 @@ class IHTTPGateway { class TCountedContent : public TContentBase { public: - TCountedContent(TString&& data, const std::shared_ptr& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& inflightCounter); + TCountedContent(TString&& data, const std::shared_ptr& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& inflightCounter, std::weak_ptr handle, size_t threshold); ~TCountedContent(); TCountedContent(TCountedContent&&) = default; @@ -111,6 +111,8 @@ class IHTTPGateway { private: const std::shared_ptr Counter; const ::NMonitoring::TDynamicCounters::TCounterPtr InflightCounter; + std::weak_ptr Handle; + const size_t Threshold; }; using TOnDownloadStart = std::function; // http code. From 1c29e04a1ad38976ca6ca4ebdd22978ff6412758 Mon Sep 17 00:00:00 2001 From: Yuriy Kaminskiy Date: Fri, 7 Nov 2025 16:24:28 +0300 Subject: [PATCH 2/4] add little delay into busy-loop --- .../yql/providers/common/http_gateway/yql_http_gateway.cpp | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp index 27447a0c2854..a51e1686af33 100644 --- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp +++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include @@ -719,7 +720,9 @@ friend class IHTTPGateway; Y_ABORT_UNLESS(Handle); auto weakHandle = std::weak_ptr(Handle); Handle.reset(); - while (!weakHandle.expired()) {} // short busy-wait in case of TCountedContent + while (!weakHandle.expired()) { // short busy-wait in unlikely case of collision with TCountedContent + Sleep(TInstant::MicroSeconds(1)); + } curl_global_cleanup(); } From c3e08d0e19ac521431a4e73306a0b1e8f37b1651 Mon Sep 17 00:00:00 2001 From: Yuriy Kaminskiy Date: Fri, 7 Nov 2025 16:24:58 +0300 Subject: [PATCH 3/4] refactor common code --- .../common/http_gateway/yql_http_gateway.cpp | 17 ++++++----------- .../common/http_gateway/yql_http_gateway.h | 2 ++ 2 files changed, 8 insertions(+), 11 deletions(-) diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp index a51e1686af33..2067ba86d85b 100644 --- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp +++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp @@ -1069,8 +1069,7 @@ IHTTPGateway::TCountedContent::TCountedContent(TString&& data, const std::shared } } -IHTTPGateway::TCountedContent::~TCountedContent() -{ +void IHTTPGateway::TCountedContent::BeforeRelease() { auto oldSize = Counter->fetch_sub(size()); if (oldSize >= Threshold && oldSize - size() < Threshold) { if (auto handle = Handle.lock()) { @@ -1082,16 +1081,12 @@ IHTTPGateway::TCountedContent::~TCountedContent() } } +IHTTPGateway::TCountedContent::~TCountedContent() { + BeforeRelease(); +} + TString IHTTPGateway::TCountedContent::Extract() { - auto oldSize = Counter->fetch_sub(size()); - if (oldSize >= Threshold && oldSize - size() < Threshold) { - if (auto handle = Handle.lock()) { - curl_multi_wakeup(handle.get()); - } - } - if (InflightCounter) { - InflightCounter->Sub(size()); - } + BeforeRelease(); return TContentBase::Extract(); } diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h index d1e7df9d4c1b..2ac830acf4cc 100644 --- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h +++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h @@ -109,6 +109,8 @@ class IHTTPGateway { TString Extract(); private: + void BeforeRelease(); + const std::shared_ptr Counter; const ::NMonitoring::TDynamicCounters::TCounterPtr InflightCounter; std::weak_ptr Handle; From c69cc25657f379b8f134f4b91ec2d50bb4432d35 Mon Sep 17 00:00:00 2001 From: Yuriy Kaminskiy Date: Fri, 7 Nov 2025 16:42:07 +0300 Subject: [PATCH 4/4] fixup! add little delay into busy-loop --- .../yql/providers/common/http_gateway/yql_http_gateway.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp index 2067ba86d85b..6858fee5fadc 100644 --- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp +++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp @@ -721,7 +721,7 @@ friend class IHTTPGateway; auto weakHandle = std::weak_ptr(Handle); Handle.reset(); while (!weakHandle.expired()) { // short busy-wait in unlikely case of collision with TCountedContent - Sleep(TInstant::MicroSeconds(1)); + Sleep(TDuration::MicroSeconds(1)); } curl_global_cleanup(); }