From 43805ab9986d37906555abd36f986c0e9ba1edc9 Mon Sep 17 00:00:00 2001 From: Qi Date: Sun, 31 Aug 2025 10:56:49 +0800 Subject: [PATCH 01/18] io: add `tokio_util::io::simplex` Signed-off-by: ADD-SP --- tokio-util/Cargo.toml | 2 +- tokio-util/src/io/mod.rs | 1 + tokio-util/src/io/simplex.rs | 233 +++++++++++++++++++++++++++++++++ tokio-util/tests/io_simplex.rs | 161 +++++++++++++++++++++++ 4 files changed, 396 insertions(+), 1 deletion(-) create mode 100644 tokio-util/src/io/simplex.rs create mode 100644 tokio-util/tests/io_simplex.rs diff --git a/tokio-util/Cargo.toml b/tokio-util/Cargo.toml index 29ea79ef726..22c7e17c839 100644 --- a/tokio-util/Cargo.toml +++ b/tokio-util/Cargo.toml @@ -27,7 +27,7 @@ net = ["tokio/net"] compat = ["futures-io"] codec = [] time = ["tokio/time", "slab"] -io = [] +io = ["tokio/rt"] io-util = ["io", "tokio/rt", "tokio/io-util"] rt = ["tokio/rt", "tokio/sync", "futures-util"] join-map = ["rt", "hashbrown"] diff --git a/tokio-util/src/io/mod.rs b/tokio-util/src/io/mod.rs index f5a182b5ce8..15a3b4011ee 100644 --- a/tokio-util/src/io/mod.rs +++ b/tokio-util/src/io/mod.rs @@ -14,6 +14,7 @@ mod copy_to_bytes; mod inspect; mod read_buf; mod reader_stream; +pub mod simplex; mod sink_writer; mod stream_reader; diff --git a/tokio-util/src/io/simplex.rs b/tokio-util/src/io/simplex.rs new file mode 100644 index 00000000000..96169de3566 --- /dev/null +++ b/tokio-util/src/io/simplex.rs @@ -0,0 +1,233 @@ +//! Unidirectional byte-oriented channel. + +use bytes::Buf; +use bytes::BytesMut; +use futures_core::ready; +use std::io::Error as IoError; +use std::io::ErrorKind as IoErrorKind; +use std::pin::Pin; +use std::sync::{Arc, Mutex}; +use std::task::{Context, Poll, Waker}; +use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; +use tokio::task::coop::poll_proceed; + +type IoResult = Result; + +#[derive(Debug)] +struct Inner { + /// `poll_*` will return [`Poll::Pending`] if the backpressure boundary is reached + backpressure_boundary: usize, + + /// either [`Sender`] or [`Receiver`] is closed + is_closed: bool, + + /// Waker used to wake the [`Receiver`] + receiver_waker: Option, + + /// Waker used to wake the [`Sender`] + sender_waker: Option, + + /// Buffer used to read and write data + buf: BytesMut, +} + +impl Inner { + fn with_capacity(backpressure_boundary: usize) -> Self { + Self { + backpressure_boundary, + is_closed: false, + receiver_waker: None, + sender_waker: None, + buf: BytesMut::new(), + } + } + + fn register_receiver_waker(&mut self, waker: &Waker) { + match self.receiver_waker.as_mut() { + Some(old) if old.will_wake(waker) => {} + Some(old) => old.clone_from(waker), + None => self.receiver_waker = Some(waker.clone()), + } + } + + fn register_sender_waker(&mut self, waker: &Waker) { + match self.sender_waker.as_mut() { + Some(old) if old.will_wake(waker) => {} + Some(old) => old.clone_from(waker), + None => self.sender_waker = Some(waker.clone()), + } + } + + fn wake_receiver(&mut self) { + if let Some(waker) = self.receiver_waker.take() { + waker.wake(); + } + } + + fn wake_sender(&mut self) { + if let Some(waker) = self.sender_waker.take() { + waker.wake(); + } + } + + fn is_closed(&self) -> bool { + self.is_closed + } + + fn close_receiver(&mut self) { + self.is_closed = true; + self.wake_sender(); + } + + fn close_sender(&mut self) { + self.is_closed = true; + self.wake_receiver(); + } +} + +/// Receiver of the simplex channel. +/// +/// You can still read the remaining data from the buffer +/// even if the write half has been dropped. +/// See [`Sender::poll_shutdown`] and [`Sender::drop`] for more details. +#[derive(Debug)] +pub struct Receiver { + inner: Arc>, +} + +impl Drop for Receiver { + /// This also wakes up the [`Sender`]. + fn drop(&mut self) { + self.inner.lock().unwrap().close_receiver(); + } +} + +impl AsyncRead for Receiver { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + let mut inner = self.inner.lock().unwrap(); + + let to_read = buf.remaining().min(inner.buf.remaining()); + if to_read == 0 { + return if inner.is_closed() { + Poll::Ready(Ok(())) + } else { + inner.register_receiver_waker(cx.waker()); + inner.wake_sender(); + Poll::Pending + }; + } + + ready!(poll_proceed(cx)).made_progress(); + + buf.put_slice(&inner.buf[..to_read]); + inner.buf.advance(to_read); + inner.wake_sender(); + Poll::Ready(Ok(())) + } +} + +/// Sender of the simplex channel. +/// +/// ## Shutdown +/// +/// See [`Sender::poll_shutdown`]. +#[derive(Debug)] +pub struct Sender { + inner: Arc>, +} + +impl Drop for Sender { + /// This also wakes up the [`Receiver`]. + fn drop(&mut self) { + self.inner.lock().unwrap().close_sender(); + } +} + +impl AsyncWrite for Sender { + /// # Error + /// + /// This method will return [`IoErrorKind::BrokenPipe`] + /// if the channel has been closed. + fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { + let mut inner = self.inner.lock().unwrap(); + + if inner.is_closed() { + return Poll::Ready(Err(IoError::new( + IoErrorKind::BrokenPipe, + "simplex has been closed", + ))); + } + + let free = inner + .backpressure_boundary + .checked_sub(inner.buf.len()) + .expect("backpressure boundary overflow"); + let to_write = buf.len().min(free); + if to_write == 0 { + inner.register_sender_waker(cx.waker()); + inner.wake_receiver(); + return Poll::Pending; + } + + // this is to avoid starving other tasks + ready!(poll_proceed(cx)).made_progress(); + + inner.buf.extend_from_slice(&buf[..to_write]); + inner.wake_receiver(); + Poll::Ready(Ok(to_write)) + } + + /// # Error + /// + /// This method will return [`IoErrorKind::BrokenPipe`] + /// if the channel has been closed. + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + let inner = self.inner.lock().unwrap(); + if inner.is_closed() { + Poll::Ready(Err(IoError::new( + IoErrorKind::BrokenPipe, + "simplex has been shut down", + ))) + } else { + Poll::Ready(Ok(())) + } + } + + /// After returns [`Poll::Ready`], all the following call to + /// [`Sender::poll_write`] and [`Sender::poll_flush`] + /// will return error. + /// + /// The [`Receiver`] can still be used to read remaining data + /// until all bytes have been consumed. + fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + let mut inner = self.inner.lock().unwrap(); + + if inner.is_closed() { + Poll::Ready(Err(IoError::new( + IoErrorKind::BrokenPipe, + "simplex has already been shut down, cannot be shut down again", + ))) + } else { + inner.close_sender(); + Poll::Ready(Ok(())) + } + } +} + +/// Create a simplex channel. +/// +/// The `capacity` parameter specifies the maximum number of bytes that can be +/// stored in the channel without making the [`Sender::poll_write`] +/// return [`Poll::Pending`]. +pub fn new(capacity: usize) -> (Sender, Receiver) { + let inner = Arc::new(Mutex::new(Inner::with_capacity(capacity))); + let tx = Sender { + inner: Arc::clone(&inner), + }; + let rx = Receiver { inner }; + (tx, rx) +} diff --git a/tokio-util/tests/io_simplex.rs b/tokio-util/tests/io_simplex.rs new file mode 100644 index 00000000000..912d0c2ea41 --- /dev/null +++ b/tokio-util/tests/io_simplex.rs @@ -0,0 +1,161 @@ +use futures::pin_mut; +use futures_test::task::noop_context; +use std::{future::Future, task::Poll}; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf}; +use tokio_test::{assert_pending, assert_ready_err}; +use tokio_util::io::simplex; + +/// Sanity check for single-threaded operation. +#[tokio::test] +async fn single_thread() { + const N: usize = 64; + const MSG: &[u8] = b"Hello, world!"; + + let (mut tx, mut rx) = simplex::new(32); + + for _ in 0..N { + tx.write_all(MSG).await.unwrap(); + let mut buf = vec![0; MSG.len()]; + rx.read_exact(&mut buf).await.unwrap(); + assert_eq!(&buf[..], MSG); + } +} + +/// Sanity check for multi-threaded operation. +#[test] +#[cfg(not(target_os = "wasi"))] // No thread on wasi. +fn multi_thread() { + use futures::executor::block_on; + use std::thread; + + const N: usize = 64; + const MSG: &[u8] = b"Hello, world!"; + + let (mut tx, mut rx) = simplex::new(32); + + let jh0 = thread::spawn(move || { + block_on(async { + let mut buf = vec![0; MSG.len()]; + for _ in 0..N { + rx.read_exact(&mut buf).await.unwrap(); + assert_eq!(&buf[..], MSG); + buf.clear(); + buf.resize(MSG.len(), 0); + } + }); + }); + + let jh1 = thread::spawn(move || { + block_on(async { + for _ in 0..N { + tx.write_all(MSG).await.unwrap(); + } + }); + }); + + jh0.join().unwrap(); + jh1.join().unwrap(); +} + +/// The `Sender` should returns error if the read half has been dropped. +#[tokio::test] +async fn drop_receiver_0() { + const MSG: &[u8] = b"Hello, world!"; + + let (mut tx, rx) = simplex::new(32); + drop(rx); + + tx.write_all(MSG).await.unwrap_err(); +} + +/// The `Sender` should be woken up if the read half is dropped. +#[tokio::test] +async fn drop_receiver_1() { + const MSG: &[u8] = b"Hello, world!"; + + // only set `1` capacity to make sure the write half will be blocked + // by the read half + let (mut tx, rx) = simplex::new(1); + let fut = tx.write_all(MSG); + pin_mut!(fut); + assert_pending!(fut.as_mut().poll(&mut noop_context())); + drop(rx); + assert_ready_err!(fut.poll(&mut noop_context())); +} + +/// The `Receiver` should returns error if: +/// +/// - The `Sender` has been dropped. +/// - AND there is no remaining data in the buffer. +#[tokio::test] +async fn drop_sender_0() { + const MSG: &[u8] = b"Hello, world!"; + + let (tx, mut rx) = simplex::new(32); + drop(tx); + + let mut buf = vec![0; MSG.len()]; + rx.read_exact(&mut buf).await.unwrap_err(); +} + +/// The `Receiver` should be woken up if: +/// +/// - The `Sender` has been dropped. +/// - AND there is no sufficient data to read. +#[tokio::test] +async fn drop_sender_1() { + const MSG: &[u8] = b"Hello, world!"; + + // only set `1` capacity to make sure the write half will be blocked + // by the read half + let (tx, mut rx) = simplex::new(1); + let mut buf = [0u8; MSG.len()]; + let fut = rx.read_exact(&mut buf); + pin_mut!(fut); + assert_pending!(fut.as_mut().poll(&mut noop_context())); + drop(tx); + assert_ready_err!(fut.poll(&mut noop_context())); +} + +/// Both `Sender` and `Receiver` should yield periodically +/// in a tight-loop. +#[tokio::test] +async fn cooperative_scheduling() { + // this magic number is copied from + // https://github.com/tokio-rs/tokio/blob/925c614c89d0a26777a334612e2ed6ad0e7935c3/tokio/src/task/coop/mod.rs#L116 + const INITIAL_BUDGET: usize = 128; + + let (tx, _rx) = simplex::new(INITIAL_BUDGET * 2); + pin_mut!(tx); + let mut is_pending = false; + for _ in 0..INITIAL_BUDGET + 1 { + match tx.as_mut().poll_write(&mut noop_context(), &[0u8; 1]) { + Poll::Pending => { + is_pending = true; + break; + } + Poll::Ready(Ok(1)) => {} + Poll::Ready(Ok(n)) => panic!("wrote too many bytes: {n}"), + Poll::Ready(Err(e)) => panic!("{e}"), + } + } + assert!(is_pending); + + let (mut tx, rx) = simplex::new(INITIAL_BUDGET * 2); + tx.write_all(&[0u8; INITIAL_BUDGET + 2]).await.unwrap(); + pin_mut!(rx); + let mut is_pending = false; + for _ in 0..INITIAL_BUDGET + 1 { + let mut buf = [0u8; 1]; + let mut buf = ReadBuf::new(&mut buf); + match rx.as_mut().poll_read(&mut noop_context(), &mut buf) { + Poll::Pending => { + is_pending = true; + break; + } + Poll::Ready(Ok(())) => assert_eq!(buf.filled().len(), 1), + Poll::Ready(Err(e)) => panic!("{e}"), + } + } + assert!(is_pending); +} From 5b8b0198bae3a0d0f4ddd2e2846129485cc526cf Mon Sep 17 00:00:00 2001 From: Qi Date: Wed, 3 Sep 2025 21:42:52 +0800 Subject: [PATCH 02/18] io: unlock before waking up Signed-off-by: ADD-SP --- tokio-util/src/io/simplex.rs | 46 +++++++++++++++++++++++------------- 1 file changed, 29 insertions(+), 17 deletions(-) diff --git a/tokio-util/src/io/simplex.rs b/tokio-util/src/io/simplex.rs index 96169de3566..17e47aa30cf 100644 --- a/tokio-util/src/io/simplex.rs +++ b/tokio-util/src/io/simplex.rs @@ -15,7 +15,7 @@ type IoResult = Result; #[derive(Debug)] struct Inner { - /// `poll_*` will return [`Poll::Pending`] if the backpressure boundary is reached + /// `poll_write` will return [`Poll::Pending`] if the backpressure boundary is reached backpressure_boundary: usize, /// either [`Sender`] or [`Receiver`] is closed @@ -58,30 +58,26 @@ impl Inner { } } - fn wake_receiver(&mut self) { - if let Some(waker) = self.receiver_waker.take() { - waker.wake(); - } + fn take_receiver_waker(&mut self) -> Option { + self.receiver_waker.take() } - fn wake_sender(&mut self) { - if let Some(waker) = self.sender_waker.take() { - waker.wake(); - } + fn take_sender_waker(&mut self) -> Option { + self.sender_waker.take() } fn is_closed(&self) -> bool { self.is_closed } - fn close_receiver(&mut self) { + fn close_receiver(&mut self) -> Option { self.is_closed = true; - self.wake_sender(); + self.take_sender_waker() } - fn close_sender(&mut self) { + fn close_sender(&mut self) -> Option { self.is_closed = true; - self.wake_receiver(); + self.take_receiver_waker() } } @@ -116,7 +112,11 @@ impl AsyncRead for Receiver { Poll::Ready(Ok(())) } else { inner.register_receiver_waker(cx.waker()); - inner.wake_sender(); + let waker = inner.take_sender_waker(); + drop(inner); // unlock before waking up + if let Some(waker) = waker { + waker.wake(); + } Poll::Pending }; } @@ -125,7 +125,11 @@ impl AsyncRead for Receiver { buf.put_slice(&inner.buf[..to_read]); inner.buf.advance(to_read); - inner.wake_sender(); + let waker = inner.take_sender_waker(); + drop(inner); // unlock before waking up + if let Some(waker) = waker { + waker.wake(); + } Poll::Ready(Ok(())) } } @@ -169,7 +173,11 @@ impl AsyncWrite for Sender { let to_write = buf.len().min(free); if to_write == 0 { inner.register_sender_waker(cx.waker()); - inner.wake_receiver(); + let waker = inner.take_receiver_waker(); + drop(inner); // unlock before waking up + if let Some(waker) = waker { + waker.wake(); + } return Poll::Pending; } @@ -177,7 +185,11 @@ impl AsyncWrite for Sender { ready!(poll_proceed(cx)).made_progress(); inner.buf.extend_from_slice(&buf[..to_write]); - inner.wake_receiver(); + let waker = inner.take_receiver_waker(); + drop(inner); // unlock before waking up + if let Some(waker) = waker { + waker.wake(); + } Poll::Ready(Ok(to_write)) } From afc7e070a4366c53b4e738fff87ecf7e779710f5 Mon Sep 17 00:00:00 2001 From: Qi Date: Wed, 3 Sep 2025 21:43:20 +0800 Subject: [PATCH 03/18] io: make `Inner::with_capacity` more "with_capacity" Signed-off-by: ADD-SP --- tokio-util/src/io/simplex.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio-util/src/io/simplex.rs b/tokio-util/src/io/simplex.rs index 17e47aa30cf..7f36c01d8bc 100644 --- a/tokio-util/src/io/simplex.rs +++ b/tokio-util/src/io/simplex.rs @@ -38,7 +38,7 @@ impl Inner { is_closed: false, receiver_waker: None, sender_waker: None, - buf: BytesMut::new(), + buf: BytesMut::with_capacity(backpressure_boundary), } } From 585a957916797aa81008ec418720ab06700ae312 Mon Sep 17 00:00:00 2001 From: Qi Date: Wed, 5 Nov 2025 19:43:01 +0800 Subject: [PATCH 04/18] do not include `tokio/rt` by default Signed-off-by: ADD-SP --- tokio-util/Cargo.toml | 2 +- tokio-util/src/cfg.rs | 9 +++++++++ tokio-util/src/io/simplex.rs | 7 ++++--- tokio-util/src/util/mod.rs | 21 +++++++++++++++++++++ tokio-util/tests/io_simplex.rs | 1 + 5 files changed, 36 insertions(+), 4 deletions(-) diff --git a/tokio-util/Cargo.toml b/tokio-util/Cargo.toml index e9a5eb4fe97..aa00edc1e7d 100644 --- a/tokio-util/Cargo.toml +++ b/tokio-util/Cargo.toml @@ -27,7 +27,7 @@ net = ["tokio/net"] compat = ["futures-io"] codec = [] time = ["tokio/time", "slab"] -io = ["tokio/rt"] +io = [] io-util = ["io", "tokio/rt", "tokio/io-util"] rt = ["tokio/rt", "tokio/sync", "futures-util"] join-map = ["rt", "hashbrown"] diff --git a/tokio-util/src/cfg.rs b/tokio-util/src/cfg.rs index 4035255aff0..7957f482409 100644 --- a/tokio-util/src/cfg.rs +++ b/tokio-util/src/cfg.rs @@ -60,6 +60,15 @@ macro_rules! cfg_rt { } } +macro_rules! cfg_not_rt { + ($($item:item)*) => { + $( + #[cfg(not(feature = "rt"))] + $item + )* + } +} + macro_rules! cfg_time { ($($item:item)*) => { $( diff --git a/tokio-util/src/io/simplex.rs b/tokio-util/src/io/simplex.rs index 7f36c01d8bc..90d992a3eeb 100644 --- a/tokio-util/src/io/simplex.rs +++ b/tokio-util/src/io/simplex.rs @@ -1,5 +1,7 @@ //! Unidirectional byte-oriented channel. +use crate::util::poll_proceed_and_make_progress; + use bytes::Buf; use bytes::BytesMut; use futures_core::ready; @@ -9,7 +11,6 @@ use std::pin::Pin; use std::sync::{Arc, Mutex}; use std::task::{Context, Poll, Waker}; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; -use tokio::task::coop::poll_proceed; type IoResult = Result; @@ -121,7 +122,7 @@ impl AsyncRead for Receiver { }; } - ready!(poll_proceed(cx)).made_progress(); + ready!(poll_proceed_and_make_progress(cx)); buf.put_slice(&inner.buf[..to_read]); inner.buf.advance(to_read); @@ -182,7 +183,7 @@ impl AsyncWrite for Sender { } // this is to avoid starving other tasks - ready!(poll_proceed(cx)).made_progress(); + ready!(poll_proceed_and_make_progress(cx)); inner.buf.extend_from_slice(&buf[..to_write]); let waker = inner.take_receiver_waker(); diff --git a/tokio-util/src/util/mod.rs b/tokio-util/src/util/mod.rs index a17f25a6b91..95390fc7bae 100644 --- a/tokio-util/src/util/mod.rs +++ b/tokio-util/src/util/mod.rs @@ -6,3 +6,24 @@ pub(crate) use maybe_dangling::MaybeDangling; #[cfg(any(feature = "io", feature = "codec"))] #[cfg_attr(not(feature = "io"), allow(unreachable_pub))] pub use poll_buf::{poll_read_buf, poll_write_buf}; + +cfg_rt! { + use std::task::{Context, Poll}; + use tokio::task::coop::poll_proceed; + use futures_core::ready; + + #[cfg_attr(not(feature = "io"), allow(unused))] + pub(crate) fn poll_proceed_and_make_progress(cx: &mut Context<'_>) -> Poll<()> { + ready!(poll_proceed(cx)).made_progress(); + Poll::Ready(()) + } +} + +cfg_not_rt! { + use std::task::{Context, Poll}; + + #[cfg_attr(not(feature = "io"), allow(unused))] + pub(crate) fn poll_proceed_and_make_progress(_cx: &mut Context<'_>) -> Poll<()> { + Poll::Ready(()) + } +} diff --git a/tokio-util/tests/io_simplex.rs b/tokio-util/tests/io_simplex.rs index 912d0c2ea41..0b19b894e69 100644 --- a/tokio-util/tests/io_simplex.rs +++ b/tokio-util/tests/io_simplex.rs @@ -120,6 +120,7 @@ async fn drop_sender_1() { /// Both `Sender` and `Receiver` should yield periodically /// in a tight-loop. #[tokio::test] +#[cfg(feature = "rt")] async fn cooperative_scheduling() { // this magic number is copied from // https://github.com/tokio-rs/tokio/blob/925c614c89d0a26777a334612e2ed6ad0e7935c3/tokio/src/task/coop/mod.rs#L116 From f6fc30a55c8f92054c43779bf4db02f024efe69f Mon Sep 17 00:00:00 2001 From: Qi Date: Wed, 5 Nov 2025 19:48:54 +0800 Subject: [PATCH 05/18] panics on zero capacity Signed-off-by: ADD-SP --- tokio-util/src/io/simplex.rs | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/tokio-util/src/io/simplex.rs b/tokio-util/src/io/simplex.rs index 90d992a3eeb..a9dae7f5a6d 100644 --- a/tokio-util/src/io/simplex.rs +++ b/tokio-util/src/io/simplex.rs @@ -33,13 +33,13 @@ struct Inner { } impl Inner { - fn with_capacity(backpressure_boundary: usize) -> Self { + fn with_capacity(capacity: usize) -> Self { Self { - backpressure_boundary, + backpressure_boundary: capacity, is_closed: false, receiver_waker: None, sender_waker: None, - buf: BytesMut::with_capacity(backpressure_boundary), + buf: BytesMut::with_capacity(capacity), } } @@ -236,7 +236,13 @@ impl AsyncWrite for Sender { /// The `capacity` parameter specifies the maximum number of bytes that can be /// stored in the channel without making the [`Sender::poll_write`] /// return [`Poll::Pending`]. +/// +/// # Panics +/// +/// This function will panic if `capacity` is zero. pub fn new(capacity: usize) -> (Sender, Receiver) { + assert_ne!(capacity, 0, "capacity must be greater than zero"); + let inner = Arc::new(Mutex::new(Inner::with_capacity(capacity))); let tx = Sender { inner: Arc::clone(&inner), From 41665dc848bbed5bae536023d23290635c014022 Mon Sep 17 00:00:00 2001 From: Qi Date: Wed, 5 Nov 2025 20:09:17 +0800 Subject: [PATCH 06/18] poll_shutdown should be called multiple times without error Signed-off-by: ADD-SP --- tokio-util/src/io/simplex.rs | 12 ++---------- tokio-util/tests/io_simplex.rs | 22 ++++++++++++++++++++++ 2 files changed, 24 insertions(+), 10 deletions(-) diff --git a/tokio-util/src/io/simplex.rs b/tokio-util/src/io/simplex.rs index a9dae7f5a6d..424f240a7e7 100644 --- a/tokio-util/src/io/simplex.rs +++ b/tokio-util/src/io/simplex.rs @@ -218,16 +218,8 @@ impl AsyncWrite for Sender { /// until all bytes have been consumed. fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { let mut inner = self.inner.lock().unwrap(); - - if inner.is_closed() { - Poll::Ready(Err(IoError::new( - IoErrorKind::BrokenPipe, - "simplex has already been shut down, cannot be shut down again", - ))) - } else { - inner.close_sender(); - Poll::Ready(Ok(())) - } + inner.close_sender(); + Poll::Ready(Ok(())) } } diff --git a/tokio-util/tests/io_simplex.rs b/tokio-util/tests/io_simplex.rs index 0b19b894e69..16025577942 100644 --- a/tokio-util/tests/io_simplex.rs +++ b/tokio-util/tests/io_simplex.rs @@ -117,6 +117,28 @@ async fn drop_sender_1() { assert_ready_err!(fut.poll(&mut noop_context())); } +/// All following calls to `Sender::poll_write` and `Sender::poll_flush` +/// should return error after `shutdown` has been called. +#[tokio::test] +async fn shutdown_sender_0() { + const MSG: &[u8] = b"Hello, world!"; + + let (mut tx, _rx) = simplex::new(32); + tx.shutdown().await.unwrap(); + + tx.write_all(MSG).await.unwrap_err(); + tx.flush().await.unwrap_err(); +} + +/// The `Sender::poll_shutdown` should be called multiple times +/// without error. +#[tokio::test] +async fn shutdown_sender_1() { + let (mut tx, _rx) = simplex::new(32); + tx.shutdown().await.unwrap(); + tx.shutdown().await.unwrap(); +} + /// Both `Sender` and `Receiver` should yield periodically /// in a tight-loop. #[tokio::test] From 6eafcd15e4adfa5afa1605635a19e15343ce0cd5 Mon Sep 17 00:00:00 2001 From: Qi Date: Wed, 5 Nov 2025 20:13:18 +0800 Subject: [PATCH 07/18] wake up the receiver on shutdown Signed-off-by: ADD-SP --- tokio-util/src/io/simplex.rs | 11 +++++++++-- tokio-util/tests/io_simplex.rs | 21 +++++++++++++++++++++ 2 files changed, 30 insertions(+), 2 deletions(-) diff --git a/tokio-util/src/io/simplex.rs b/tokio-util/src/io/simplex.rs index 424f240a7e7..2f00021987a 100644 --- a/tokio-util/src/io/simplex.rs +++ b/tokio-util/src/io/simplex.rs @@ -217,8 +217,15 @@ impl AsyncWrite for Sender { /// The [`Receiver`] can still be used to read remaining data /// until all bytes have been consumed. fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - let mut inner = self.inner.lock().unwrap(); - inner.close_sender(); + let maybe_waker = { + let mut inner = self.inner.lock().unwrap(); + inner.close_sender() + }; + + if let Some(waker) = maybe_waker { + waker.wake(); + } + Poll::Ready(Ok(())) } } diff --git a/tokio-util/tests/io_simplex.rs b/tokio-util/tests/io_simplex.rs index 16025577942..60bebf75851 100644 --- a/tokio-util/tests/io_simplex.rs +++ b/tokio-util/tests/io_simplex.rs @@ -3,6 +3,7 @@ use futures_test::task::noop_context; use std::{future::Future, task::Poll}; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf}; use tokio_test::{assert_pending, assert_ready_err}; +use tokio_test::task::spawn; use tokio_util::io::simplex; /// Sanity check for single-threaded operation. @@ -139,6 +140,26 @@ async fn shutdown_sender_1() { tx.shutdown().await.unwrap(); } +/// The `Sender::poll_shutdown` should wake up the `Receiver` +#[tokio::test] +async fn shutdown_sender_2() { + let (mut tx, mut rx) = simplex::new(32); + + let mut buf = vec![]; + let mut read_task = spawn(rx.read_to_end(&mut buf)); + assert_pending!(read_task.poll()); + + tx.write_u8(1).await.unwrap(); + assert_pending!(read_task.poll()); + + assert!(!read_task.is_woken()); + tx.shutdown().await.unwrap(); + assert!(read_task.is_woken()); + + read_task.await.unwrap(); + assert_eq!(buf, vec![1]); +} + /// Both `Sender` and `Receiver` should yield periodically /// in a tight-loop. #[tokio::test] From 95837ba0665bc24b6d07c5fac76b9748505dd51d Mon Sep 17 00:00:00 2001 From: Qi Date: Wed, 5 Nov 2025 20:17:42 +0800 Subject: [PATCH 08/18] wake up the receiver after dropping the sender Signed-off-by: ADD-SP --- tokio-util/src/io/simplex.rs | 9 ++++++++- tokio-util/tests/io_simplex.rs | 23 +++++++++++++---------- 2 files changed, 21 insertions(+), 11 deletions(-) diff --git a/tokio-util/src/io/simplex.rs b/tokio-util/src/io/simplex.rs index 2f00021987a..ffd87b3763f 100644 --- a/tokio-util/src/io/simplex.rs +++ b/tokio-util/src/io/simplex.rs @@ -148,7 +148,14 @@ pub struct Sender { impl Drop for Sender { /// This also wakes up the [`Receiver`]. fn drop(&mut self) { - self.inner.lock().unwrap().close_sender(); + let maybe_waker = { + let mut inner = self.inner.lock().unwrap(); + inner.close_sender() + }; + + if let Some(waker) = maybe_waker { + waker.wake(); + } } } diff --git a/tokio-util/tests/io_simplex.rs b/tokio-util/tests/io_simplex.rs index 60bebf75851..06b1a1b331c 100644 --- a/tokio-util/tests/io_simplex.rs +++ b/tokio-util/tests/io_simplex.rs @@ -102,20 +102,23 @@ async fn drop_sender_0() { /// The `Receiver` should be woken up if: /// /// - The `Sender` has been dropped. -/// - AND there is no sufficient data to read. +/// - AND there is still remaining data in the buffer. #[tokio::test] async fn drop_sender_1() { - const MSG: &[u8] = b"Hello, world!"; + let (mut tx, mut rx) = simplex::new(2); + let mut buf = vec![]; + let mut read_task = spawn(rx.read_to_end(&mut buf)); + assert_pending!(read_task.poll()); - // only set `1` capacity to make sure the write half will be blocked - // by the read half - let (tx, mut rx) = simplex::new(1); - let mut buf = [0u8; MSG.len()]; - let fut = rx.read_exact(&mut buf); - pin_mut!(fut); - assert_pending!(fut.as_mut().poll(&mut noop_context())); + tx.write_u8(1).await.unwrap(); + assert_pending!(read_task.poll()); + + assert!(!read_task.is_woken()); drop(tx); - assert_ready_err!(fut.poll(&mut noop_context())); + assert!(read_task.is_woken()); + + read_task.await.unwrap(); + assert_eq!(buf, vec![1]); } /// All following calls to `Sender::poll_write` and `Sender::poll_flush` From d7f6d20a3bef3b3821ce53f622b645ccde447994 Mon Sep 17 00:00:00 2001 From: Qi Date: Wed, 5 Nov 2025 20:21:34 +0800 Subject: [PATCH 09/18] wake up the sender after dropping the receiver Signed-off-by: ADD-SP --- tokio-util/src/io/simplex.rs | 9 ++++++++- tokio-util/tests/io_simplex.rs | 25 ++++++++++--------------- 2 files changed, 18 insertions(+), 16 deletions(-) diff --git a/tokio-util/src/io/simplex.rs b/tokio-util/src/io/simplex.rs index ffd87b3763f..6c22d64107f 100644 --- a/tokio-util/src/io/simplex.rs +++ b/tokio-util/src/io/simplex.rs @@ -95,7 +95,14 @@ pub struct Receiver { impl Drop for Receiver { /// This also wakes up the [`Sender`]. fn drop(&mut self) { - self.inner.lock().unwrap().close_receiver(); + let maybe_waker = { + let mut inner = self.inner.lock().unwrap(); + inner.close_receiver() + }; + + if let Some(waker) = maybe_waker { + waker.wake(); + } } } diff --git a/tokio-util/tests/io_simplex.rs b/tokio-util/tests/io_simplex.rs index 06b1a1b331c..8367752457d 100644 --- a/tokio-util/tests/io_simplex.rs +++ b/tokio-util/tests/io_simplex.rs @@ -1,8 +1,8 @@ use futures::pin_mut; use futures_test::task::noop_context; -use std::{future::Future, task::Poll}; +use std::task::Poll; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf}; -use tokio_test::{assert_pending, assert_ready_err}; +use tokio_test::assert_pending; use tokio_test::task::spawn; use tokio_util::io::simplex; @@ -58,30 +58,25 @@ fn multi_thread() { jh1.join().unwrap(); } -/// The `Sender` should returns error if the read half has been dropped. +/// The `Sender` should returns error if the `Receiver` has been dropped. #[tokio::test] async fn drop_receiver_0() { - const MSG: &[u8] = b"Hello, world!"; - let (mut tx, rx) = simplex::new(32); drop(rx); - tx.write_all(MSG).await.unwrap_err(); + tx.write_u8(1).await.unwrap_err(); } -/// The `Sender` should be woken up if the read half is dropped. +/// The `Sender` should be woken up if the `Receiver` has been dropped. #[tokio::test] async fn drop_receiver_1() { - const MSG: &[u8] = b"Hello, world!"; - - // only set `1` capacity to make sure the write half will be blocked - // by the read half let (mut tx, rx) = simplex::new(1); - let fut = tx.write_all(MSG); - pin_mut!(fut); - assert_pending!(fut.as_mut().poll(&mut noop_context())); + let mut write_task = spawn(tx.write_u16(1)); + assert_pending!(write_task.poll()); + + assert!(!write_task.is_woken()); drop(rx); - assert_ready_err!(fut.poll(&mut noop_context())); + assert!(write_task.is_woken()); } /// The `Receiver` should returns error if: From 1e30455d09c6f80f691c1af8cd25f5c412b09bab Mon Sep 17 00:00:00 2001 From: Qi Date: Wed, 5 Nov 2025 22:29:51 +0800 Subject: [PATCH 10/18] fix hang forever issues 1. The `Sender` should not hang forever when the input buf is empty 2. The `Receiver` should not hange forever when the input buf is full Signed-off-by: ADD-SP --- tokio-util/src/io/simplex.rs | 26 +++++++++++++++----------- tokio-util/tests/io_simplex.rs | 23 ++++++++++++++++++++++- 2 files changed, 37 insertions(+), 12 deletions(-) diff --git a/tokio-util/src/io/simplex.rs b/tokio-util/src/io/simplex.rs index 6c22d64107f..5d7fe1cbd1e 100644 --- a/tokio-util/src/io/simplex.rs +++ b/tokio-util/src/io/simplex.rs @@ -116,17 +116,17 @@ impl AsyncRead for Receiver { let to_read = buf.remaining().min(inner.buf.remaining()); if to_read == 0 { - return if inner.is_closed() { - Poll::Ready(Ok(())) - } else { - inner.register_receiver_waker(cx.waker()); - let waker = inner.take_sender_waker(); - drop(inner); // unlock before waking up - if let Some(waker) = waker { - waker.wake(); - } - Poll::Pending - }; + if inner.is_closed() || buf.remaining() == 0 { + return Poll::Ready(Ok(())); + } + + inner.register_receiver_waker(cx.waker()); + let maybe_waker = inner.take_sender_waker(); + drop(inner); // unlock before waking up + if let Some(waker) = maybe_waker { + waker.wake(); + } + return Poll::Pending; } ready!(poll_proceed_and_make_progress(cx)); @@ -187,6 +187,10 @@ impl AsyncWrite for Sender { .expect("backpressure boundary overflow"); let to_write = buf.len().min(free); if to_write == 0 { + if buf.is_empty() { + return Poll::Ready(Ok(0)); + } + inner.register_sender_waker(cx.waker()); let waker = inner.take_receiver_waker(); drop(inner); // unlock before waking up diff --git a/tokio-util/tests/io_simplex.rs b/tokio-util/tests/io_simplex.rs index 8367752457d..c05cf79f73b 100644 --- a/tokio-util/tests/io_simplex.rs +++ b/tokio-util/tests/io_simplex.rs @@ -2,8 +2,8 @@ use futures::pin_mut; use futures_test::task::noop_context; use std::task::Poll; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf}; -use tokio_test::assert_pending; use tokio_test::task::spawn; +use tokio_test::{assert_pending, assert_ready}; use tokio_util::io::simplex; /// Sanity check for single-threaded operation. @@ -58,6 +58,27 @@ fn multi_thread() { jh1.join().unwrap(); } +/// The `Receiver::poll_read` should return `Poll::Ready(Ok(()))` +/// if the `ReadBuf` has zero remaining capacity. +#[tokio::test] +async fn read_buf_is_full() { + let (_tx, rx) = simplex::new(32); + let mut buf = ReadBuf::new(&mut []); + tokio::pin!(rx); + assert_ready!(rx.as_mut().poll_read(&mut noop_context(), &mut buf)).unwrap(); + assert_eq!(buf.filled().len(), 0); +} + +/// The `Sender::poll_write` should return `Poll::Ready(Ok(0))` +/// if the input buffer has zero length. +#[tokio::test] +async fn write_buf_is_empty() { + let (tx, _rx) = simplex::new(32); + tokio::pin!(tx); + let n = assert_ready!(tx.as_mut().poll_write(&mut noop_context(), &[])).unwrap(); + assert_eq!(n, 0); +} + /// The `Sender` should returns error if the `Receiver` has been dropped. #[tokio::test] async fn drop_receiver_0() { From 53c6d81b4ee41390669a7763936ea897c537fe81 Mon Sep 17 00:00:00 2001 From: Qi Date: Wed, 5 Nov 2025 22:32:08 +0800 Subject: [PATCH 11/18] unify the error message Signed-off-by: ADD-SP --- tokio-util/src/io/simplex.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tokio-util/src/io/simplex.rs b/tokio-util/src/io/simplex.rs index 5d7fe1cbd1e..f30e85dc7bc 100644 --- a/tokio-util/src/io/simplex.rs +++ b/tokio-util/src/io/simplex.rs @@ -14,6 +14,8 @@ use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; type IoResult = Result; +const CLOSED_ERROR_MSG: &str = "simplex has been closed"; + #[derive(Debug)] struct Inner { /// `poll_write` will return [`Poll::Pending`] if the backpressure boundary is reached @@ -177,7 +179,7 @@ impl AsyncWrite for Sender { if inner.is_closed() { return Poll::Ready(Err(IoError::new( IoErrorKind::BrokenPipe, - "simplex has been closed", + CLOSED_ERROR_MSG, ))); } @@ -221,7 +223,7 @@ impl AsyncWrite for Sender { if inner.is_closed() { Poll::Ready(Err(IoError::new( IoErrorKind::BrokenPipe, - "simplex has been shut down", + CLOSED_ERROR_MSG, ))) } else { Poll::Ready(Ok(())) From 6eb96592de2279204652d1f854e0de948d37c8e5 Mon Sep 17 00:00:00 2001 From: Qi Date: Wed, 5 Nov 2025 22:48:41 +0800 Subject: [PATCH 12/18] add more tests Signed-off-by: ADD-SP --- tokio-util/src/io/simplex.rs | 10 +--- tokio-util/tests/io_simplex.rs | 85 ++++++++++++++++++++++------------ 2 files changed, 58 insertions(+), 37 deletions(-) diff --git a/tokio-util/src/io/simplex.rs b/tokio-util/src/io/simplex.rs index f30e85dc7bc..7e3e6f08f24 100644 --- a/tokio-util/src/io/simplex.rs +++ b/tokio-util/src/io/simplex.rs @@ -177,10 +177,7 @@ impl AsyncWrite for Sender { let mut inner = self.inner.lock().unwrap(); if inner.is_closed() { - return Poll::Ready(Err(IoError::new( - IoErrorKind::BrokenPipe, - CLOSED_ERROR_MSG, - ))); + return Poll::Ready(Err(IoError::new(IoErrorKind::BrokenPipe, CLOSED_ERROR_MSG))); } let free = inner @@ -221,10 +218,7 @@ impl AsyncWrite for Sender { fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { let inner = self.inner.lock().unwrap(); if inner.is_closed() { - Poll::Ready(Err(IoError::new( - IoErrorKind::BrokenPipe, - CLOSED_ERROR_MSG, - ))) + Poll::Ready(Err(IoError::new(IoErrorKind::BrokenPipe, CLOSED_ERROR_MSG))) } else { Poll::Ready(Ok(())) } diff --git a/tokio-util/tests/io_simplex.rs b/tokio-util/tests/io_simplex.rs index c05cf79f73b..79b79802be5 100644 --- a/tokio-util/tests/io_simplex.rs +++ b/tokio-util/tests/io_simplex.rs @@ -11,14 +11,31 @@ use tokio_util::io::simplex; async fn single_thread() { const N: usize = 64; const MSG: &[u8] = b"Hello, world!"; + const CAPS: &[usize] = &[1, MSG.len() / 2, MSG.len() - 1, MSG.len(), MSG.len() + 1]; + + // test different buffer capacities to cover edge cases + for &capacity in CAPS { + let (mut tx, mut rx) = simplex::new(capacity); + + for _ in 0..N { + let mut read = 0; + let mut write = 0; + let mut buf = [0; MSG.len()]; + + while read < MSG.len() || write < MSG.len() { + if write < MSG.len() { + let n = tx.write(&MSG[write..]).await.unwrap(); + write += n; + } + + if read < MSG.len() { + let n = rx.read(&mut buf[read..]).await.unwrap(); + read += n; + } + } - let (mut tx, mut rx) = simplex::new(32); - - for _ in 0..N { - tx.write_all(MSG).await.unwrap(); - let mut buf = vec![0; MSG.len()]; - rx.read_exact(&mut buf).await.unwrap(); - assert_eq!(&buf[..], MSG); + assert_eq!(&buf[..], MSG); + } } } @@ -31,31 +48,41 @@ fn multi_thread() { const N: usize = 64; const MSG: &[u8] = b"Hello, world!"; - - let (mut tx, mut rx) = simplex::new(32); - - let jh0 = thread::spawn(move || { - block_on(async { - let mut buf = vec![0; MSG.len()]; - for _ in 0..N { - rx.read_exact(&mut buf).await.unwrap(); - assert_eq!(&buf[..], MSG); - buf.clear(); - buf.resize(MSG.len(), 0); - } + const CAPS: &[usize] = &[1, MSG.len() / 2, MSG.len() - 1, MSG.len(), MSG.len() + 1]; + + // test different buffer capacities to cover edge cases + for &capacity in CAPS { + let (mut tx, mut rx) = simplex::new(capacity); + + let jh0 = thread::spawn(move || { + block_on(async { + let mut buf = vec![0; MSG.len()]; + for _ in 0..N { + rx.read_exact(&mut buf).await.unwrap(); + assert_eq!(&buf[..], MSG); + buf.clear(); + buf.resize(MSG.len(), 0); + } + }); }); - }); - let jh1 = thread::spawn(move || { - block_on(async { - for _ in 0..N { - tx.write_all(MSG).await.unwrap(); - } + let jh1 = thread::spawn(move || { + block_on(async { + for _ in 0..N { + tx.write_all(MSG).await.unwrap(); + } + }); }); - }); - jh0.join().unwrap(); - jh1.join().unwrap(); + jh0.join().unwrap(); + jh1.join().unwrap(); + } +} + +#[test] +#[should_panic(expected = "capacity must be greater than zero")] +fn zero_capacity() { + let _ = simplex::new(0); } /// The `Receiver::poll_read` should return `Poll::Ready(Ok(()))` @@ -100,7 +127,7 @@ async fn drop_receiver_1() { assert!(write_task.is_woken()); } -/// The `Receiver` should returns error if: +/// The `Receiver` should return error if: /// /// - The `Sender` has been dropped. /// - AND there is no remaining data in the buffer. From 45976a99fe90f8f9430e419eb82cb8b8010832d6 Mon Sep 17 00:00:00 2001 From: Qi Date: Wed, 5 Nov 2025 23:24:29 +0800 Subject: [PATCH 13/18] support poll_write_vectored Signed-off-by: ADD-SP --- tokio-util/src/io/simplex.rs | 55 ++++++++++++++++++++++ tokio-util/tests/io_simplex.rs | 85 ++++++++++++++++++++++++++++++++++ 2 files changed, 140 insertions(+) diff --git a/tokio-util/src/io/simplex.rs b/tokio-util/src/io/simplex.rs index 7e3e6f08f24..ff0197adfcb 100644 --- a/tokio-util/src/io/simplex.rs +++ b/tokio-util/src/io/simplex.rs @@ -7,6 +7,7 @@ use bytes::BytesMut; use futures_core::ready; use std::io::Error as IoError; use std::io::ErrorKind as IoErrorKind; +use std::io::IoSlice; use std::pin::Pin; use std::sync::{Arc, Mutex}; use std::task::{Context, Poll, Waker}; @@ -242,6 +243,60 @@ impl AsyncWrite for Sender { Poll::Ready(Ok(())) } + + fn is_write_vectored(&self) -> bool { + true + } + + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[IoSlice<'_>], + ) -> Poll> { + let mut inner = self.inner.lock().unwrap(); + if inner.is_closed() { + return Poll::Ready(Err(IoError::new(IoErrorKind::BrokenPipe, CLOSED_ERROR_MSG))); + } + + let free = inner + .backpressure_boundary + .checked_sub(inner.buf.len()) + .expect("backpressure boundary overflow"); + if free == 0 { + inner.register_sender_waker(cx.waker()); + let maybe_waker = inner.take_receiver_waker(); + drop(inner); // unlock before waking up + if let Some(waker) = maybe_waker { + waker.wake(); + } + return Poll::Pending; + } + + let mut rem = free; + for buf in bufs { + if rem == 0 { + break; + } + + let to_write = buf.len().min(rem); + if to_write == 0 { + assert_ne!(rem, 0); + assert_eq!(buf.len(), 0); + continue; + } + + inner.buf.extend_from_slice(&buf[..to_write]); + rem -= to_write; + } + + let waker = inner.take_receiver_waker(); + drop(inner); // unlock before waking up + if let Some(waker) = waker { + waker.wake(); + } + + Poll::Ready(Ok(free - rem)) + } } /// Create a simplex channel. diff --git a/tokio-util/tests/io_simplex.rs b/tokio-util/tests/io_simplex.rs index 79b79802be5..123d36c9f0b 100644 --- a/tokio-util/tests/io_simplex.rs +++ b/tokio-util/tests/io_simplex.rs @@ -1,5 +1,6 @@ use futures::pin_mut; use futures_test::task::noop_context; +use std::io::IoSlice; use std::task::Poll; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf}; use tokio_test::task::spawn; @@ -249,3 +250,87 @@ async fn cooperative_scheduling() { } assert!(is_pending); } + +/// The capacity is exactly same as the total length of the vectored buffers. +#[tokio::test] +async fn poll_write_vectored_0() { + const MSG1: &[u8] = b"1"; + const MSG2: &[u8] = b"22"; + const MSG3: &[u8] = b"333"; + const MSG_LEN: usize = MSG1.len() + MSG2.len() + MSG3.len(); + + let io_slices = &[IoSlice::new(MSG1), IoSlice::new(MSG2), IoSlice::new(MSG3)]; + + let (tx, mut rx) = simplex::new(MSG_LEN); + tokio::pin!(tx); + let res = tx.poll_write_vectored(&mut noop_context(), io_slices); + let n = assert_ready!(res).unwrap(); + assert_eq!(n, MSG_LEN); + let mut buf = [0; MSG_LEN]; + let n = rx.read_exact(&mut buf).await.unwrap(); + assert_eq!(n, MSG_LEN); + assert_eq!(&buf, b"122333"); +} + +/// The capacity is smaller than the total length of the vectored buffers. +#[tokio::test] +async fn poll_write_vectored_1() { + const MSG1: &[u8] = b"1"; + const MSG2: &[u8] = b"22"; + const MSG3: &[u8] = b"333"; + const CAPACITY: usize = MSG1.len() + MSG2.len() + 1; + + let io_slices = &[IoSlice::new(MSG1), IoSlice::new(MSG2), IoSlice::new(MSG3)]; + + let (tx, mut rx) = simplex::new(CAPACITY); + tokio::pin!(tx); + + // ==== The poll_write_vectored should write MSG1 and MSG2 fully, and MSG3 partially. ==== + let res = tx.poll_write_vectored(&mut noop_context(), io_slices); + let n = assert_ready!(res).unwrap(); + assert_eq!(n, CAPACITY); + let mut buf = [0; CAPACITY]; + let n = rx.read_exact(&mut buf).await.unwrap(); + assert_eq!(n, CAPACITY); + assert_eq!(&buf, b"1223"); +} + +/// There are two empty buffers in the vectored buffers. +#[tokio::test] +async fn poll_write_vectored_2() { + const MSG1: &[u8] = b"1"; + const MSG2: &[u8] = b""; + const MSG3: &[u8] = b"22"; + const MSG4: &[u8] = b""; + const MSG5: &[u8] = b"333"; + const MSG_LEN: usize = MSG1.len() + MSG2.len() + MSG3.len() + MSG4.len() + MSG5.len(); + + let io_slices = &[ + IoSlice::new(MSG1), + IoSlice::new(MSG2), + IoSlice::new(MSG3), + IoSlice::new(MSG4), + IoSlice::new(MSG5), + ]; + + let (tx, mut rx) = simplex::new(MSG_LEN); + tokio::pin!(tx); + let res = tx.poll_write_vectored(&mut noop_context(), io_slices); + let n = assert_ready!(res).unwrap(); + assert_eq!(n, MSG_LEN); + let mut buf = [0; MSG_LEN]; + let n = rx.read_exact(&mut buf).await.unwrap(); + assert_eq!(n, MSG_LEN); + assert_eq!(&buf, b"122333"); +} + +/// The `Sender::poll_write_vectored` should return `Poll::Ready(Ok(0))` +/// if all the input buffers have zero length. +#[tokio::test] +async fn poll_write_vectored_3() { + let io_slices = &[IoSlice::new(&[]), IoSlice::new(&[]), IoSlice::new(&[])]; + let (tx, _rx) = simplex::new(32); + tokio::pin!(tx); + let n = assert_ready!(tx.poll_write_vectored(&mut noop_context(), io_slices)).unwrap(); + assert_eq!(n, 0); +} From d35012bbe988a811995af6bf3686f9f09a4bbfde Mon Sep 17 00:00:00 2001 From: Qi Date: Thu, 6 Nov 2025 19:21:51 +0800 Subject: [PATCH 14/18] fix typos Signed-off-by: ADD-SP --- tokio-util/src/io/simplex.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tokio-util/src/io/simplex.rs b/tokio-util/src/io/simplex.rs index ff0197adfcb..86719461522 100644 --- a/tokio-util/src/io/simplex.rs +++ b/tokio-util/src/io/simplex.rs @@ -170,7 +170,7 @@ impl Drop for Sender { } impl AsyncWrite for Sender { - /// # Error + /// # Errors /// /// This method will return [`IoErrorKind::BrokenPipe`] /// if the channel has been closed. @@ -212,7 +212,7 @@ impl AsyncWrite for Sender { Poll::Ready(Ok(to_write)) } - /// # Error + /// # Errors /// /// This method will return [`IoErrorKind::BrokenPipe`] /// if the channel has been closed. From d9809a88845bee2e9be41e4932b63864f43032ce Mon Sep 17 00:00:00 2001 From: Qi Date: Thu, 6 Nov 2025 19:41:47 +0800 Subject: [PATCH 15/18] adopt coop for poll_write_vectored Signed-off-by: ADD-SP --- tokio-util/src/io/simplex.rs | 2 ++ tokio-util/tests/io_simplex.rs | 17 +++++++++++++++++ 2 files changed, 19 insertions(+) diff --git a/tokio-util/src/io/simplex.rs b/tokio-util/src/io/simplex.rs index 86719461522..94939dbd10c 100644 --- a/tokio-util/src/io/simplex.rs +++ b/tokio-util/src/io/simplex.rs @@ -272,6 +272,8 @@ impl AsyncWrite for Sender { return Poll::Pending; } + ready!(poll_proceed_and_make_progress(cx)); + let mut rem = free; for buf in bufs { if rem == 0 { diff --git a/tokio-util/tests/io_simplex.rs b/tokio-util/tests/io_simplex.rs index 123d36c9f0b..fa98a0fb721 100644 --- a/tokio-util/tests/io_simplex.rs +++ b/tokio-util/tests/io_simplex.rs @@ -232,6 +232,23 @@ async fn cooperative_scheduling() { } assert!(is_pending); + let (tx, _rx) = simplex::new(INITIAL_BUDGET * 2); + pin_mut!(tx); + let mut is_pending = false; + let io_slices = &[IoSlice::new(&[0u8; 1])]; + for _ in 0..INITIAL_BUDGET + 1 { + match tx.as_mut().poll_write_vectored(&mut noop_context(), io_slices) { + Poll::Pending => { + is_pending = true; + break; + } + Poll::Ready(Ok(1)) => {} + Poll::Ready(Ok(n)) => panic!("wrote too many bytes: {n}"), + Poll::Ready(Err(e)) => panic!("{e}"), + } + } + assert!(is_pending); + let (mut tx, rx) = simplex::new(INITIAL_BUDGET * 2); tx.write_all(&[0u8; INITIAL_BUDGET + 2]).await.unwrap(); pin_mut!(rx); From f954e93076f0f9c5d50898e32a2087003c3511c5 Mon Sep 17 00:00:00 2001 From: Qi Date: Thu, 6 Nov 2025 19:45:00 +0800 Subject: [PATCH 16/18] fix rustfmt reports Signed-off-by: ADD-SP --- tokio-util/tests/io_simplex.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tokio-util/tests/io_simplex.rs b/tokio-util/tests/io_simplex.rs index fa98a0fb721..0b54b79864a 100644 --- a/tokio-util/tests/io_simplex.rs +++ b/tokio-util/tests/io_simplex.rs @@ -237,7 +237,10 @@ async fn cooperative_scheduling() { let mut is_pending = false; let io_slices = &[IoSlice::new(&[0u8; 1])]; for _ in 0..INITIAL_BUDGET + 1 { - match tx.as_mut().poll_write_vectored(&mut noop_context(), io_slices) { + match tx + .as_mut() + .poll_write_vectored(&mut noop_context(), io_slices) + { Poll::Pending => { is_pending = true; break; From 555e13cf44c2bd97d060ea4a59e8f75504f86d00 Mon Sep 17 00:00:00 2001 From: Qi Date: Thu, 20 Nov 2025 09:33:29 +0800 Subject: [PATCH 17/18] fix coop and unblock before dropping old waker Signed-off-by: ADD-SP --- tokio-util/src/io/simplex.rs | 57 ++++++++++++++++++++++++------------ tokio-util/src/util/mod.rs | 20 +++++++------ 2 files changed, 50 insertions(+), 27 deletions(-) diff --git a/tokio-util/src/io/simplex.rs b/tokio-util/src/io/simplex.rs index 94939dbd10c..bdf803fbad2 100644 --- a/tokio-util/src/io/simplex.rs +++ b/tokio-util/src/io/simplex.rs @@ -1,6 +1,6 @@ //! Unidirectional byte-oriented channel. -use crate::util::poll_proceed_and_make_progress; +use crate::util::poll_proceed; use bytes::Buf; use bytes::BytesMut; @@ -46,19 +46,17 @@ impl Inner { } } - fn register_receiver_waker(&mut self, waker: &Waker) { + fn register_receiver_waker(&mut self, waker: &Waker) -> Option { match self.receiver_waker.as_mut() { - Some(old) if old.will_wake(waker) => {} - Some(old) => old.clone_from(waker), - None => self.receiver_waker = Some(waker.clone()), + Some(old) if old.will_wake(waker) => None, + _ => self.receiver_waker.replace(waker.clone()), } } - fn register_sender_waker(&mut self, waker: &Waker) { + fn register_sender_waker(&mut self, waker: &Waker) -> Option { match self.sender_waker.as_mut() { - Some(old) if old.will_wake(waker) => {} - Some(old) => old.clone_from(waker), - None => self.sender_waker = Some(waker.clone()), + Some(old) if old.will_wake(waker) => None, + _ => self.sender_waker.replace(waker.clone()), } } @@ -115,6 +113,8 @@ impl AsyncRead for Receiver { cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll> { + let coop = ready!(poll_proceed(cx)); + let mut inner = self.inner.lock().unwrap(); let to_read = buf.remaining().min(inner.buf.remaining()); @@ -123,24 +123,30 @@ impl AsyncRead for Receiver { return Poll::Ready(Ok(())); } - inner.register_receiver_waker(cx.waker()); + let old_waker = inner.register_receiver_waker(cx.waker()); let maybe_waker = inner.take_sender_waker(); - drop(inner); // unlock before waking up + + // unlock before waking up and dropping old waker + drop(inner); + drop(old_waker); if let Some(waker) = maybe_waker { waker.wake(); } return Poll::Pending; } - ready!(poll_proceed_and_make_progress(cx)); + // this is to avoid starving other tasks + coop.made_progress(); buf.put_slice(&inner.buf[..to_read]); inner.buf.advance(to_read); + let waker = inner.take_sender_waker(); drop(inner); // unlock before waking up if let Some(waker) = waker { waker.wake(); } + Poll::Ready(Ok(())) } } @@ -175,6 +181,8 @@ impl AsyncWrite for Sender { /// This method will return [`IoErrorKind::BrokenPipe`] /// if the channel has been closed. fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { + let coop = ready!(poll_proceed(cx)); + let mut inner = self.inner.lock().unwrap(); if inner.is_closed() { @@ -191,24 +199,30 @@ impl AsyncWrite for Sender { return Poll::Ready(Ok(0)); } - inner.register_sender_waker(cx.waker()); + let old_waker = inner.register_sender_waker(cx.waker()); let waker = inner.take_receiver_waker(); - drop(inner); // unlock before waking up + + // unlock before waking up and dropping old waker + drop(inner); + drop(old_waker); if let Some(waker) = waker { waker.wake(); } + return Poll::Pending; } // this is to avoid starving other tasks - ready!(poll_proceed_and_make_progress(cx)); + coop.made_progress(); inner.buf.extend_from_slice(&buf[..to_write]); + let waker = inner.take_receiver_waker(); drop(inner); // unlock before waking up if let Some(waker) = waker { waker.wake(); } + Poll::Ready(Ok(to_write)) } @@ -253,6 +267,8 @@ impl AsyncWrite for Sender { cx: &mut Context<'_>, bufs: &[IoSlice<'_>], ) -> Poll> { + let coop = ready!(poll_proceed(cx)); + let mut inner = self.inner.lock().unwrap(); if inner.is_closed() { return Poll::Ready(Err(IoError::new(IoErrorKind::BrokenPipe, CLOSED_ERROR_MSG))); @@ -263,16 +279,21 @@ impl AsyncWrite for Sender { .checked_sub(inner.buf.len()) .expect("backpressure boundary overflow"); if free == 0 { - inner.register_sender_waker(cx.waker()); + let old_waker = inner.register_sender_waker(cx.waker()); let maybe_waker = inner.take_receiver_waker(); - drop(inner); // unlock before waking up + + // unlock before waking up and dropping old waker + drop(inner); + drop(old_waker); if let Some(waker) = maybe_waker { waker.wake(); } + return Poll::Pending; } - ready!(poll_proceed_and_make_progress(cx)); + // this is to avoid starving other tasks + coop.made_progress(); let mut rem = free; for buf in bufs { diff --git a/tokio-util/src/util/mod.rs b/tokio-util/src/util/mod.rs index 95390fc7bae..0c671085007 100644 --- a/tokio-util/src/util/mod.rs +++ b/tokio-util/src/util/mod.rs @@ -8,22 +8,24 @@ pub(crate) use maybe_dangling::MaybeDangling; pub use poll_buf::{poll_read_buf, poll_write_buf}; cfg_rt! { - use std::task::{Context, Poll}; - use tokio::task::coop::poll_proceed; - use futures_core::ready; - #[cfg_attr(not(feature = "io"), allow(unused))] - pub(crate) fn poll_proceed_and_make_progress(cx: &mut Context<'_>) -> Poll<()> { - ready!(poll_proceed(cx)).made_progress(); - Poll::Ready(()) - } + pub(crate) use tokio::task::coop::poll_proceed; } cfg_not_rt! { + #[cfg_attr(not(feature = "io"), allow(unused))] use std::task::{Context, Poll}; #[cfg_attr(not(feature = "io"), allow(unused))] - pub(crate) fn poll_proceed_and_make_progress(_cx: &mut Context<'_>) -> Poll<()> { + pub(crate) struct RestoreOnPending; + + #[cfg_attr(not(feature = "io"), allow(unused))] + impl RestoreOnPending { + pub(crate) fn made_progress(&self) {} + } + + #[cfg_attr(not(feature = "io"), allow(unused))] + pub(crate) fn poll_proceed(_cx: &mut Context<'_>) -> Poll { Poll::Ready(()) } } From 1f2e6c70a4a292a1bfab8e590115e118036e7b60 Mon Sep 17 00:00:00 2001 From: Qi Date: Thu, 20 Nov 2025 09:43:21 +0800 Subject: [PATCH 18/18] fixup! fix coop and unblock before dropping old waker --- tokio-util/src/util/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio-util/src/util/mod.rs b/tokio-util/src/util/mod.rs index 0c671085007..aaba542c289 100644 --- a/tokio-util/src/util/mod.rs +++ b/tokio-util/src/util/mod.rs @@ -26,6 +26,6 @@ cfg_not_rt! { #[cfg_attr(not(feature = "io"), allow(unused))] pub(crate) fn poll_proceed(_cx: &mut Context<'_>) -> Poll { - Poll::Ready(()) + Poll::Ready(RestoreOnPending) } }