Skip to content

Commit d7f6d20

Browse files
committed
wake up the sender after dropping the receiver
Signed-off-by: ADD-SP <qiqi.zhang@konghq.com>
1 parent 95837ba commit d7f6d20

File tree

2 files changed

+18
-16
lines changed

2 files changed

+18
-16
lines changed

tokio-util/src/io/simplex.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,14 @@ pub struct Receiver {
9595
impl Drop for Receiver {
9696
/// This also wakes up the [`Sender`].
9797
fn drop(&mut self) {
98-
self.inner.lock().unwrap().close_receiver();
98+
let maybe_waker = {
99+
let mut inner = self.inner.lock().unwrap();
100+
inner.close_receiver()
101+
};
102+
103+
if let Some(waker) = maybe_waker {
104+
waker.wake();
105+
}
99106
}
100107
}
101108

tokio-util/tests/io_simplex.rs

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
use futures::pin_mut;
22
use futures_test::task::noop_context;
3-
use std::{future::Future, task::Poll};
3+
use std::task::Poll;
44
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf};
5-
use tokio_test::{assert_pending, assert_ready_err};
5+
use tokio_test::assert_pending;
66
use tokio_test::task::spawn;
77
use tokio_util::io::simplex;
88

@@ -58,30 +58,25 @@ fn multi_thread() {
5858
jh1.join().unwrap();
5959
}
6060

61-
/// The `Sender` should returns error if the read half has been dropped.
61+
/// The `Sender` should returns error if the `Receiver` has been dropped.
6262
#[tokio::test]
6363
async fn drop_receiver_0() {
64-
const MSG: &[u8] = b"Hello, world!";
65-
6664
let (mut tx, rx) = simplex::new(32);
6765
drop(rx);
6866

69-
tx.write_all(MSG).await.unwrap_err();
67+
tx.write_u8(1).await.unwrap_err();
7068
}
7169

72-
/// The `Sender` should be woken up if the read half is dropped.
70+
/// The `Sender` should be woken up if the `Receiver` has been dropped.
7371
#[tokio::test]
7472
async fn drop_receiver_1() {
75-
const MSG: &[u8] = b"Hello, world!";
76-
77-
// only set `1` capacity to make sure the write half will be blocked
78-
// by the read half
7973
let (mut tx, rx) = simplex::new(1);
80-
let fut = tx.write_all(MSG);
81-
pin_mut!(fut);
82-
assert_pending!(fut.as_mut().poll(&mut noop_context()));
74+
let mut write_task = spawn(tx.write_u16(1));
75+
assert_pending!(write_task.poll());
76+
77+
assert!(!write_task.is_woken());
8378
drop(rx);
84-
assert_ready_err!(fut.poll(&mut noop_context()));
79+
assert!(write_task.is_woken());
8580
}
8681

8782
/// The `Receiver` should returns error if:

0 commit comments

Comments
 (0)