Skip to content

Commit 5951eb7

Browse files
authored
http_gateway: use correct enum (backport #28315) / wake up poll when we released inflight buffers (backport #28390) (#28659)
1 parent bcc6200 commit 5951eb7

File tree

2 files changed

+54
-31
lines changed

2 files changed

+54
-31
lines changed

ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp

Lines changed: 49 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#include <util/generic/yexception.h>
77
#include <util/stream/str.h>
88
#include <util/string/builder.h>
9+
#include <util/datetime/base.h>
910
#include <yql/essentials/utils/log/log.h>
1011

1112
#include <thread>
@@ -480,6 +481,8 @@ class TEasyCurlStream : public TEasyCurl {
480481
IHTTPGateway::TOnNewDataPart onNewData,
481482
IHTTPGateway::TOnDownloadFinish onFinish,
482483
const ::NMonitoring::TDynamicCounters::TCounterPtr& inflightCounter,
484+
std::weak_ptr<CURLM> handle,
485+
size_t threshold,
483486
const TCurlInitConfig& config = TCurlInitConfig(),
484487
TDNSGateway<>::TDNSConstCurlListPtr dnsCache = nullptr)
485488
: TEasyCurl(counter, downloadedBytes, uploadededBytes, url, std::move(headers), EMethod::GET, offset, sizeLimit, 0ULL, std::move(config), std::move(dnsCache))
@@ -488,6 +491,8 @@ class TEasyCurlStream : public TEasyCurl {
488491
, OnFinish(std::move(onFinish))
489492
, Counter(std::make_shared<std::atomic_size_t>(0ULL))
490493
, InflightCounter(inflightCounter)
494+
, Handle(std::move(handle))
495+
, Threshold(threshold)
491496
{}
492497

493498
static TPtr Make(
@@ -502,10 +507,12 @@ class TEasyCurlStream : public TEasyCurl {
502507
IHTTPGateway::TOnNewDataPart onNewData,
503508
IHTTPGateway::TOnDownloadFinish onFinish,
504509
const ::NMonitoring::TDynamicCounters::TCounterPtr& inflightCounter,
510+
std::weak_ptr<CURLM> handle = {},
511+
size_t threshold = 0,
505512
const TCurlInitConfig& config = TCurlInitConfig(),
506513
TDNSGateway<>::TDNSConstCurlListPtr dnsCache = nullptr)
507514
{
508-
return std::make_shared<TEasyCurlStream>(counter, downloadedBytes, uploadededBytes, std::move(url), std::move(headers), offset, sizeLimit, std::move(onStart), std::move(onNewData), std::move(onFinish), inflightCounter, std::move(config), std::move(dnsCache));
515+
return std::make_shared<TEasyCurlStream>(counter, downloadedBytes, uploadededBytes, std::move(url), std::move(headers), offset, sizeLimit, std::move(onStart), std::move(onNewData), std::move(onFinish), inflightCounter, handle, threshold, std::move(config), std::move(dnsCache));
509516
}
510517

511518
enum class EAction : i8 {
@@ -565,8 +572,9 @@ class TEasyCurlStream : public TEasyCurl {
565572
size_t Write(void* contents, size_t size, size_t nmemb) final {
566573
MaybeStart(CURLE_OK);
567574
const auto realsize = size * nmemb;
568-
if (!Cancelled)
569-
OnNewData(IHTTPGateway::TCountedContent(TString(static_cast<char*>(contents), realsize), Counter, InflightCounter));
575+
if (!Cancelled) {
576+
OnNewData(IHTTPGateway::TCountedContent(TString(static_cast<char*>(contents), realsize), Counter, InflightCounter, Handle, Threshold));
577+
}
570578
return realsize;
571579
}
572580

@@ -583,6 +591,8 @@ class TEasyCurlStream : public TEasyCurl {
583591
bool Paused = false;
584592
bool Cancelled = false;
585593
long HttpResponseCode = 0L;
594+
std::weak_ptr<CURLM> Handle;
595+
size_t Threshold;
586596
};
587597

588598
using TKeyType = std::tuple<TString, size_t, IHTTPGateway::THeaders, TString, IHTTPGateway::TRetryPolicy::TPtr>;
@@ -676,8 +686,8 @@ friend class IHTTPGateway;
676686
}
677687

678688
~THTTPMultiGateway() {
679-
curl_multi_wakeup(Handle);
680689
IsStopped = true;
690+
curl_multi_wakeup(Handle.get());
681691
if (Thread.joinable()) {
682692
Thread.join();
683693
}
@@ -691,23 +701,26 @@ friend class IHTTPGateway;
691701
TCurlInitConfig InitConfig;
692702

693703
void InitCurl() {
704+
// FIXME: NOT SAFE (see man libcurl(3))
694705
const CURLcode globalInitResult = curl_global_init(CURL_GLOBAL_ALL);
695706
if (globalInitResult != CURLE_OK) {
696707
throw yexception() << "curl_global_init error " << int(globalInitResult) << ": " << curl_easy_strerror(globalInitResult) << Endl;
697708
}
698-
Handle = curl_multi_init();
709+
Handle = std::shared_ptr<CURLM>(curl_multi_init(), [](auto handle) {
710+
const CURLMcode multiCleanupResult = curl_multi_cleanup(handle);
711+
if (multiCleanupResult != CURLM_OK) {
712+
Cerr << "curl_multi_cleanup error " << int(multiCleanupResult) << ": " << curl_multi_strerror(multiCleanupResult) << Endl;
713+
}
714+
curl_global_cleanup(); // FIXME: NOT SAFE (see man libcurl(3))
715+
});
699716
if (!Handle) {
700717
throw yexception() << "curl_multi_init error";
701718
}
702719
}
703720

704721
void UninitCurl() {
705722
Y_ABORT_UNLESS(Handle);
706-
const CURLMcode multiCleanupResult = curl_multi_cleanup(Handle);
707-
if (multiCleanupResult != CURLM_OK) {
708-
Cerr << "curl_multi_cleanup error " << int(multiCleanupResult) << ": " << curl_multi_strerror(multiCleanupResult) << Endl;
709-
}
710-
curl_global_cleanup();
723+
Handle.reset();
711724
}
712725

713726
void Perform() {
@@ -722,22 +735,22 @@ friend class IHTTPGateway;
722735
OutputMemory->Set(OutputSize);
723736

724737
int running = 0;
725-
if (const auto c = curl_multi_perform(Handle, &running); CURLM_OK != c) {
738+
if (const auto c = curl_multi_perform(Handle.get(), &running); CURLM_OK != c) {
726739
Fail(c);
727740
break;
728741
}
729742

730743
if (running < int(handlers)) {
731744
for (int messages = int(handlers) - running; messages;) {
732-
if (const auto msg = curl_multi_info_read(Handle, &messages)) {
745+
if (const auto msg = curl_multi_info_read(Handle.get(), &messages)) {
733746
if(msg->msg == CURLMSG_DONE) {
734747
Done(msg->easy_handle, msg->data.result);
735748
}
736749
}
737750
}
738751
} else {
739752
const int timeoutMs = 300;
740-
if (const auto c = curl_multi_poll(Handle, nullptr, 0, timeoutMs, nullptr); CURLM_OK != c) {
753+
if (const auto c = curl_multi_poll(Handle.get(), nullptr, 0, timeoutMs, nullptr); CURLM_OK != c) {
741754
Fail(c);
742755
break;
743756
}
@@ -752,16 +765,16 @@ friend class IHTTPGateway;
752765
const auto streamHandle = stream->GetHandle();
753766
switch (stream->GetAction(BuffersSizePerStream)) {
754767
case TEasyCurlStream::EAction::Init:
755-
curl_multi_add_handle(Handle, streamHandle);
768+
curl_multi_add_handle(Handle.get(), streamHandle);
756769
break;
757770
case TEasyCurlStream::EAction::Work:
758771
curl_easy_pause(streamHandle, CURLPAUSE_RECV_CONT);
759772
break;
760773
case TEasyCurlStream::EAction::Stop:
761-
curl_easy_pause(streamHandle, CURL_WRITEFUNC_PAUSE);
774+
curl_easy_pause(streamHandle, CURLPAUSE_RECV);
762775
break;
763776
case TEasyCurlStream::EAction::Drop:
764-
curl_multi_remove_handle(Handle, streamHandle);
777+
curl_multi_remove_handle(Handle.get(), streamHandle);
765778
Allocated.erase(streamHandle);
766779
break;
767780
case TEasyCurlStream::EAction::None:
@@ -784,7 +797,7 @@ friend class IHTTPGateway;
784797
const auto handle = Await.front()->GetHandle();
785798
Allocated.emplace(handle, std::move(Await.front()));
786799
Await.pop();
787-
curl_multi_add_handle(Handle, handle);
800+
curl_multi_add_handle(Handle.get(), handle);
788801
}
789802
AwaitQueue->Set(Await.size());
790803
AllocatedMemory->Set(AllocatedSize);
@@ -859,7 +872,7 @@ friend class IHTTPGateway;
859872

860873
const TIssue error(curl_multi_strerror(result));
861874
while (!works.empty()) {
862-
curl_multi_remove_handle(Handle, works.top()->GetHandle());
875+
curl_multi_remove_handle(Handle.get(), works.top()->GetHandle());
863876
works.top()->Fail(CURLE_OK, error);
864877
works.pop();
865878
}
@@ -914,7 +927,7 @@ friend class IHTTPGateway;
914927
TOnDownloadFinish onFinish,
915928
const ::NMonitoring::TDynamicCounters::TCounterPtr& inflightCounter) final
916929
{
917-
auto stream = TEasyCurlStream::Make(InFlightStreams, DownloadedBytes, UploadedBytes, std::move(url), std::move(headers), offset, sizeLimit, std::move(onStart), std::move(onNewData), std::move(onFinish), inflightCounter, InitConfig, DnsGateway.GetDNSCurlList());
930+
auto stream = TEasyCurlStream::Make(InFlightStreams, DownloadedBytes, UploadedBytes, std::move(url), std::move(headers), offset, sizeLimit, std::move(onStart), std::move(onNewData), std::move(onFinish), inflightCounter, Handle, BuffersSizePerStream, InitConfig, DnsGateway.GetDNSCurlList());
918931
const std::unique_lock lock(SyncRef());
919932
const auto handle = stream->GetHandle();
920933
TEasyCurlStream::TWeakPtr weak = stream;
@@ -942,20 +955,20 @@ friend class IHTTPGateway;
942955
void Wakeup(size_t sizeLimit) {
943956
AwaitQueue->Set(Await.size());
944957
if (Allocated.size() < MaxHandlers && AllocatedSize + sizeLimit + OutputSize.load() <= MaxSimulatenousDownloadsSize) {
945-
curl_multi_wakeup(Handle);
958+
curl_multi_wakeup(Handle.get());
946959
}
947960
}
948961

949962
CURLM* GetHandle() const {
950-
return Handle;
963+
return Handle.get();
951964
}
952965

953966
private:
954967
std::mutex& SyncRef() {
955968
return *Sync;
956969
}
957970

958-
CURLM* Handle = nullptr;
971+
std::shared_ptr<CURLM> Handle;
959972

960973
std::queue<TEasyCurlBuffer::TPtr> Await;
961974
std::vector<TEasyCurlStream::TWeakPtr> Streams;
@@ -1043,28 +1056,34 @@ IHTTPGateway::TContent::TContent(const TString& data, long httpResponseCode, con
10431056
{}
10441057

10451058
IHTTPGateway::TCountedContent::TCountedContent(TString&& data, const std::shared_ptr<std::atomic_size_t>& counter,
1046-
const ::NMonitoring::TDynamicCounters::TCounterPtr& inflightCounter)
1059+
const ::NMonitoring::TDynamicCounters::TCounterPtr& inflightCounter, std::weak_ptr<CURLM> handle, size_t threshold)
10471060
: TContentBase(std::move(data)), Counter(counter), InflightCounter(inflightCounter)
1061+
, Handle(handle), Threshold(threshold)
10481062
{
10491063
Counter->fetch_add(size());
10501064
if (InflightCounter) {
10511065
InflightCounter->Add(size());
10521066
}
10531067
}
10541068

1055-
IHTTPGateway::TCountedContent::~TCountedContent()
1056-
{
1057-
Counter->fetch_sub(size());
1069+
void IHTTPGateway::TCountedContent::BeforeRelease() {
1070+
auto oldSize = Counter->fetch_sub(size());
1071+
if (oldSize >= Threshold && oldSize - size() < Threshold) {
1072+
if (auto handle = Handle.lock()) {
1073+
curl_multi_wakeup(handle.get());
1074+
}
1075+
}
10581076
if (InflightCounter) {
10591077
InflightCounter->Sub(size());
10601078
}
10611079
}
10621080

1081+
IHTTPGateway::TCountedContent::~TCountedContent() {
1082+
BeforeRelease();
1083+
}
1084+
10631085
TString IHTTPGateway::TCountedContent::Extract() {
1064-
Counter->fetch_sub(size());
1065-
if (InflightCounter) {
1066-
InflightCounter->Sub(size());
1067-
}
1086+
BeforeRelease();
10681087
return TContentBase::Extract();
10691088
}
10701089

ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,16 +101,20 @@ class IHTTPGateway {
101101

102102
class TCountedContent : public TContentBase {
103103
public:
104-
TCountedContent(TString&& data, const std::shared_ptr<std::atomic_size_t>& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& inflightCounter);
104+
TCountedContent(TString&& data, const std::shared_ptr<std::atomic_size_t>& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& inflightCounter, std::weak_ptr<CURLM> handle, size_t threshold);
105105
~TCountedContent();
106106

107107
TCountedContent(TCountedContent&&) = default;
108108
TCountedContent& operator=(TCountedContent&& src) = default;
109109

110110
TString Extract();
111111
private:
112+
void BeforeRelease();
113+
112114
const std::shared_ptr<std::atomic_size_t> Counter;
113115
const ::NMonitoring::TDynamicCounters::TCounterPtr InflightCounter;
116+
std::weak_ptr<CURLM> Handle;
117+
const size_t Threshold;
114118
};
115119

116120
using TOnDownloadStart = std::function<void(CURLcode, long)>; // http code.

0 commit comments

Comments
 (0)