diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp index 45b446879f2c..1c87725912fb 100644 --- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp +++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include @@ -480,6 +481,8 @@ class TEasyCurlStream : public TEasyCurl { IHTTPGateway::TOnNewDataPart onNewData, IHTTPGateway::TOnDownloadFinish onFinish, const ::NMonitoring::TDynamicCounters::TCounterPtr& inflightCounter, + std::weak_ptr handle, + size_t threshold, const TCurlInitConfig& config = TCurlInitConfig(), TDNSGateway<>::TDNSConstCurlListPtr dnsCache = nullptr) : TEasyCurl(counter, downloadedBytes, uploadededBytes, url, std::move(headers), EMethod::GET, offset, sizeLimit, 0ULL, std::move(config), std::move(dnsCache)) @@ -488,6 +491,8 @@ class TEasyCurlStream : public TEasyCurl { , OnFinish(std::move(onFinish)) , Counter(std::make_shared(0ULL)) , InflightCounter(inflightCounter) + , Handle(std::move(handle)) + , Threshold(threshold) {} static TPtr Make( @@ -502,10 +507,12 @@ class TEasyCurlStream : public TEasyCurl { IHTTPGateway::TOnNewDataPart onNewData, IHTTPGateway::TOnDownloadFinish onFinish, const ::NMonitoring::TDynamicCounters::TCounterPtr& inflightCounter, + std::weak_ptr handle = {}, + size_t threshold = 0, const TCurlInitConfig& config = TCurlInitConfig(), TDNSGateway<>::TDNSConstCurlListPtr dnsCache = nullptr) { - return std::make_shared(counter, downloadedBytes, uploadededBytes, std::move(url), std::move(headers), offset, sizeLimit, std::move(onStart), std::move(onNewData), std::move(onFinish), inflightCounter, std::move(config), std::move(dnsCache)); + return std::make_shared(counter, downloadedBytes, uploadededBytes, std::move(url), std::move(headers), offset, sizeLimit, std::move(onStart), std::move(onNewData), std::move(onFinish), inflightCounter, handle, threshold, std::move(config), std::move(dnsCache)); } enum class EAction : i8 { @@ -565,8 +572,9 @@ class TEasyCurlStream : public TEasyCurl { size_t Write(void* contents, size_t size, size_t nmemb) final { MaybeStart(CURLE_OK); const auto realsize = size * nmemb; - if (!Cancelled) - OnNewData(IHTTPGateway::TCountedContent(TString(static_cast(contents), realsize), Counter, InflightCounter)); + if (!Cancelled) { + OnNewData(IHTTPGateway::TCountedContent(TString(static_cast(contents), realsize), Counter, InflightCounter, Handle, Threshold)); + } return realsize; } @@ -583,6 +591,8 @@ class TEasyCurlStream : public TEasyCurl { bool Paused = false; bool Cancelled = false; long HttpResponseCode = 0L; + std::weak_ptr Handle; + size_t Threshold; }; using TKeyType = std::tuple; @@ -676,8 +686,8 @@ friend class IHTTPGateway; } ~THTTPMultiGateway() { - curl_multi_wakeup(Handle); IsStopped = true; + curl_multi_wakeup(Handle.get()); if (Thread.joinable()) { Thread.join(); } @@ -691,11 +701,18 @@ friend class IHTTPGateway; TCurlInitConfig InitConfig; void InitCurl() { + // FIXME: NOT SAFE (see man libcurl(3)) const CURLcode globalInitResult = curl_global_init(CURL_GLOBAL_ALL); if (globalInitResult != CURLE_OK) { throw yexception() << "curl_global_init error " << int(globalInitResult) << ": " << curl_easy_strerror(globalInitResult) << Endl; } - Handle = curl_multi_init(); + Handle = std::shared_ptr(curl_multi_init(), [](auto handle) { + const CURLMcode multiCleanupResult = curl_multi_cleanup(handle); + if (multiCleanupResult != CURLM_OK) { + Cerr << "curl_multi_cleanup error " << int(multiCleanupResult) << ": " << curl_multi_strerror(multiCleanupResult) << Endl; + } + curl_global_cleanup(); // FIXME: NOT SAFE (see man libcurl(3)) + }); if (!Handle) { throw yexception() << "curl_multi_init error"; } @@ -703,11 +720,7 @@ friend class IHTTPGateway; void UninitCurl() { Y_ABORT_UNLESS(Handle); - const CURLMcode multiCleanupResult = curl_multi_cleanup(Handle); - if (multiCleanupResult != CURLM_OK) { - Cerr << "curl_multi_cleanup error " << int(multiCleanupResult) << ": " << curl_multi_strerror(multiCleanupResult) << Endl; - } - curl_global_cleanup(); + Handle.reset(); } void Perform() { @@ -722,14 +735,14 @@ friend class IHTTPGateway; OutputMemory->Set(OutputSize); int running = 0; - if (const auto c = curl_multi_perform(Handle, &running); CURLM_OK != c) { + if (const auto c = curl_multi_perform(Handle.get(), &running); CURLM_OK != c) { Fail(c); break; } if (running < int(handlers)) { for (int messages = int(handlers) - running; messages;) { - if (const auto msg = curl_multi_info_read(Handle, &messages)) { + if (const auto msg = curl_multi_info_read(Handle.get(), &messages)) { if(msg->msg == CURLMSG_DONE) { Done(msg->easy_handle, msg->data.result); } @@ -737,7 +750,7 @@ friend class IHTTPGateway; } } else { const int timeoutMs = 300; - if (const auto c = curl_multi_poll(Handle, nullptr, 0, timeoutMs, nullptr); CURLM_OK != c) { + if (const auto c = curl_multi_poll(Handle.get(), nullptr, 0, timeoutMs, nullptr); CURLM_OK != c) { Fail(c); break; } @@ -752,7 +765,7 @@ friend class IHTTPGateway; const auto streamHandle = stream->GetHandle(); switch (stream->GetAction(BuffersSizePerStream)) { case TEasyCurlStream::EAction::Init: - curl_multi_add_handle(Handle, streamHandle); + curl_multi_add_handle(Handle.get(), streamHandle); break; case TEasyCurlStream::EAction::Work: curl_easy_pause(streamHandle, CURLPAUSE_RECV_CONT); @@ -761,7 +774,7 @@ friend class IHTTPGateway; curl_easy_pause(streamHandle, CURL_WRITEFUNC_PAUSE); break; case TEasyCurlStream::EAction::Drop: - curl_multi_remove_handle(Handle, streamHandle); + curl_multi_remove_handle(Handle.get(), streamHandle); Allocated.erase(streamHandle); break; case TEasyCurlStream::EAction::None: @@ -784,7 +797,7 @@ friend class IHTTPGateway; const auto handle = Await.front()->GetHandle(); Allocated.emplace(handle, std::move(Await.front())); Await.pop(); - curl_multi_add_handle(Handle, handle); + curl_multi_add_handle(Handle.get(), handle); } AwaitQueue->Set(Await.size()); AllocatedMemory->Set(AllocatedSize); @@ -859,7 +872,7 @@ friend class IHTTPGateway; const TIssue error(curl_multi_strerror(result)); while (!works.empty()) { - curl_multi_remove_handle(Handle, works.top()->GetHandle()); + curl_multi_remove_handle(Handle.get(), works.top()->GetHandle()); works.top()->Fail(CURLE_OK, error); works.pop(); } @@ -914,7 +927,7 @@ friend class IHTTPGateway; TOnDownloadFinish onFinish, const ::NMonitoring::TDynamicCounters::TCounterPtr& inflightCounter) final { - auto stream = TEasyCurlStream::Make(InFlightStreams, DownloadedBytes, UploadedBytes, std::move(url), std::move(headers), offset, sizeLimit, std::move(onStart), std::move(onNewData), std::move(onFinish), inflightCounter, InitConfig, DnsGateway.GetDNSCurlList()); + auto stream = TEasyCurlStream::Make(InFlightStreams, DownloadedBytes, UploadedBytes, std::move(url), std::move(headers), offset, sizeLimit, std::move(onStart), std::move(onNewData), std::move(onFinish), inflightCounter, Handle, BuffersSizePerStream, InitConfig, DnsGateway.GetDNSCurlList()); const std::unique_lock lock(SyncRef()); const auto handle = stream->GetHandle(); TEasyCurlStream::TWeakPtr weak = stream; @@ -942,12 +955,12 @@ friend class IHTTPGateway; void Wakeup(size_t sizeLimit) { AwaitQueue->Set(Await.size()); if (Allocated.size() < MaxHandlers && AllocatedSize + sizeLimit + OutputSize.load() <= MaxSimulatenousDownloadsSize) { - curl_multi_wakeup(Handle); + curl_multi_wakeup(Handle.get()); } } CURLM* GetHandle() const { - return Handle; + return Handle.get(); } private: @@ -955,7 +968,7 @@ friend class IHTTPGateway; return *Sync; } - CURLM* Handle = nullptr; + std::shared_ptr Handle; std::queue Await; std::vector Streams; @@ -1043,8 +1056,9 @@ IHTTPGateway::TContent::TContent(const TString& data, long httpResponseCode, con {} IHTTPGateway::TCountedContent::TCountedContent(TString&& data, const std::shared_ptr& counter, - const ::NMonitoring::TDynamicCounters::TCounterPtr& inflightCounter) + const ::NMonitoring::TDynamicCounters::TCounterPtr& inflightCounter, std::weak_ptr handle, size_t threshold) : TContentBase(std::move(data)), Counter(counter), InflightCounter(inflightCounter) + , Handle(handle), Threshold(threshold) { Counter->fetch_add(size()); if (InflightCounter) { @@ -1052,19 +1066,24 @@ IHTTPGateway::TCountedContent::TCountedContent(TString&& data, const std::shared } } -IHTTPGateway::TCountedContent::~TCountedContent() -{ - Counter->fetch_sub(size()); +void IHTTPGateway::TCountedContent::BeforeRelease() { + auto oldSize = Counter->fetch_sub(size()); + if (oldSize >= Threshold && oldSize - size() < Threshold) { + if (auto handle = Handle.lock()) { + curl_multi_wakeup(handle.get()); + } + } if (InflightCounter) { InflightCounter->Sub(size()); } } +IHTTPGateway::TCountedContent::~TCountedContent() { + BeforeRelease(); +} + TString IHTTPGateway::TCountedContent::Extract() { - Counter->fetch_sub(size()); - if (InflightCounter) { - InflightCounter->Sub(size()); - } + BeforeRelease(); return TContentBase::Extract(); } diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h index 2095419a6270..2ac830acf4cc 100644 --- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h +++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h @@ -101,7 +101,7 @@ class IHTTPGateway { class TCountedContent : public TContentBase { public: - TCountedContent(TString&& data, const std::shared_ptr& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& inflightCounter); + TCountedContent(TString&& data, const std::shared_ptr& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& inflightCounter, std::weak_ptr handle, size_t threshold); ~TCountedContent(); TCountedContent(TCountedContent&&) = default; @@ -109,8 +109,12 @@ class IHTTPGateway { TString Extract(); private: + void BeforeRelease(); + const std::shared_ptr Counter; const ::NMonitoring::TDynamicCounters::TCounterPtr InflightCounter; + std::weak_ptr Handle; + const size_t Threshold; }; using TOnDownloadStart = std::function; // http code.