From fa893d7bdf6680aa0d3bb82501cad06672447d8e Mon Sep 17 00:00:00 2001 From: Mungo Gill Date: Tue, 25 Nov 2025 16:30:02 +0000 Subject: [PATCH 1/6] test stream write cancellation support and beast2::write tests --- include/boost/beast2/impl/write.hpp | 10 + .../boost/beast2/test/detail/stream_state.hpp | 32 +- include/boost/beast2/test/impl/stream.hpp | 281 +++++++++++++++--- include/boost/beast2/test/stream.hpp | 11 + test/unit/write.cpp | 260 +++++++++++++++- 5 files changed, 531 insertions(+), 63 deletions(-) diff --git a/include/boost/beast2/impl/write.hpp b/include/boost/beast2/impl/write.hpp index 7eb10f1e..80725142 100644 --- a/include/boost/beast2/impl/write.hpp +++ b/include/boost/beast2/impl/write.hpp @@ -54,6 +54,9 @@ class write_some_op BOOST_ASIO_CORO_REENTER(*this) { + self.reset_cancellation_state( + asio::enable_total_cancellation()); + rv = sr_.prepare(); if(! rv) { @@ -82,6 +85,9 @@ class write_some_op } sr_.consume(bytes_transferred); + if(!!self.cancelled() && !sr_.is_done()) + ec = asio::error::operation_aborted; + upcall: self.complete( ec, bytes_transferred ); @@ -128,6 +134,10 @@ class write_op dest_, sr_, std::move(self)); } n_ += bytes_transferred; + + if(!!self.cancelled() && ! sr_.is_done()) + ec = asio::error::operation_aborted; + if(ec.failed()) break; } diff --git a/include/boost/beast2/test/detail/stream_state.hpp b/include/boost/beast2/test/detail/stream_state.hpp index fc73055e..862e9e12 100644 --- a/include/boost/beast2/test/detail/stream_state.hpp +++ b/include/boost/beast2/test/detail/stream_state.hpp @@ -74,6 +74,15 @@ struct stream_read_op_base //------------------------------------------------------------------------------ +struct stream_write_op_base +{ + virtual ~stream_write_op_base() = default; + virtual void + operator()(system::error_code ec) = 0; +}; + +//------------------------------------------------------------------------------ + enum class stream_status { ok, @@ -89,8 +98,9 @@ struct stream_state std::mutex m; std::string storage; buffers::string_buffer b; - std::condition_variable cv; - std::unique_ptr op; + //std::condition_variable cv; + std::unique_ptr rop; + std::unique_ptr wop; stream_status code = stream_status::ok; fail_count* fc = nullptr; std::size_t nread = 0; @@ -139,7 +149,7 @@ shutdown() for(auto p : sp_->v_) { std::lock_guard g2(p->m); - v.emplace_back(std::move(p->op)); + v.emplace_back(std::move(p->rop)); p->code = detail::stream_status::eof; } } @@ -200,8 +210,11 @@ stream_state:: ~stream_state() { // cancel outstanding read - if(op != nullptr) - (*op)(asio::error::operation_aborted); + if(rop != nullptr) + (*rop)(asio::error::operation_aborted); + // cancel outstanding write + if(wop != nullptr) + (*wop)(asio::error::operation_aborted); } inline @@ -223,16 +236,13 @@ void stream_state:: notify_read() { - if(op) + if(rop) { - auto op_ = std::move(op); + auto op_ = std::move(rop); op_->operator()(system::error_code{}); } - else - { - cv.notify_all(); - } } + } // detail } // test } // beast2 diff --git a/include/boost/beast2/test/impl/stream.hpp b/include/boost/beast2/test/impl/stream.hpp index 5144609d..4963d485 100644 --- a/include/boost/beast2/test/impl/stream.hpp +++ b/include/boost/beast2/test/impl/stream.hpp @@ -81,7 +81,7 @@ class basic_stream::read_op : public detail::stream_read_op_base std::unique_ptr p; { std::lock_guard lock(sp->m); - p = std::move(sp->op); + p = std::move(sp->rop); } if(p != nullptr) (*p)(asio::error::operation_aborted); @@ -138,7 +138,7 @@ class basic_stream::read_op : public detail::stream_read_op_base if(! ec) { std::lock_guard lock(sp->m); - BOOST_ASSERT(! sp->op); + BOOST_ASSERT(! sp->rop); if(sp->b.size() > 0) { bytes_transferred = @@ -181,6 +181,146 @@ class basic_stream::read_op : public detail::stream_read_op_base } }; +template +template +class basic_stream::write_op : public detail::stream_write_op_base +{ + class lambda + { + Handler h_; + boost::weak_ptr iwp_; + boost::weak_ptr owp_; + Buffers b_; + asio::executor_work_guard< + asio::associated_executor_t> + wg2_; + + class cancellation_handler + { + public: + explicit cancellation_handler( + boost::weak_ptr wp) + : wp_(std::move(wp)) + { + } + + void + operator()(asio::cancellation_type type) const + { + if(type != asio::cancellation_type::none) + { + if(auto sp = wp_.lock()) + { + std::unique_ptr p; + { + std::lock_guard lock(sp->m); + p = std::move(sp->wop); + } + if(p != nullptr) + (*p)(asio::error::operation_aborted); + } + } + } + + private: + boost::weak_ptr wp_; + }; + + public: + template + lambda( + Handler_&& h, + boost::shared_ptr const& in, + boost::weak_ptr const& out, + Buffers const& b) + : h_(std::forward(h)) + , iwp_(in) + , owp_(out) + , b_(b) + , wg2_(asio::get_associated_executor(h_, in->exec)) + { + auto c_slot = asio::get_associated_cancellation_slot(h_); + if(c_slot.is_connected()) + c_slot.template emplace(iwp_); + } + + using allocator_type = asio::associated_allocator_t; + + allocator_type + get_allocator() const noexcept + { + return asio::get_associated_allocator(h_); + } + + using cancellation_slot_type = + asio::associated_cancellation_slot_t; + + cancellation_slot_type + get_cancellation_slot() const noexcept + { + return asio::get_associated_cancellation_slot( + h_, asio::cancellation_slot()); + } + + void + operator()(system::error_code ec) + { + std::size_t bytes_transferred = 0; + auto isp = iwp_.lock(); + if(!isp) + { + ec = asio::error::operation_aborted; + } + auto osp = owp_.lock(); + if(!osp) + { + ec = asio::error::operation_aborted; + } + if(!ec) + { + // copy buffers + std::size_t n = std::min( + buffers::size(b_), isp->write_max); + { + std::lock_guard lock(osp->m); + n = buffers::copy(osp->b.prepare(n), b_); + osp->b.commit(n); + osp->nwrite_bytes += n; + osp->notify_read(); + } + bytes_transferred = n; + } + + asio::dispatch( + wg2_.get_executor(), + asio::append(std::move(h_), ec, bytes_transferred)); + wg2_.reset(); + } + }; + + lambda fn_; + asio::executor_work_guard wg1_; + +public: + template + write_op( + Handler_&& h, + boost::shared_ptr const& in, + boost::weak_ptr const& out, + Buffers const& b) + : fn_(std::forward(h), in, out, b) + , wg1_(in->exec) + { + } + + void + operator()(system::error_code ec) override + { + asio::post(wg1_.get_executor(), asio::append(std::move(fn_), ec)); + wg1_.reset(); + } +}; + template struct basic_stream::run_read_op { @@ -245,39 +385,47 @@ struct basic_stream::run_write_op // that your handler does not meet the documented type // requirements for the handler. - ++in_->nwrite; - auto const upcall = [&](system::error_code ec, std::size_t n) - { - asio::post(in_->exec, asio::append(std::move(h), ec, n)); - }; - - // test failure - system::error_code ec; - std::size_t n = 0; - if(in_->fc && in_->fc->fail(ec)) - return upcall(ec, n); - - // A request to write 0 bytes to a stream is a no-op. - if(buffers::size(buffers) == 0) - return upcall(ec, n); - - // connection closed - auto out = out_.lock(); - if(! out) - return upcall(asio::error::connection_reset, n); + initiate_write( + in_, + out_, + std::unique_ptr{ new write_op< + typename std::decay::type, + ConstBufferSequence>(std::move(h), in_, out_, buffers) }, + buffers::size(buffers)); - // copy buffers - n = std::min( - buffers::size(buffers), in_->write_max); - { - std::lock_guard lock(out->m); - n = buffers::copy(out->b.prepare(n), buffers); - out->b.commit(n); - out->nwrite_bytes += n; - out->notify_read(); - } - BOOST_ASSERT(! ec); - upcall(ec, n); + //++in_->nwrite; + //auto const upcall = [&](system::error_code ec, std::size_t n) + //{ + // asio::post(in_->exec, asio::append(std::move(h), ec, n)); + //}; + + //// test failure + //system::error_code ec; + //std::size_t n = 0; + //if(in_->fc && in_->fc->fail(ec)) + // return upcall(ec, n); + + //// A request to write 0 bytes to a stream is a no-op. + //if(buffers::size(buffers) == 0) + // return upcall(ec, n); + + //// connection closed + //auto out = out_.lock(); + //if(! out) + // return upcall(asio::error::connection_reset, n); + + //// copy buffers + //n = std::min( + // buffers::size(buffers), in_->write_max); + //{ + // std::lock_guard lock(out->m); + // n = buffers::copy(out->b.prepare(n), buffers); + // out->b.commit(n); + // out->nwrite_bytes += n; + // out->notify_read(); + //} + //BOOST_ASSERT(! ec); + //upcall(ec, n); } }; @@ -345,28 +493,27 @@ auto basic_stream::get_executor() noexcept -> executor_type return detail::extract_executor_op()(in_->exec); } - //------------------------------------------------------------------------------ template -void basic_stream::initiate_read( +void +basic_stream::initiate_read( boost::shared_ptr const& in_, - std::unique_ptr&& op, + std::unique_ptr&& rop, std::size_t buf_size) { std::unique_lock lock(in_->m); ++in_->nread; - if(in_->op != nullptr) - BOOST_THROW_EXCEPTION( - std::logic_error{"in_->op != nullptr"}); + if(in_->rop != nullptr) + BOOST_THROW_EXCEPTION(std::logic_error{ "in_->rop != nullptr" }); // test failure system::error_code ec; if(in_->fc && in_->fc->fail(ec)) { lock.unlock(); - (*op)(ec); + (*rop)(ec); return; } @@ -374,7 +521,7 @@ void basic_stream::initiate_read( if(buf_size == 0 || buffers::size(in_->b.data()) > 0) { lock.unlock(); - (*op)(ec); + (*rop)(ec); return; } @@ -382,12 +529,58 @@ void basic_stream::initiate_read( if(in_->code != detail::stream_status::ok) { lock.unlock(); - (*op)(asio::error::eof); + (*rop)(asio::error::eof); return; } // complete when bytes available or closed - in_->op = std::move(op); + in_->rop = std::move(rop); +} + +//------------------------------------------------------------------------------ + +template +void basic_stream::initiate_write( + boost::shared_ptr const& in_, + boost::weak_ptr const& out_, + std::unique_ptr&& wop, + std::size_t buf_size) +{ + { + std::unique_lock lock(in_->m); + + ++in_->nwrite; + + // test failure + system::error_code ec; + if(in_->fc && in_->fc->fail(ec)) + { + lock.unlock(); + (*wop)(ec); + return; + } + } + + // A request to write 0 bytes to a stream is a no-op. + if(buf_size == 0) + { + (*wop)(system::error_code{}); + return; + } + + // connection closed + auto out = out_.lock(); + if(!out) + { + (*wop)(asio::error::connection_reset); + return; + } + + std::unique_lock lock(out->m); + + in_->wop = std::move(wop); + auto op = std::move(in_->wop); + op->operator()(system::error_code{}); } //------------------------------------------------------------------------------ diff --git a/include/boost/beast2/test/stream.hpp b/include/boost/beast2/test/stream.hpp index a5b636ea..23fcaa78 100644 --- a/include/boost/beast2/test/stream.hpp +++ b/include/boost/beast2/test/stream.hpp @@ -131,6 +131,9 @@ class basic_stream template class read_op; + template + class write_op; + struct run_read_op; struct run_write_op; @@ -141,6 +144,14 @@ class basic_stream std::unique_ptr&& op, std::size_t buf_size); + + static void + initiate_write( + boost::shared_ptr const& in, + boost::weak_ptr const& out, + std::unique_ptr&& op, + std::size_t buf_size); + #if ! BOOST_BEAST2_DOXYGEN // boost::asio::ssl::stream needs these // DEPRECATED diff --git a/test/unit/write.cpp b/test/unit/write.cpp index 90c5da40..ddd9116f 100644 --- a/test/unit/write.cpp +++ b/test/unit/write.cpp @@ -15,28 +15,272 @@ namespace boost { namespace beast2 { -class any_async_read_stream -{ -}; +//class any_async_read_stream +//{ +//}; +// +//class write_test +//{ +//public: +// void +// testWrite() +// { +// } +// +// void +// run() +// { +// testWrite(); +// } +//}; +// +//TEST_SUITE( +// write_test, +// "boost.beast2.write"); + +} // beast2 +} // boost + +// +// Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com) +// Copyright (c) 2025 Mohammad Nejati +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/beast2 +// + +// Test that header file is self-contained. +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "test_helpers.hpp" + +#include + +namespace boost { +namespace beast2 { class write_test { + core::string_view const headers = + "HTTP/1.1 200 OK\r\n" + "Content-Length: 3\r\n" + "\r\n"; + core::string_view const body = + "abc"; + core::string_view const msg = + "HTTP/1.1 200 OK\r\n" + "Content-Length: 3\r\n" + "\r\n" + "abc"; + public: void - testWrite() + testAsyncWriteSome() + { + boost::asio::io_context ioc; + boost::rts::polystore rts_ctx; + http_proto::install_serializer_service(rts_ctx, {}); + + // async_read_some completes when the parser reads + // the header section of the message. + { + test::stream ts{ ioc }, tr{ ioc }; + ts.connect(tr); + ts.write_size(1); + + http_proto::serializer sr(rts_ctx); + sr.reset(); + + http_proto::response res(headers); + sr.start(res, buffers::const_buffer(body.data(), body.size())); + + for(std::size_t total = 0; total < msg.size(); total++) + { + async_write_some( + ts, + sr, + [&](system::error_code ec, std::size_t n) + { + BOOST_TEST(!ec.failed()); + BOOST_TEST_EQ(n, 1); // minus body + }); + test::run(ioc); + } + BOOST_TEST(sr.is_done()); + BOOST_TEST_EQ(tr.str(), msg); + + BOOST_TEST_EQ(tr.str(), msg); + } + + // async_write_some reports stream errors + { + test::fail_count fc(3, asio::error::network_down); + test::stream ts{ ioc, fc }, tr{ ioc }; + ts.connect(tr); + ts.write_size(1); + + http_proto::serializer sr(rts_ctx); + sr.reset(); + + http_proto::response res(headers); + sr.start(res, buffers::const_buffer(body.data(), body.size())); + + for(int count = 0; count < 3; count++) + { + async_write_some( + ts, + sr, + [&](system::error_code ec, std::size_t n) + { + if (count < 2) + { + BOOST_TEST(!ec.failed()); + BOOST_TEST_EQ(n, 1); + } + else + { + BOOST_TEST_EQ(ec, asio::error::network_down); + BOOST_TEST_EQ(n, 0); + } + }); + test::run(ioc); + + auto expected = msg.substr(0, (count == 0) ? 1 : 2); + BOOST_TEST_EQ(tr.str(), expected); + } + } + + // async_read_some cancellation + { + test::stream ts{ ioc }, tr{ ioc }; + ts.connect(tr); + ts.write_size(5); + + asio::cancellation_signal c_signal; + + http_proto::serializer sr(rts_ctx); + sr.reset(); + + http_proto::response res(headers); + sr.start(res, buffers::const_buffer(body.data(), body.size())); + + // async_read_some cancels after reading 0 bytes + async_write_some( + ts, + sr, + asio::bind_cancellation_slot( + c_signal.slot(), + [](system::error_code ec, std::size_t n) + { + BOOST_TEST_EQ(n, 5); + BOOST_TEST_EQ(ec, asio::error::operation_aborted); + })); + c_signal.emit(asio::cancellation_type::total); + + // Create a destination buffer + std::string s; + boost::buffers::string_buffer buf(&s); + + test::run(ioc); + + BOOST_TEST_EQ(tr.str(), "HTTP/"); + } + } + + void + testAsyncReadHeader() + { + // currently, async_read_header and + // async_read_some are identical + } + + void + testAsyncRead() { + boost::asio::io_context ioc; + rts::polystore rts_ctx; + http_proto::install_parser_service(rts_ctx, {}); + + // async_read completes when the parser reads + // the entire message. + { + test::stream ts(ioc, msg); + http_proto::response_parser pr(rts_ctx); + pr.reset(); + pr.start(); + + // limit async_read_some for better coverage + ts.read_size(1); + + async_read( + ts, + pr, + [&](system::error_code ec, std::size_t n) + { + BOOST_TEST(!ec.failed()); + BOOST_TEST_EQ(n, msg.size()); + }); + + test::run(ioc); + + BOOST_TEST_EQ(ts.nread(), msg.size()); // because of ts.read_size(1) + BOOST_TEST(pr.is_complete()); + BOOST_TEST(pr.body() == "abc"); + } + + // async_read completes immediatly when + // parser contains enough data + { + asio::post( + ioc, + [&]() + { + test::stream ts(ioc); + http_proto::response_parser pr(rts_ctx); + pr.reset(); + pr.start(); + + pr.commit(buffers::copy( + pr.prepare(), + buffers::const_buffer(msg.data(), msg.size()))); + + async_read( + ts, + pr, + asio::bind_immediate_executor( + ioc.get_executor(), test::success_handler())); + + BOOST_TEST_EQ(ts.nread(), 0); + BOOST_TEST(pr.is_complete()); + BOOST_TEST(pr.body() == "abc"); + }); + BOOST_TEST_EQ(test::run(ioc), 1); + } } void run() { - testWrite(); + testAsyncWriteSome(); + //testAsyncReadHeader(); + //testAsyncRead(); } }; -TEST_SUITE( - write_test, - "boost.beast2.write"); +TEST_SUITE(write_test, "boost.beast2.write"); } // beast2 } // boost From 303cb20ee27c22f79f2c4286e0fbb6f03c32b75a Mon Sep 17 00:00:00 2001 From: Mungo Gill Date: Tue, 25 Nov 2025 18:08:08 +0000 Subject: [PATCH 2/6] rts rename to capy --- test/unit/write.cpp | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/test/unit/write.cpp b/test/unit/write.cpp index ddd9116f..9144e7ff 100644 --- a/test/unit/write.cpp +++ b/test/unit/write.cpp @@ -62,8 +62,8 @@ namespace beast2 { #include #include #include +#include #include -#include #include "test_helpers.hpp" @@ -91,8 +91,8 @@ class write_test testAsyncWriteSome() { boost::asio::io_context ioc; - boost::rts::polystore rts_ctx; - http_proto::install_serializer_service(rts_ctx, {}); + boost::capy::polystore capy_ctx; + http_proto::install_serializer_service(capy_ctx, {}); // async_read_some completes when the parser reads // the header section of the message. @@ -101,7 +101,7 @@ class write_test ts.connect(tr); ts.write_size(1); - http_proto::serializer sr(rts_ctx); + http_proto::serializer sr(capy_ctx); sr.reset(); http_proto::response res(headers); @@ -132,7 +132,7 @@ class write_test ts.connect(tr); ts.write_size(1); - http_proto::serializer sr(rts_ctx); + http_proto::serializer sr(capy_ctx); sr.reset(); http_proto::response res(headers); @@ -171,7 +171,7 @@ class write_test asio::cancellation_signal c_signal; - http_proto::serializer sr(rts_ctx); + http_proto::serializer sr(capy_ctx); sr.reset(); http_proto::response res(headers); @@ -211,14 +211,14 @@ class write_test testAsyncRead() { boost::asio::io_context ioc; - rts::polystore rts_ctx; - http_proto::install_parser_service(rts_ctx, {}); + capy::polystore capy_ctx; + http_proto::install_parser_service(capy_ctx, {}); // async_read completes when the parser reads // the entire message. { test::stream ts(ioc, msg); - http_proto::response_parser pr(rts_ctx); + http_proto::response_parser pr(capy_ctx); pr.reset(); pr.start(); @@ -249,7 +249,7 @@ class write_test [&]() { test::stream ts(ioc); - http_proto::response_parser pr(rts_ctx); + http_proto::response_parser pr(capy_ctx); pr.reset(); pr.start(); From ed95c27d23ccf0915a6d5aa0811b510272336dbd Mon Sep 17 00:00:00 2001 From: Mungo Gill Date: Fri, 28 Nov 2025 13:37:12 +0000 Subject: [PATCH 3/6] checkpoint --- include/boost/beast2/impl/write.hpp | 27 ++- .../boost/beast2/test/detail/stream_state.hpp | 34 ++- include/boost/beast2/test/impl/stream.hpp | 66 ++---- include/boost/beast2/test/stream.hpp | 4 +- test/unit/write.cpp | 197 +++++++++++------- 5 files changed, 175 insertions(+), 153 deletions(-) diff --git a/include/boost/beast2/impl/write.hpp b/include/boost/beast2/impl/write.hpp index 80725142..1e5e66c7 100644 --- a/include/boost/beast2/impl/write.hpp +++ b/include/boost/beast2/impl/write.hpp @@ -85,8 +85,8 @@ class write_some_op } sr_.consume(bytes_transferred); - if(!!self.cancelled() && !sr_.is_done()) - ec = asio::error::operation_aborted; + //if(!!self.cancelled() && !sr_.is_done()) + // ec = asio::error::operation_aborted; upcall: self.complete( @@ -123,8 +123,24 @@ class write_op { BOOST_ASIO_CORO_REENTER(*this) { + self.reset_cancellation_state(asio::enable_total_cancellation()); + do { + if(!!self.cancelled()) + { + ec = asio::error::operation_aborted; + BOOST_ASIO_CORO_YIELD + { + BOOST_ASIO_HANDLER_LOCATION( + (__FILE__, __LINE__, "immediate")); + auto io_ex = self.get_io_executor(); + asio::async_immediate( + io_ex, asio::append(std::move(self), ec)); + } + break; // goto upcall + } + BOOST_ASIO_CORO_YIELD { BOOST_ASIO_HANDLER_LOCATION(( @@ -135,15 +151,12 @@ class write_op } n_ += bytes_transferred; - if(!!self.cancelled() && ! sr_.is_done()) - ec = asio::error::operation_aborted; - if(ec.failed()) - break; + break; // goto upcall } while(! sr_.is_done()); - // upcall + // upcall: self.complete(ec, n_ ); } } diff --git a/include/boost/beast2/test/detail/stream_state.hpp b/include/boost/beast2/test/detail/stream_state.hpp index 862e9e12..53bb5550 100644 --- a/include/boost/beast2/test/detail/stream_state.hpp +++ b/include/boost/beast2/test/detail/stream_state.hpp @@ -66,23 +66,14 @@ class stream_service //------------------------------------------------------------------------------ -struct stream_read_op_base +struct stream_op_base { - virtual ~stream_read_op_base() = default; + virtual ~stream_op_base() = default; virtual void operator()(system::error_code ec) = 0; }; //------------------------------------------------------------------------------ -struct stream_write_op_base -{ - virtual ~stream_write_op_base() = default; - virtual void - operator()(system::error_code ec) = 0; -}; - -//------------------------------------------------------------------------------ - enum class stream_status { ok, @@ -99,8 +90,8 @@ struct stream_state std::string storage; buffers::string_buffer b; //std::condition_variable cv; - std::unique_ptr rop; - std::unique_ptr wop; + std::unique_ptr rop; + std::unique_ptr wop; stream_status code = stream_status::ok; fail_count* fc = nullptr; std::size_t nread = 0; @@ -143,14 +134,17 @@ void stream_service:: shutdown() { - std::vector> v; - std::lock_guard g1(sp_->m_); - v.reserve(sp_->v_.size()); - for(auto p : sp_->v_) + std::vector> v; { - std::lock_guard g2(p->m); - v.emplace_back(std::move(p->rop)); - p->code = detail::stream_status::eof; + std::lock_guard g1(sp_->m_); + v.reserve(2 * sp_->v_.size()); + for(auto p : sp_->v_) + { + std::lock_guard g2(p->m); + v.emplace_back(std::move(p->rop)); + v.emplace_back(std::move(p->wop)); + p->code = detail::stream_status::eof; + } } } diff --git a/include/boost/beast2/test/impl/stream.hpp b/include/boost/beast2/test/impl/stream.hpp index 4963d485..ed8ae594 100644 --- a/include/boost/beast2/test/impl/stream.hpp +++ b/include/boost/beast2/test/impl/stream.hpp @@ -16,6 +16,7 @@ #include #include #include +#include #include #include #include @@ -51,7 +52,7 @@ struct extract_executor_op template template -class basic_stream::read_op : public detail::stream_read_op_base +class basic_stream::read_op : public detail::stream_op_base { class lambda { @@ -78,7 +79,7 @@ class basic_stream::read_op : public detail::stream_read_op_base { if(auto sp = wp_.lock()) { - std::unique_ptr p; + std::unique_ptr p; { std::lock_guard lock(sp->m); p = std::move(sp->rop); @@ -176,14 +177,14 @@ class basic_stream::read_op : public detail::stream_read_op_base void operator()(system::error_code ec) override { - asio::post(wg1_.get_executor(), asio::append(std::move(fn_), ec)); + asio::defer(wg1_.get_executor(), asio::append(std::move(fn_), ec)); wg1_.reset(); } }; template template -class basic_stream::write_op : public detail::stream_write_op_base +class basic_stream::write_op : public detail::stream_op_base { class lambda { @@ -211,7 +212,7 @@ class basic_stream::write_op : public detail::stream_write_op_base { if(auto sp = wp_.lock()) { - std::unique_ptr p; + std::unique_ptr p; { std::lock_guard lock(sp->m); p = std::move(sp->wop); @@ -295,10 +296,11 @@ class basic_stream::write_op : public detail::stream_write_op_base wg2_.get_executor(), asio::append(std::move(h_), ec, bytes_transferred)); wg2_.reset(); + isp->wop.release(); } }; - lambda fn_; + std::unique_ptr fnp_; asio::executor_work_guard wg1_; public: @@ -308,7 +310,7 @@ class basic_stream::write_op : public detail::stream_write_op_base boost::shared_ptr const& in, boost::weak_ptr const& out, Buffers const& b) - : fn_(std::forward(h), in, out, b) + : fnp_(std::make_unique(std::forward(h), in, out, b)) , wg1_(in->exec) { } @@ -316,7 +318,9 @@ class basic_stream::write_op : public detail::stream_write_op_base void operator()(system::error_code ec) override { - asio::post(wg1_.get_executor(), asio::append(std::move(fn_), ec)); + std::unique_ptr fnp(std::move(fnp_)); + if(fnp) + asio::post(wg1_.get_executor(), asio::append(std::move(*fnp), ec)); wg1_.reset(); } }; @@ -348,7 +352,7 @@ struct basic_stream::run_read_op initiate_read( in, - std::unique_ptr{ + std::unique_ptr{ new read_op< typename std::decay::type, MutableBufferSequence>( @@ -388,44 +392,10 @@ struct basic_stream::run_write_op initiate_write( in_, out_, - std::unique_ptr{ new write_op< + std::unique_ptr{ new write_op< typename std::decay::type, ConstBufferSequence>(std::move(h), in_, out_, buffers) }, buffers::size(buffers)); - - //++in_->nwrite; - //auto const upcall = [&](system::error_code ec, std::size_t n) - //{ - // asio::post(in_->exec, asio::append(std::move(h), ec, n)); - //}; - - //// test failure - //system::error_code ec; - //std::size_t n = 0; - //if(in_->fc && in_->fc->fail(ec)) - // return upcall(ec, n); - - //// A request to write 0 bytes to a stream is a no-op. - //if(buffers::size(buffers) == 0) - // return upcall(ec, n); - - //// connection closed - //auto out = out_.lock(); - //if(! out) - // return upcall(asio::error::connection_reset, n); - - //// copy buffers - //n = std::min( - // buffers::size(buffers), in_->write_max); - //{ - // std::lock_guard lock(out->m); - // n = buffers::copy(out->b.prepare(n), buffers); - // out->b.commit(n); - // out->nwrite_bytes += n; - // out->notify_read(); - //} - //BOOST_ASSERT(! ec); - //upcall(ec, n); } }; @@ -499,7 +469,7 @@ template void basic_stream::initiate_read( boost::shared_ptr const& in_, - std::unique_ptr&& rop, + std::unique_ptr&& rop, std::size_t buf_size) { std::unique_lock lock(in_->m); @@ -543,7 +513,7 @@ template void basic_stream::initiate_write( boost::shared_ptr const& in_, boost::weak_ptr const& out_, - std::unique_ptr&& wop, + std::unique_ptr&& wop, std::size_t buf_size) { { @@ -579,8 +549,8 @@ void basic_stream::initiate_write( std::unique_lock lock(out->m); in_->wop = std::move(wop); - auto op = std::move(in_->wop); - op->operator()(system::error_code{}); + //auto op = std::move(in_->wop); + in_->wop->operator()(system::error_code{}); } //------------------------------------------------------------------------------ diff --git a/include/boost/beast2/test/stream.hpp b/include/boost/beast2/test/stream.hpp index 23fcaa78..c3634340 100644 --- a/include/boost/beast2/test/stream.hpp +++ b/include/boost/beast2/test/stream.hpp @@ -141,7 +141,7 @@ class basic_stream void initiate_read( boost::shared_ptr const& in, - std::unique_ptr&& op, + std::unique_ptr&& op, std::size_t buf_size); @@ -149,7 +149,7 @@ class basic_stream initiate_write( boost::shared_ptr const& in, boost::weak_ptr const& out, - std::unique_ptr&& op, + std::unique_ptr&& op, std::size_t buf_size); #if ! BOOST_BEAST2_DOXYGEN diff --git a/test/unit/write.cpp b/test/unit/write.cpp index 9144e7ff..616bd18b 100644 --- a/test/unit/write.cpp +++ b/test/unit/write.cpp @@ -55,6 +55,7 @@ namespace beast2 { #include #include +#include #include #include #include @@ -94,8 +95,7 @@ class write_test boost::capy::polystore capy_ctx; http_proto::install_serializer_service(capy_ctx, {}); - // async_read_some completes when the parser reads - // the header section of the message. + // async_write_some completes when the serializer writes the message. { test::stream ts{ ioc }, tr{ ioc }; ts.connect(tr); @@ -115,7 +115,7 @@ class write_test [&](system::error_code ec, std::size_t n) { BOOST_TEST(!ec.failed()); - BOOST_TEST_EQ(n, 1); // minus body + BOOST_TEST_EQ(n, 1); }); test::run(ioc); } @@ -163,71 +163,76 @@ class write_test } } - // async_read_some cancellation + // async_write_some cancellation { - test::stream ts{ ioc }, tr{ ioc }; - ts.connect(tr); - ts.write_size(5); + boost::array ctypes = { + asio::cancellation_type::total, + asio::cancellation_type::partial, + asio::cancellation_type::terminal + }; - asio::cancellation_signal c_signal; + for(auto ctype : ctypes) + { + test::stream ts{ ioc }, tr{ ioc }; + ts.connect(tr); + ts.write_size(5); - http_proto::serializer sr(capy_ctx); - sr.reset(); + asio::cancellation_signal c_signal; - http_proto::response res(headers); - sr.start(res, buffers::const_buffer(body.data(), body.size())); + http_proto::serializer sr(capy_ctx); + sr.reset(); - // async_read_some cancels after reading 0 bytes - async_write_some( - ts, - sr, - asio::bind_cancellation_slot( - c_signal.slot(), - [](system::error_code ec, std::size_t n) - { - BOOST_TEST_EQ(n, 5); - BOOST_TEST_EQ(ec, asio::error::operation_aborted); - })); - c_signal.emit(asio::cancellation_type::total); + http_proto::response res(headers); + sr.start(res, buffers::const_buffer(body.data(), body.size())); - // Create a destination buffer - std::string s; - boost::buffers::string_buffer buf(&s); + // async_read_some cancels after reading 0 bytes + async_write_some( + ts, + sr, + asio::bind_cancellation_slot( + c_signal.slot(), + [](system::error_code ec, std::size_t n) + { + BOOST_TEST_EQ(n, 5); + BOOST_TEST_EQ(ec, asio::error::operation_aborted); + })); + c_signal.emit(ctype); - test::run(ioc); + // Create a destination buffer + std::string s; + boost::buffers::string_buffer buf(&s); - BOOST_TEST_EQ(tr.str(), "HTTP/"); - } - } + test::run(ioc); - void - testAsyncReadHeader() - { - // currently, async_read_header and - // async_read_some are identical + BOOST_TEST_EQ(tr.str(), "HTTP/"); + } + } } void - testAsyncRead() + testAsyncWrite() { boost::asio::io_context ioc; capy::polystore capy_ctx; - http_proto::install_parser_service(capy_ctx, {}); + http_proto::install_serializer_service(capy_ctx, {}); - // async_read completes when the parser reads + // async_write completes when the serializer writes // the entire message. + if(false) { - test::stream ts(ioc, msg); - http_proto::response_parser pr(capy_ctx); - pr.reset(); - pr.start(); + test::stream ts{ ioc }, tr{ ioc }; + ts.connect(tr); + ts.write_size(1); - // limit async_read_some for better coverage - ts.read_size(1); + http_proto::serializer sr(capy_ctx); + sr.reset(); + + http_proto::response res(headers); + sr.start(res, buffers::const_buffer(body.data(), body.size())); - async_read( + async_write( ts, - pr, + sr, [&](system::error_code ec, std::size_t n) { BOOST_TEST(!ec.failed()); @@ -236,38 +241,79 @@ class write_test test::run(ioc); - BOOST_TEST_EQ(ts.nread(), msg.size()); // because of ts.read_size(1) - BOOST_TEST(pr.is_complete()); - BOOST_TEST(pr.body() == "abc"); + BOOST_TEST_EQ(ts.nwrite(), msg.size()); // because of ts.write_size(1) + BOOST_TEST(sr.is_done()); + BOOST_TEST_EQ(tr.str(), msg); } - // async_read completes immediatly when - // parser contains enough data + // async_write reports stream errors + if(false) { - asio::post( - ioc, - [&]() + test::fail_count fc(3, asio::error::network_down); + test::stream ts{ ioc, fc }, tr{ ioc }; + ts.connect(tr); + ts.write_size(1); + + http_proto::serializer sr(capy_ctx); + sr.reset(); + + http_proto::response res(headers); + sr.start(res, buffers::const_buffer(body.data(), body.size())); + + async_write( + ts, + sr, + [&](system::error_code ec, std::size_t n) { - test::stream ts(ioc); - http_proto::response_parser pr(capy_ctx); - pr.reset(); - pr.start(); - - pr.commit(buffers::copy( - pr.prepare(), - buffers::const_buffer(msg.data(), msg.size()))); - - async_read( - ts, - pr, - asio::bind_immediate_executor( - ioc.get_executor(), test::success_handler())); - - BOOST_TEST_EQ(ts.nread(), 0); - BOOST_TEST(pr.is_complete()); - BOOST_TEST(pr.body() == "abc"); + BOOST_TEST_EQ(ec, asio::error::network_down); + BOOST_TEST_EQ(n, 2); }); - BOOST_TEST_EQ(test::run(ioc), 1); + test::run(ioc); + + auto expected = msg.substr(0, 2); + BOOST_TEST_EQ(tr.str(), expected); + + } + + // async_write cancellation + { + boost::array ctypes = { + asio::cancellation_type::total, + asio::cancellation_type::partial, + asio::cancellation_type::terminal + }; + + for(auto ctype : ctypes) + { + test::stream ts{ ioc }, tr{ ioc }; + ts.connect(tr); + ts.write_size(5); + + asio::cancellation_signal c_signal; + + http_proto::serializer sr(capy_ctx); + sr.reset(); + + http_proto::response res(headers); + sr.start(res, buffers::const_buffer(body.data(), body.size())); + + // async_read_some cancels after reading 0 bytes + async_write( + ts, + sr, + asio::bind_cancellation_slot( + c_signal.slot(), + [](system::error_code ec, std::size_t n) + { + BOOST_TEST_EQ(n, 5); + BOOST_TEST_EQ(ec, asio::error::operation_aborted); + })); + c_signal.emit(ctype); + + test::run(ioc); + + BOOST_TEST_EQ(tr.str(), "HTTP/"); + } } } @@ -275,8 +321,7 @@ class write_test run() { testAsyncWriteSome(); - //testAsyncReadHeader(); - //testAsyncRead(); + testAsyncWrite(); } }; From 774ba47e5ac370e6652e39a566cb1adad20fa4ca Mon Sep 17 00:00:00 2001 From: Mungo Gill Date: Mon, 1 Dec 2025 14:21:52 +0000 Subject: [PATCH 4/6] chore: more tests and refactoring --- include/boost/beast2/test/impl/stream.hpp | 349 ++++++++++------------ include/boost/beast2/test/stream.hpp | 1 + test/unit/write.cpp | 10 +- 3 files changed, 165 insertions(+), 195 deletions(-) diff --git a/include/boost/beast2/test/impl/stream.hpp b/include/boost/beast2/test/impl/stream.hpp index ed8ae594..872e8f67 100644 --- a/include/boost/beast2/test/impl/stream.hpp +++ b/include/boost/beast2/test/impl/stream.hpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include #include @@ -48,136 +49,172 @@ struct extract_executor_op return ex; } }; -} // detail -template -template -class basic_stream::read_op : public detail::stream_op_base +template +class lambda_base { - class lambda +protected: + Handler h_; + boost::weak_ptr iwp_; + boost::weak_ptr owp_; + Buffers b_; + asio::executor_work_guard< + asio::associated_executor_t> + wg2_; + + class cancellation_handler { - Handler h_; - boost::weak_ptr wp_; - Buffers b_; - asio::executor_work_guard< - asio::associated_executor_t> wg2_; - - class cancellation_handler + public: + explicit cancellation_handler( + boost::weak_ptr wp) + : wp_(std::move(wp)) { - public: - explicit - cancellation_handler( - boost::weak_ptr wp) - : wp_(std::move(wp)) - { - } + } - void - operator()(asio::cancellation_type type) const + void + operator()(asio::cancellation_type type) const + { + if(type != asio::cancellation_type::none) { - if(type != asio::cancellation_type::none) + if(auto sp = wp_.lock()) { - if(auto sp = wp_.lock()) + std::unique_ptr p; { - std::unique_ptr p; - { - std::lock_guard lock(sp->m); + std::lock_guard lock(sp->m); + if(Reader) p = std::move(sp->rop); - } - if(p != nullptr) - (*p)(asio::error::operation_aborted); + else + p = std::move(sp->wop); } + if(p != nullptr) + (*p)(asio::error::operation_aborted); } } + } - private: - boost::weak_ptr wp_; - }; + private: + boost::weak_ptr wp_; + }; - public: - template - lambda( - Handler_&& h, - boost::shared_ptr const& s, - Buffers const& b) - : h_(std::forward(h)) - , wp_(s) - , b_(b) - , wg2_(asio::get_associated_executor(h_, s->exec)) - { - auto c_slot = asio::get_associated_cancellation_slot(h_); - if (c_slot.is_connected()) - c_slot.template emplace(wp_); - } +public: + template + lambda_base( + Handler_&& h, + boost::shared_ptr const& in, + boost::weak_ptr const& out, + Buffers const& b) + : h_(std::forward(h)) + , iwp_(in) + , owp_(out) + , b_(b) + , wg2_(asio::get_associated_executor(h_, in->exec)) + { + auto c_slot = asio::get_associated_cancellation_slot(h_); + if(c_slot.is_connected()) + c_slot.template emplace(iwp_); + } - using allocator_type = asio::associated_allocator_t; + using allocator_type = asio::associated_allocator_t; - allocator_type get_allocator() const noexcept - { - return asio::get_associated_allocator(h_); - } + allocator_type + get_allocator() const noexcept + { + return asio::get_associated_allocator(h_); + } - using cancellation_slot_type = - asio::associated_cancellation_slot_t; + using cancellation_slot_type = + asio::associated_cancellation_slot_t; - cancellation_slot_type - get_cancellation_slot() const noexcept - { - return asio::get_associated_cancellation_slot(h_, - asio::cancellation_slot()); - } + cancellation_slot_type + get_cancellation_slot() const noexcept + { + return asio::get_associated_cancellation_slot( + h_, asio::cancellation_slot()); + } +}; + +} // detail + +template +template +class basic_stream::read_op : public detail::stream_op_base +{ + class lambda : public detail::lambda_base + { + public: + using base = detail::lambda_base; void operator()(system::error_code ec) { std::size_t bytes_transferred = 0; - auto sp = wp_.lock(); + auto sp = base::iwp_.lock(); if(! sp) { ec = asio::error::operation_aborted; } - if(! ec) + if(!ec) { std::lock_guard lock(sp->m); - BOOST_ASSERT(! sp->rop); + BOOST_ASSERT(!sp->rop); if(sp->b.size() > 0) { - bytes_transferred = - buffers::copy( - b_, sp->b.data(), sp->read_max); + bytes_transferred = buffers::copy( + base::b_, + sp->b.data(), + sp->read_max); sp->b.consume(bytes_transferred); sp->nread_bytes += bytes_transferred; } - else if (buffers::size(b_) > 0) + else if( + buffers::size( + base::b_) > 0) { ec = asio::error::eof; } } - asio::dispatch(wg2_.get_executor(), - asio::append(std::move(h_), ec, bytes_transferred)); - wg2_.reset(); + asio::dispatch( + base::wg2_.get_executor(), + asio::append(std::move(base::h_), ec, bytes_transferred)); + base::wg2_.reset(); + sp->rop.release(); + } + + template + lambda( + Handler_&& h, + boost::shared_ptr const& in, + boost::weak_ptr const& out, + Buffers const& b) + : base(std::forward(h), in, out, b) + { } }; - lambda fn_; + std::unique_ptr fnp_; asio::executor_work_guard wg1_; public: template read_op( Handler_&& h, - boost::shared_ptr const& s, + boost::shared_ptr const& in, + boost::weak_ptr const& out, Buffers const& b) - : fn_(std::forward(h), s, b) - , wg1_(s->exec) + : fnp_( + std::make_unique(std::forward(h), in, out, b)) + , wg1_(in->exec) { } void operator()(system::error_code ec) override { - asio::defer(wg1_.get_executor(), asio::append(std::move(fn_), ec)); + std::unique_ptr fnp(std::move(fnp_)); + if(fnp) + asio::post( + wg1_.get_executor(), asio::append(std::move(*fnp), ec)); wg1_.reset(); } }; @@ -186,93 +223,21 @@ template template class basic_stream::write_op : public detail::stream_op_base { - class lambda + class lambda : public detail::lambda_base { - Handler h_; - boost::weak_ptr iwp_; - boost::weak_ptr owp_; - Buffers b_; - asio::executor_work_guard< - asio::associated_executor_t> - wg2_; - - class cancellation_handler - { - public: - explicit cancellation_handler( - boost::weak_ptr wp) - : wp_(std::move(wp)) - { - } - - void - operator()(asio::cancellation_type type) const - { - if(type != asio::cancellation_type::none) - { - if(auto sp = wp_.lock()) - { - std::unique_ptr p; - { - std::lock_guard lock(sp->m); - p = std::move(sp->wop); - } - if(p != nullptr) - (*p)(asio::error::operation_aborted); - } - } - } - - private: - boost::weak_ptr wp_; - }; - public: - template - lambda( - Handler_&& h, - boost::shared_ptr const& in, - boost::weak_ptr const& out, - Buffers const& b) - : h_(std::forward(h)) - , iwp_(in) - , owp_(out) - , b_(b) - , wg2_(asio::get_associated_executor(h_, in->exec)) - { - auto c_slot = asio::get_associated_cancellation_slot(h_); - if(c_slot.is_connected()) - c_slot.template emplace(iwp_); - } - - using allocator_type = asio::associated_allocator_t; - - allocator_type - get_allocator() const noexcept - { - return asio::get_associated_allocator(h_); - } - - using cancellation_slot_type = - asio::associated_cancellation_slot_t; - - cancellation_slot_type - get_cancellation_slot() const noexcept - { - return asio::get_associated_cancellation_slot( - h_, asio::cancellation_slot()); - } + using base = detail::lambda_base; void operator()(system::error_code ec) { std::size_t bytes_transferred = 0; - auto isp = iwp_.lock(); + auto isp = base::iwp_.lock(); if(!isp) { ec = asio::error::operation_aborted; } - auto osp = owp_.lock(); + auto osp = base::owp_.lock(); if(!osp) { ec = asio::error::operation_aborted; @@ -281,10 +246,10 @@ class basic_stream::write_op : public detail::stream_op_base { // copy buffers std::size_t n = std::min( - buffers::size(b_), isp->write_max); + buffers::size(base::b_), isp->write_max); { std::lock_guard lock(osp->m); - n = buffers::copy(osp->b.prepare(n), b_); + n = buffers::copy(osp->b.prepare(n), base::b_); osp->b.commit(n); osp->nwrite_bytes += n; osp->notify_read(); @@ -293,11 +258,21 @@ class basic_stream::write_op : public detail::stream_op_base } asio::dispatch( - wg2_.get_executor(), - asio::append(std::move(h_), ec, bytes_transferred)); - wg2_.reset(); + base::wg2_.get_executor(), + asio::append(std::move(base::h_), ec, bytes_transferred)); + base::wg2_.reset(); isp->wop.release(); } + + template + lambda( + Handler_&& h, + boost::shared_ptr const& in, + boost::weak_ptr const& out, + Buffers const& b) + : base(std::forward(h), in, out, b) + { + } }; std::unique_ptr fnp_; @@ -328,14 +303,14 @@ class basic_stream::write_op : public detail::stream_op_base template struct basic_stream::run_read_op { - boost::shared_ptr const& in; + boost::shared_ptr const& in_; using executor_type = typename basic_stream::executor_type; executor_type get_executor() const noexcept { - return detail::extract_executor_op()(in->exec); + return detail::extract_executor_op()(in_->exec); } template< @@ -344,6 +319,7 @@ struct basic_stream::run_read_op void operator()( ReadHandler&& h, + boost::weak_ptr out, MutableBufferSequence const& buffers) { // If you get an error on the following line it means @@ -351,14 +327,11 @@ struct basic_stream::run_read_op // requirements for the handler. initiate_read( - in, - std::unique_ptr{ - new read_op< + in_, + out, + std::unique_ptr{ new read_op< typename std::decay::type, - MutableBufferSequence>( - std::move(h), - in, - buffers)}, + MutableBufferSequence>(std::move(h), in_, out, buffers) }, buffers::size(buffers)); } }; @@ -382,7 +355,7 @@ struct basic_stream::run_write_op void operator()( WriteHandler&& h, - boost::weak_ptr out_, + boost::weak_ptr out, ConstBufferSequence const& buffers) { // If you get an error on the following line it means @@ -391,10 +364,10 @@ struct basic_stream::run_write_op initiate_write( in_, - out_, + out, std::unique_ptr{ new write_op< typename std::decay::type, - ConstBufferSequence>(std::move(h), in_, out_, buffers) }, + ConstBufferSequence>(std::move(h), in_, out, buffers) }, buffers::size(buffers)); } }; @@ -419,6 +392,7 @@ async_read_some( void(system::error_code, std::size_t)>( run_read_op{in_}, handler, + out_, buffers); } @@ -468,19 +442,22 @@ auto basic_stream::get_executor() noexcept -> executor_type template void basic_stream::initiate_read( - boost::shared_ptr const& in_, + boost::shared_ptr const& in, + boost::weak_ptr const& out, std::unique_ptr&& rop, std::size_t buf_size) { - std::unique_lock lock(in_->m); + (void)out; + + std::unique_lock lock(in->m); - ++in_->nread; - if(in_->rop != nullptr) + ++in->nread; + if(in->rop != nullptr) BOOST_THROW_EXCEPTION(std::logic_error{ "in_->rop != nullptr" }); // test failure system::error_code ec; - if(in_->fc && in_->fc->fail(ec)) + if(in->fc && in->fc->fail(ec)) { lock.unlock(); (*rop)(ec); @@ -488,7 +465,7 @@ basic_stream::initiate_read( } // A request to read 0 bytes from a stream is a no-op. - if(buf_size == 0 || buffers::size(in_->b.data()) > 0) + if(buf_size == 0 || buffers::size(in->b.data()) > 0) { lock.unlock(); (*rop)(ec); @@ -496,7 +473,7 @@ basic_stream::initiate_read( } // deliver error - if(in_->code != detail::stream_status::ok) + if(in->code != detail::stream_status::ok) { lock.unlock(); (*rop)(asio::error::eof); @@ -504,26 +481,26 @@ basic_stream::initiate_read( } // complete when bytes available or closed - in_->rop = std::move(rop); + in->rop = std::move(rop); } //------------------------------------------------------------------------------ template void basic_stream::initiate_write( - boost::shared_ptr const& in_, - boost::weak_ptr const& out_, + boost::shared_ptr const& in, + boost::weak_ptr const& out, std::unique_ptr&& wop, std::size_t buf_size) { { - std::unique_lock lock(in_->m); + std::unique_lock lock(in->m); - ++in_->nwrite; + ++in->nwrite; // test failure system::error_code ec; - if(in_->fc && in_->fc->fail(ec)) + if(in->fc && in->fc->fail(ec)) { lock.unlock(); (*wop)(ec); @@ -539,18 +516,16 @@ void basic_stream::initiate_write( } // connection closed - auto out = out_.lock(); - if(!out) + auto osp = out.lock(); + if(!osp) { (*wop)(asio::error::connection_reset); return; } - std::unique_lock lock(out->m); - - in_->wop = std::move(wop); + in->wop = std::move(wop); //auto op = std::move(in_->wop); - in_->wop->operator()(system::error_code{}); + in->wop->operator()(system::error_code{}); } //------------------------------------------------------------------------------ diff --git a/include/boost/beast2/test/stream.hpp b/include/boost/beast2/test/stream.hpp index c3634340..9cf5051b 100644 --- a/include/boost/beast2/test/stream.hpp +++ b/include/boost/beast2/test/stream.hpp @@ -141,6 +141,7 @@ class basic_stream void initiate_read( boost::shared_ptr const& in, + boost::weak_ptr const& out, std::unique_ptr&& op, std::size_t buf_size); diff --git a/test/unit/write.cpp b/test/unit/write.cpp index 616bd18b..3b29f484 100644 --- a/test/unit/write.cpp +++ b/test/unit/write.cpp @@ -194,14 +194,10 @@ class write_test [](system::error_code ec, std::size_t n) { BOOST_TEST_EQ(n, 5); - BOOST_TEST_EQ(ec, asio::error::operation_aborted); + BOOST_TEST(!ec.failed()); })); c_signal.emit(ctype); - // Create a destination buffer - std::string s; - boost::buffers::string_buffer buf(&s); - test::run(ioc); BOOST_TEST_EQ(tr.str(), "HTTP/"); @@ -218,7 +214,6 @@ class write_test // async_write completes when the serializer writes // the entire message. - if(false) { test::stream ts{ ioc }, tr{ ioc }; ts.connect(tr); @@ -247,7 +242,6 @@ class write_test } // async_write reports stream errors - if(false) { test::fail_count fc(3, asio::error::network_down); test::stream ts{ ioc, fc }, tr{ ioc }; @@ -297,7 +291,7 @@ class write_test http_proto::response res(headers); sr.start(res, buffers::const_buffer(body.data(), body.size())); - // async_read_some cancels after reading 0 bytes + // cancel after writing async_write( ts, sr, From c0b0a974f4247937ec3676e6dd6be6195ea80c4a Mon Sep 17 00:00:00 2001 From: Mungo Gill Date: Mon, 1 Dec 2025 14:34:30 +0000 Subject: [PATCH 5/6] C++11 fix: make_unique not available --- include/boost/beast2/test/impl/stream.hpp | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/include/boost/beast2/test/impl/stream.hpp b/include/boost/beast2/test/impl/stream.hpp index 872e8f67..55ca5a80 100644 --- a/include/boost/beast2/test/impl/stream.hpp +++ b/include/boost/beast2/test/impl/stream.hpp @@ -202,8 +202,7 @@ class basic_stream::read_op : public detail::stream_op_base boost::shared_ptr const& in, boost::weak_ptr const& out, Buffers const& b) - : fnp_( - std::make_unique(std::forward(h), in, out, b)) + : fnp_(new lambda(std::forward(h), in, out, b)) , wg1_(in->exec) { } @@ -285,7 +284,7 @@ class basic_stream::write_op : public detail::stream_op_base boost::shared_ptr const& in, boost::weak_ptr const& out, Buffers const& b) - : fnp_(std::make_unique(std::forward(h), in, out, b)) + : fnp_(new lambda(std::forward(h), in, out, b)) , wg1_(in->exec) { } From fac560bbd8d0404e0e65942e02c25196952e2057 Mon Sep 17 00:00:00 2001 From: Mungo Gill Date: Mon, 1 Dec 2025 17:19:24 +0000 Subject: [PATCH 6/6] compiler warning fix --- test/unit/write.cpp | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/test/unit/write.cpp b/test/unit/write.cpp index 3b29f484..12bf78c9 100644 --- a/test/unit/write.cpp +++ b/test/unit/write.cpp @@ -165,11 +165,10 @@ class write_test // async_write_some cancellation { - boost::array ctypes = { - asio::cancellation_type::total, - asio::cancellation_type::partial, - asio::cancellation_type::terminal - }; + boost::array ctypes( + { asio::cancellation_type::total, + asio::cancellation_type::partial, + asio::cancellation_type::terminal }); for(auto ctype : ctypes) { @@ -271,11 +270,10 @@ class write_test // async_write cancellation { - boost::array ctypes = { - asio::cancellation_type::total, - asio::cancellation_type::partial, - asio::cancellation_type::terminal - }; + boost::array ctypes( + { asio::cancellation_type::total, + asio::cancellation_type::partial, + asio::cancellation_type::terminal }); for(auto ctype : ctypes) {