Skip to content

Commit 6eafcd1

Browse files
committed
wake up the receiver on shutdown
Signed-off-by: ADD-SP <qiqi.zhang@konghq.com>
1 parent 41665dc commit 6eafcd1

File tree

2 files changed

+30
-2
lines changed

2 files changed

+30
-2
lines changed

tokio-util/src/io/simplex.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -217,8 +217,15 @@ impl AsyncWrite for Sender {
217217
/// The [`Receiver`] can still be used to read remaining data
218218
/// until all bytes have been consumed.
219219
fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<IoResult<()>> {
220-
let mut inner = self.inner.lock().unwrap();
221-
inner.close_sender();
220+
let maybe_waker = {
221+
let mut inner = self.inner.lock().unwrap();
222+
inner.close_sender()
223+
};
224+
225+
if let Some(waker) = maybe_waker {
226+
waker.wake();
227+
}
228+
222229
Poll::Ready(Ok(()))
223230
}
224231
}

tokio-util/tests/io_simplex.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use futures_test::task::noop_context;
33
use std::{future::Future, task::Poll};
44
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf};
55
use tokio_test::{assert_pending, assert_ready_err};
6+
use tokio_test::task::spawn;
67
use tokio_util::io::simplex;
78

89
/// Sanity check for single-threaded operation.
@@ -139,6 +140,26 @@ async fn shutdown_sender_1() {
139140
tx.shutdown().await.unwrap();
140141
}
141142

143+
/// The `Sender::poll_shutdown` should wake up the `Receiver`
144+
#[tokio::test]
145+
async fn shutdown_sender_2() {
146+
let (mut tx, mut rx) = simplex::new(32);
147+
148+
let mut buf = vec![];
149+
let mut read_task = spawn(rx.read_to_end(&mut buf));
150+
assert_pending!(read_task.poll());
151+
152+
tx.write_u8(1).await.unwrap();
153+
assert_pending!(read_task.poll());
154+
155+
assert!(!read_task.is_woken());
156+
tx.shutdown().await.unwrap();
157+
assert!(read_task.is_woken());
158+
159+
read_task.await.unwrap();
160+
assert_eq!(buf, vec![1]);
161+
}
162+
142163
/// Both `Sender` and `Receiver` should yield periodically
143164
/// in a tight-loop.
144165
#[tokio::test]

0 commit comments

Comments
 (0)