Skip to content

Commit 95837ba

Browse files
committed
wake up the receiver after dropping the sender
Signed-off-by: ADD-SP <qiqi.zhang@konghq.com>
1 parent 6eafcd1 commit 95837ba

File tree

2 files changed

+21
-11
lines changed

2 files changed

+21
-11
lines changed

tokio-util/src/io/simplex.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,14 @@ pub struct Sender {
148148
impl Drop for Sender {
149149
/// This also wakes up the [`Receiver`].
150150
fn drop(&mut self) {
151-
self.inner.lock().unwrap().close_sender();
151+
let maybe_waker = {
152+
let mut inner = self.inner.lock().unwrap();
153+
inner.close_sender()
154+
};
155+
156+
if let Some(waker) = maybe_waker {
157+
waker.wake();
158+
}
152159
}
153160
}
154161

tokio-util/tests/io_simplex.rs

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -102,20 +102,23 @@ async fn drop_sender_0() {
102102
/// The `Receiver` should be woken up if:
103103
///
104104
/// - The `Sender` has been dropped.
105-
/// - AND there is no sufficient data to read.
105+
/// - AND there is still remaining data in the buffer.
106106
#[tokio::test]
107107
async fn drop_sender_1() {
108-
const MSG: &[u8] = b"Hello, world!";
108+
let (mut tx, mut rx) = simplex::new(2);
109+
let mut buf = vec![];
110+
let mut read_task = spawn(rx.read_to_end(&mut buf));
111+
assert_pending!(read_task.poll());
109112

110-
// only set `1` capacity to make sure the write half will be blocked
111-
// by the read half
112-
let (tx, mut rx) = simplex::new(1);
113-
let mut buf = [0u8; MSG.len()];
114-
let fut = rx.read_exact(&mut buf);
115-
pin_mut!(fut);
116-
assert_pending!(fut.as_mut().poll(&mut noop_context()));
113+
tx.write_u8(1).await.unwrap();
114+
assert_pending!(read_task.poll());
115+
116+
assert!(!read_task.is_woken());
117117
drop(tx);
118-
assert_ready_err!(fut.poll(&mut noop_context()));
118+
assert!(read_task.is_woken());
119+
120+
read_task.await.unwrap();
121+
assert_eq!(buf, vec![1]);
119122
}
120123

121124
/// All following calls to `Sender::poll_write` and `Sender::poll_flush`

0 commit comments

Comments
 (0)