diff --git a/include/boost/beast2/impl/write.hpp b/include/boost/beast2/impl/write.hpp index 7eb10f1e..1e5e66c7 100644 --- a/include/boost/beast2/impl/write.hpp +++ b/include/boost/beast2/impl/write.hpp @@ -54,6 +54,9 @@ class write_some_op BOOST_ASIO_CORO_REENTER(*this) { + self.reset_cancellation_state( + asio::enable_total_cancellation()); + rv = sr_.prepare(); if(! rv) { @@ -82,6 +85,9 @@ class write_some_op } sr_.consume(bytes_transferred); + //if(!!self.cancelled() && !sr_.is_done()) + // ec = asio::error::operation_aborted; + upcall: self.complete( ec, bytes_transferred ); @@ -117,8 +123,24 @@ class write_op { BOOST_ASIO_CORO_REENTER(*this) { + self.reset_cancellation_state(asio::enable_total_cancellation()); + do { + if(!!self.cancelled()) + { + ec = asio::error::operation_aborted; + BOOST_ASIO_CORO_YIELD + { + BOOST_ASIO_HANDLER_LOCATION( + (__FILE__, __LINE__, "immediate")); + auto io_ex = self.get_io_executor(); + asio::async_immediate( + io_ex, asio::append(std::move(self), ec)); + } + break; // goto upcall + } + BOOST_ASIO_CORO_YIELD { BOOST_ASIO_HANDLER_LOCATION(( @@ -128,12 +150,13 @@ class write_op dest_, sr_, std::move(self)); } n_ += bytes_transferred; + if(ec.failed()) - break; + break; // goto upcall } while(! sr_.is_done()); - // upcall + // upcall: self.complete(ec, n_ ); } } diff --git a/include/boost/beast2/test/detail/stream_state.hpp b/include/boost/beast2/test/detail/stream_state.hpp index fc73055e..53bb5550 100644 --- a/include/boost/beast2/test/detail/stream_state.hpp +++ b/include/boost/beast2/test/detail/stream_state.hpp @@ -66,9 +66,9 @@ class stream_service //------------------------------------------------------------------------------ -struct stream_read_op_base +struct stream_op_base { - virtual ~stream_read_op_base() = default; + virtual ~stream_op_base() = default; virtual void operator()(system::error_code ec) = 0; }; @@ -89,8 +89,9 @@ struct stream_state std::mutex m; std::string storage; buffers::string_buffer b; - std::condition_variable cv; - std::unique_ptr op; + //std::condition_variable cv; + std::unique_ptr rop; + std::unique_ptr wop; stream_status code = stream_status::ok; fail_count* fc = nullptr; std::size_t nread = 0; @@ -133,14 +134,17 @@ void stream_service:: shutdown() { - std::vector> v; - std::lock_guard g1(sp_->m_); - v.reserve(sp_->v_.size()); - for(auto p : sp_->v_) + std::vector> v; { - std::lock_guard g2(p->m); - v.emplace_back(std::move(p->op)); - p->code = detail::stream_status::eof; + std::lock_guard g1(sp_->m_); + v.reserve(2 * sp_->v_.size()); + for(auto p : sp_->v_) + { + std::lock_guard g2(p->m); + v.emplace_back(std::move(p->rop)); + v.emplace_back(std::move(p->wop)); + p->code = detail::stream_status::eof; + } } } @@ -200,8 +204,11 @@ stream_state:: ~stream_state() { // cancel outstanding read - if(op != nullptr) - (*op)(asio::error::operation_aborted); + if(rop != nullptr) + (*rop)(asio::error::operation_aborted); + // cancel outstanding write + if(wop != nullptr) + (*wop)(asio::error::operation_aborted); } inline @@ -223,16 +230,13 @@ void stream_state:: notify_read() { - if(op) + if(rop) { - auto op_ = std::move(op); + auto op_ = std::move(rop); op_->operator()(system::error_code{}); } - else - { - cv.notify_all(); - } } + } // detail } // test } // beast2 diff --git a/include/boost/beast2/test/impl/stream.hpp b/include/boost/beast2/test/impl/stream.hpp index 5144609d..55ca5a80 100644 --- a/include/boost/beast2/test/impl/stream.hpp +++ b/include/boost/beast2/test/impl/stream.hpp @@ -16,7 +16,9 @@ #include #include #include +#include #include +#include #include #include #include @@ -47,136 +49,252 @@ struct extract_executor_op return ex; } }; -} // detail -template -template -class basic_stream::read_op : public detail::stream_read_op_base +template +class lambda_base { - class lambda +protected: + Handler h_; + boost::weak_ptr iwp_; + boost::weak_ptr owp_; + Buffers b_; + asio::executor_work_guard< + asio::associated_executor_t> + wg2_; + + class cancellation_handler { - Handler h_; - boost::weak_ptr wp_; - Buffers b_; - asio::executor_work_guard< - asio::associated_executor_t> wg2_; - - class cancellation_handler + public: + explicit cancellation_handler( + boost::weak_ptr wp) + : wp_(std::move(wp)) { - public: - explicit - cancellation_handler( - boost::weak_ptr wp) - : wp_(std::move(wp)) - { - } + } - void - operator()(asio::cancellation_type type) const + void + operator()(asio::cancellation_type type) const + { + if(type != asio::cancellation_type::none) { - if(type != asio::cancellation_type::none) + if(auto sp = wp_.lock()) { - if(auto sp = wp_.lock()) + std::unique_ptr p; { - std::unique_ptr p; - { - std::lock_guard lock(sp->m); - p = std::move(sp->op); - } - if(p != nullptr) - (*p)(asio::error::operation_aborted); + std::lock_guard lock(sp->m); + if(Reader) + p = std::move(sp->rop); + else + p = std::move(sp->wop); } + if(p != nullptr) + (*p)(asio::error::operation_aborted); } } + } - private: - boost::weak_ptr wp_; - }; + private: + boost::weak_ptr wp_; + }; - public: - template - lambda( - Handler_&& h, - boost::shared_ptr const& s, - Buffers const& b) - : h_(std::forward(h)) - , wp_(s) - , b_(b) - , wg2_(asio::get_associated_executor(h_, s->exec)) - { - auto c_slot = asio::get_associated_cancellation_slot(h_); - if (c_slot.is_connected()) - c_slot.template emplace(wp_); - } +public: + template + lambda_base( + Handler_&& h, + boost::shared_ptr const& in, + boost::weak_ptr const& out, + Buffers const& b) + : h_(std::forward(h)) + , iwp_(in) + , owp_(out) + , b_(b) + , wg2_(asio::get_associated_executor(h_, in->exec)) + { + auto c_slot = asio::get_associated_cancellation_slot(h_); + if(c_slot.is_connected()) + c_slot.template emplace(iwp_); + } - using allocator_type = asio::associated_allocator_t; + using allocator_type = asio::associated_allocator_t; - allocator_type get_allocator() const noexcept - { - return asio::get_associated_allocator(h_); - } + allocator_type + get_allocator() const noexcept + { + return asio::get_associated_allocator(h_); + } - using cancellation_slot_type = - asio::associated_cancellation_slot_t; + using cancellation_slot_type = + asio::associated_cancellation_slot_t; - cancellation_slot_type - get_cancellation_slot() const noexcept - { - return asio::get_associated_cancellation_slot(h_, - asio::cancellation_slot()); - } + cancellation_slot_type + get_cancellation_slot() const noexcept + { + return asio::get_associated_cancellation_slot( + h_, asio::cancellation_slot()); + } +}; + +} // detail + +template +template +class basic_stream::read_op : public detail::stream_op_base +{ + class lambda : public detail::lambda_base + { + public: + using base = detail::lambda_base; void operator()(system::error_code ec) { std::size_t bytes_transferred = 0; - auto sp = wp_.lock(); + auto sp = base::iwp_.lock(); if(! sp) { ec = asio::error::operation_aborted; } - if(! ec) + if(!ec) { std::lock_guard lock(sp->m); - BOOST_ASSERT(! sp->op); + BOOST_ASSERT(!sp->rop); if(sp->b.size() > 0) { - bytes_transferred = - buffers::copy( - b_, sp->b.data(), sp->read_max); + bytes_transferred = buffers::copy( + base::b_, + sp->b.data(), + sp->read_max); sp->b.consume(bytes_transferred); sp->nread_bytes += bytes_transferred; } - else if (buffers::size(b_) > 0) + else if( + buffers::size( + base::b_) > 0) { ec = asio::error::eof; } } - asio::dispatch(wg2_.get_executor(), - asio::append(std::move(h_), ec, bytes_transferred)); - wg2_.reset(); + asio::dispatch( + base::wg2_.get_executor(), + asio::append(std::move(base::h_), ec, bytes_transferred)); + base::wg2_.reset(); + sp->rop.release(); + } + + template + lambda( + Handler_&& h, + boost::shared_ptr const& in, + boost::weak_ptr const& out, + Buffers const& b) + : base(std::forward(h), in, out, b) + { } }; - lambda fn_; + std::unique_ptr fnp_; asio::executor_work_guard wg1_; public: template read_op( Handler_&& h, - boost::shared_ptr const& s, + boost::shared_ptr const& in, + boost::weak_ptr const& out, + Buffers const& b) + : fnp_(new lambda(std::forward(h), in, out, b)) + , wg1_(in->exec) + { + } + + void + operator()(system::error_code ec) override + { + std::unique_ptr fnp(std::move(fnp_)); + if(fnp) + asio::post( + wg1_.get_executor(), asio::append(std::move(*fnp), ec)); + wg1_.reset(); + } +}; + +template +template +class basic_stream::write_op : public detail::stream_op_base +{ + class lambda : public detail::lambda_base + { + public: + using base = detail::lambda_base; + + void + operator()(system::error_code ec) + { + std::size_t bytes_transferred = 0; + auto isp = base::iwp_.lock(); + if(!isp) + { + ec = asio::error::operation_aborted; + } + auto osp = base::owp_.lock(); + if(!osp) + { + ec = asio::error::operation_aborted; + } + if(!ec) + { + // copy buffers + std::size_t n = std::min( + buffers::size(base::b_), isp->write_max); + { + std::lock_guard lock(osp->m); + n = buffers::copy(osp->b.prepare(n), base::b_); + osp->b.commit(n); + osp->nwrite_bytes += n; + osp->notify_read(); + } + bytes_transferred = n; + } + + asio::dispatch( + base::wg2_.get_executor(), + asio::append(std::move(base::h_), ec, bytes_transferred)); + base::wg2_.reset(); + isp->wop.release(); + } + + template + lambda( + Handler_&& h, + boost::shared_ptr const& in, + boost::weak_ptr const& out, + Buffers const& b) + : base(std::forward(h), in, out, b) + { + } + }; + + std::unique_ptr fnp_; + asio::executor_work_guard wg1_; + +public: + template + write_op( + Handler_&& h, + boost::shared_ptr const& in, + boost::weak_ptr const& out, Buffers const& b) - : fn_(std::forward(h), s, b) - , wg1_(s->exec) + : fnp_(new lambda(std::forward(h), in, out, b)) + , wg1_(in->exec) { } void operator()(system::error_code ec) override { - asio::post(wg1_.get_executor(), asio::append(std::move(fn_), ec)); + std::unique_ptr fnp(std::move(fnp_)); + if(fnp) + asio::post(wg1_.get_executor(), asio::append(std::move(*fnp), ec)); wg1_.reset(); } }; @@ -184,14 +302,14 @@ class basic_stream::read_op : public detail::stream_read_op_base template struct basic_stream::run_read_op { - boost::shared_ptr const& in; + boost::shared_ptr const& in_; using executor_type = typename basic_stream::executor_type; executor_type get_executor() const noexcept { - return detail::extract_executor_op()(in->exec); + return detail::extract_executor_op()(in_->exec); } template< @@ -200,6 +318,7 @@ struct basic_stream::run_read_op void operator()( ReadHandler&& h, + boost::weak_ptr out, MutableBufferSequence const& buffers) { // If you get an error on the following line it means @@ -207,14 +326,11 @@ struct basic_stream::run_read_op // requirements for the handler. initiate_read( - in, - std::unique_ptr{ - new read_op< + in_, + out, + std::unique_ptr{ new read_op< typename std::decay::type, - MutableBufferSequence>( - std::move(h), - in, - buffers)}, + MutableBufferSequence>(std::move(h), in_, out, buffers) }, buffers::size(buffers)); } }; @@ -238,46 +354,20 @@ struct basic_stream::run_write_op void operator()( WriteHandler&& h, - boost::weak_ptr out_, + boost::weak_ptr out, ConstBufferSequence const& buffers) { // If you get an error on the following line it means // that your handler does not meet the documented type // requirements for the handler. - ++in_->nwrite; - auto const upcall = [&](system::error_code ec, std::size_t n) - { - asio::post(in_->exec, asio::append(std::move(h), ec, n)); - }; - - // test failure - system::error_code ec; - std::size_t n = 0; - if(in_->fc && in_->fc->fail(ec)) - return upcall(ec, n); - - // A request to write 0 bytes to a stream is a no-op. - if(buffers::size(buffers) == 0) - return upcall(ec, n); - - // connection closed - auto out = out_.lock(); - if(! out) - return upcall(asio::error::connection_reset, n); - - // copy buffers - n = std::min( - buffers::size(buffers), in_->write_max); - { - std::lock_guard lock(out->m); - n = buffers::copy(out->b.prepare(n), buffers); - out->b.commit(n); - out->nwrite_bytes += n; - out->notify_read(); - } - BOOST_ASSERT(! ec); - upcall(ec, n); + initiate_write( + in_, + out, + std::unique_ptr{ new write_op< + typename std::decay::type, + ConstBufferSequence>(std::move(h), in_, out, buffers) }, + buffers::size(buffers)); } }; @@ -301,6 +391,7 @@ async_read_some( void(system::error_code, std::size_t)>( run_read_op{in_}, handler, + out_, buffers); } @@ -345,49 +436,95 @@ auto basic_stream::get_executor() noexcept -> executor_type return detail::extract_executor_op()(in_->exec); } - //------------------------------------------------------------------------------ template -void basic_stream::initiate_read( - boost::shared_ptr const& in_, - std::unique_ptr&& op, +void +basic_stream::initiate_read( + boost::shared_ptr const& in, + boost::weak_ptr const& out, + std::unique_ptr&& rop, std::size_t buf_size) { - std::unique_lock lock(in_->m); + (void)out; + + std::unique_lock lock(in->m); - ++in_->nread; - if(in_->op != nullptr) - BOOST_THROW_EXCEPTION( - std::logic_error{"in_->op != nullptr"}); + ++in->nread; + if(in->rop != nullptr) + BOOST_THROW_EXCEPTION(std::logic_error{ "in_->rop != nullptr" }); // test failure system::error_code ec; - if(in_->fc && in_->fc->fail(ec)) + if(in->fc && in->fc->fail(ec)) { lock.unlock(); - (*op)(ec); + (*rop)(ec); return; } // A request to read 0 bytes from a stream is a no-op. - if(buf_size == 0 || buffers::size(in_->b.data()) > 0) + if(buf_size == 0 || buffers::size(in->b.data()) > 0) { lock.unlock(); - (*op)(ec); + (*rop)(ec); return; } // deliver error - if(in_->code != detail::stream_status::ok) + if(in->code != detail::stream_status::ok) { lock.unlock(); - (*op)(asio::error::eof); + (*rop)(asio::error::eof); return; } // complete when bytes available or closed - in_->op = std::move(op); + in->rop = std::move(rop); +} + +//------------------------------------------------------------------------------ + +template +void basic_stream::initiate_write( + boost::shared_ptr const& in, + boost::weak_ptr const& out, + std::unique_ptr&& wop, + std::size_t buf_size) +{ + { + std::unique_lock lock(in->m); + + ++in->nwrite; + + // test failure + system::error_code ec; + if(in->fc && in->fc->fail(ec)) + { + lock.unlock(); + (*wop)(ec); + return; + } + } + + // A request to write 0 bytes to a stream is a no-op. + if(buf_size == 0) + { + (*wop)(system::error_code{}); + return; + } + + // connection closed + auto osp = out.lock(); + if(!osp) + { + (*wop)(asio::error::connection_reset); + return; + } + + in->wop = std::move(wop); + //auto op = std::move(in_->wop); + in->wop->operator()(system::error_code{}); } //------------------------------------------------------------------------------ diff --git a/include/boost/beast2/test/stream.hpp b/include/boost/beast2/test/stream.hpp index a5b636ea..9cf5051b 100644 --- a/include/boost/beast2/test/stream.hpp +++ b/include/boost/beast2/test/stream.hpp @@ -131,6 +131,9 @@ class basic_stream template class read_op; + template + class write_op; + struct run_read_op; struct run_write_op; @@ -138,7 +141,16 @@ class basic_stream void initiate_read( boost::shared_ptr const& in, - std::unique_ptr&& op, + boost::weak_ptr const& out, + std::unique_ptr&& op, + std::size_t buf_size); + + + static void + initiate_write( + boost::shared_ptr const& in, + boost::weak_ptr const& out, + std::unique_ptr&& op, std::size_t buf_size); #if ! BOOST_BEAST2_DOXYGEN diff --git a/test/unit/write.cpp b/test/unit/write.cpp index 90c5da40..12bf78c9 100644 --- a/test/unit/write.cpp +++ b/test/unit/write.cpp @@ -15,28 +15,309 @@ namespace boost { namespace beast2 { -class any_async_read_stream -{ -}; +//class any_async_read_stream +//{ +//}; +// +//class write_test +//{ +//public: +// void +// testWrite() +// { +// } +// +// void +// run() +// { +// testWrite(); +// } +//}; +// +//TEST_SUITE( +// write_test, +// "boost.beast2.write"); + +} // beast2 +} // boost + +// +// Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com) +// Copyright (c) 2025 Mohammad Nejati +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/beast2 +// + +// Test that header file is self-contained. +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "test_helpers.hpp" + +#include + +namespace boost { +namespace beast2 { class write_test { + core::string_view const headers = + "HTTP/1.1 200 OK\r\n" + "Content-Length: 3\r\n" + "\r\n"; + core::string_view const body = + "abc"; + core::string_view const msg = + "HTTP/1.1 200 OK\r\n" + "Content-Length: 3\r\n" + "\r\n" + "abc"; + public: void - testWrite() + testAsyncWriteSome() { + boost::asio::io_context ioc; + boost::capy::polystore capy_ctx; + http_proto::install_serializer_service(capy_ctx, {}); + + // async_write_some completes when the serializer writes the message. + { + test::stream ts{ ioc }, tr{ ioc }; + ts.connect(tr); + ts.write_size(1); + + http_proto::serializer sr(capy_ctx); + sr.reset(); + + http_proto::response res(headers); + sr.start(res, buffers::const_buffer(body.data(), body.size())); + + for(std::size_t total = 0; total < msg.size(); total++) + { + async_write_some( + ts, + sr, + [&](system::error_code ec, std::size_t n) + { + BOOST_TEST(!ec.failed()); + BOOST_TEST_EQ(n, 1); + }); + test::run(ioc); + } + BOOST_TEST(sr.is_done()); + BOOST_TEST_EQ(tr.str(), msg); + + BOOST_TEST_EQ(tr.str(), msg); + } + + // async_write_some reports stream errors + { + test::fail_count fc(3, asio::error::network_down); + test::stream ts{ ioc, fc }, tr{ ioc }; + ts.connect(tr); + ts.write_size(1); + + http_proto::serializer sr(capy_ctx); + sr.reset(); + + http_proto::response res(headers); + sr.start(res, buffers::const_buffer(body.data(), body.size())); + + for(int count = 0; count < 3; count++) + { + async_write_some( + ts, + sr, + [&](system::error_code ec, std::size_t n) + { + if (count < 2) + { + BOOST_TEST(!ec.failed()); + BOOST_TEST_EQ(n, 1); + } + else + { + BOOST_TEST_EQ(ec, asio::error::network_down); + BOOST_TEST_EQ(n, 0); + } + }); + test::run(ioc); + + auto expected = msg.substr(0, (count == 0) ? 1 : 2); + BOOST_TEST_EQ(tr.str(), expected); + } + } + + // async_write_some cancellation + { + boost::array ctypes( + { asio::cancellation_type::total, + asio::cancellation_type::partial, + asio::cancellation_type::terminal }); + + for(auto ctype : ctypes) + { + test::stream ts{ ioc }, tr{ ioc }; + ts.connect(tr); + ts.write_size(5); + + asio::cancellation_signal c_signal; + + http_proto::serializer sr(capy_ctx); + sr.reset(); + + http_proto::response res(headers); + sr.start(res, buffers::const_buffer(body.data(), body.size())); + + // async_read_some cancels after reading 0 bytes + async_write_some( + ts, + sr, + asio::bind_cancellation_slot( + c_signal.slot(), + [](system::error_code ec, std::size_t n) + { + BOOST_TEST_EQ(n, 5); + BOOST_TEST(!ec.failed()); + })); + c_signal.emit(ctype); + + test::run(ioc); + + BOOST_TEST_EQ(tr.str(), "HTTP/"); + } + } + } + + void + testAsyncWrite() + { + boost::asio::io_context ioc; + capy::polystore capy_ctx; + http_proto::install_serializer_service(capy_ctx, {}); + + // async_write completes when the serializer writes + // the entire message. + { + test::stream ts{ ioc }, tr{ ioc }; + ts.connect(tr); + ts.write_size(1); + + http_proto::serializer sr(capy_ctx); + sr.reset(); + + http_proto::response res(headers); + sr.start(res, buffers::const_buffer(body.data(), body.size())); + + async_write( + ts, + sr, + [&](system::error_code ec, std::size_t n) + { + BOOST_TEST(!ec.failed()); + BOOST_TEST_EQ(n, msg.size()); + }); + + test::run(ioc); + + BOOST_TEST_EQ(ts.nwrite(), msg.size()); // because of ts.write_size(1) + BOOST_TEST(sr.is_done()); + BOOST_TEST_EQ(tr.str(), msg); + } + + // async_write reports stream errors + { + test::fail_count fc(3, asio::error::network_down); + test::stream ts{ ioc, fc }, tr{ ioc }; + ts.connect(tr); + ts.write_size(1); + + http_proto::serializer sr(capy_ctx); + sr.reset(); + + http_proto::response res(headers); + sr.start(res, buffers::const_buffer(body.data(), body.size())); + + async_write( + ts, + sr, + [&](system::error_code ec, std::size_t n) + { + BOOST_TEST_EQ(ec, asio::error::network_down); + BOOST_TEST_EQ(n, 2); + }); + test::run(ioc); + + auto expected = msg.substr(0, 2); + BOOST_TEST_EQ(tr.str(), expected); + + } + + // async_write cancellation + { + boost::array ctypes( + { asio::cancellation_type::total, + asio::cancellation_type::partial, + asio::cancellation_type::terminal }); + + for(auto ctype : ctypes) + { + test::stream ts{ ioc }, tr{ ioc }; + ts.connect(tr); + ts.write_size(5); + + asio::cancellation_signal c_signal; + + http_proto::serializer sr(capy_ctx); + sr.reset(); + + http_proto::response res(headers); + sr.start(res, buffers::const_buffer(body.data(), body.size())); + + // cancel after writing + async_write( + ts, + sr, + asio::bind_cancellation_slot( + c_signal.slot(), + [](system::error_code ec, std::size_t n) + { + BOOST_TEST_EQ(n, 5); + BOOST_TEST_EQ(ec, asio::error::operation_aborted); + })); + c_signal.emit(ctype); + + test::run(ioc); + + BOOST_TEST_EQ(tr.str(), "HTTP/"); + } + } } void run() { - testWrite(); + testAsyncWriteSome(); + testAsyncWrite(); } }; -TEST_SUITE( - write_test, - "boost.beast2.write"); +TEST_SUITE(write_test, "boost.beast2.write"); } // beast2 } // boost