Skip to content

Commit fac29b9

Browse files
pzhan9facebook-github-bot
authored and committed
Close NetRx write stream on exit (meta-pytorch#2003)
Summary: Currently, when `NetRx` exits, it does not close the write stream gracefully. Since `NetTx` listens on this write stream for ack messages, `NetTx` sees a `close_notify` error and generates a log like this: > [-]E1125 06:50:46.476994 518989 fbcode/monarch/hyperactor/src/channel/net/client.rs:988] [net i/o loop{session:metatls:2401:db00:eef0:1120:3520:0:6c09:3cba:44243.17588939158573269181, connected:true, next_seq:2, largest_acked:AckedSeq { seq: 0, timestamp: "2s 805us 589ns" }, outbox:QueueValue::NonEmpty { len: 1, num_bytes_queued: 20, front: QueueEntryValue { seq: 1, since_received: ("817us 855ns",), since_sent: () }, back: QueueEntryValue { seq: 1, since_received: ("821us 911ns",), since_sent: () } }, unacked:QueueValue::Empty}] failed while receiving ack, dest:metatls:2401:db00:eef0:1120:3520:0:6c09:3cba:44243, session_id:17588939158573269181, error:peer closed connection without sending TLS close_notify: https://docs.rs/rustls/latest/rustls/manual/_03_howto/index.html#unexpected-eof This diff fixes that by making `NetRx` shut down its write stream (best effort) before it exits. Reviewed By: shayne-fletcher Differential Revision: D87863524
1 parent f974b26 commit fac29b9

File tree

3 files changed

+39
-24
lines changed

3 files changed

+39
-24
lines changed

hyperactor/src/channel/net/client.rs

Lines changed: 11 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -640,34 +640,24 @@ async fn run<M: RemoteMessage>(
640640
let _ = notify.send(TxStatus::Closed);
641641

642642
match conn {
643-
Conn::Connected { write_state, .. } => {
644-
let write_half = match write_state {
645-
WriteState::Writing(mut frame_writer, ()) => {
646-
if let Err(err) = frame_writer.send().await {
647-
tracing::info!(
648-
parent: &span,
649-
dest = %dest,
650-
error = %err,
651-
session_id = session_id,
652-
"write error during cleanup"
653-
);
654-
}
655-
Some(frame_writer.complete())
656-
}
657-
WriteState::Idle(writer) => Some(writer),
658-
WriteState::Broken => None,
659-
};
660-
661-
if let Some(mut w) = write_half {
662-
if let Err(err) = w.shutdown().await {
643+
Conn::Connected {
644+
mut write_state, ..
645+
} => {
646+
if let WriteState::Writing(frame_writer, ()) = &mut write_state {
647+
if let Err(err) = frame_writer.send().await {
663648
tracing::info!(
664649
parent: &span,
665650
dest = %dest,
666651
error = %err,
667652
session_id = session_id,
668-
"failed to shutdown NetTx write stream during cleanup"
653+
"write error during cleanup"
669654
);
670655
}
656+
};
657+
if let Some(mut w) = write_state.into_writer() {
658+
// Try to shutdown the connection gracefully. This is a best effort
659+
// operation, and we don't care if it fails.
660+
let _ = w.shutdown().await;
671661
}
672662
}
673663
Conn::Disconnected(_) => (),

hyperactor/src/channel/net/framed.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -373,6 +373,21 @@ impl<W: AsyncWrite + Unpin, F: Buf, T> WriteState<W, F, T> {
373373
Self::Broken => panic!("illegal state"),
374374
}
375375
}
376+
377+
/// Consume the state and return the underlying writer, if the
378+
/// stream is not broken.
379+
///
380+
/// For `Idle`, this returns the stored writer. For `Writing`,
381+
/// this assumes no more frames will be sent and calls
382+
/// `complete()` to recover the writer. For `Broken`, this returns
383+
/// `None`.
384+
pub fn into_writer(self) -> Option<W> {
385+
match self {
386+
Self::Idle(w) => Some(w),
387+
Self::Writing(w, _) => Some(w.complete()),
388+
Self::Broken => None,
389+
}
390+
}
376391
}
377392

378393
#[cfg(test)]

hyperactor/src/channel/net/server.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use dashmap::DashMap;
2121
use dashmap::mapref::entry::Entry;
2222
use tokio::io::AsyncRead;
2323
use tokio::io::AsyncWrite;
24+
use tokio::io::AsyncWriteExt as _;
2425
use tokio::io::ReadHalf;
2526
use tokio::io::WriteHalf;
2627
use tokio::sync::mpsc;
@@ -80,7 +81,7 @@ impl<S: AsyncRead + AsyncWrite + Send + 'static + Unpin> ServerConn<S> {
8081

8182
/// Handles a server side stream created during the `listen` loop.
8283
async fn process<M: RemoteMessage>(
83-
&mut self,
84+
mut self,
8485
session_id: u64,
8586
tx: mpsc::Sender<M>,
8687
cancel_token: CancellationToken,
@@ -421,6 +422,12 @@ impl<S: AsyncRead + AsyncWrite + Send + 'static + Unpin> ServerConn<S> {
421422
};
422423
}
423424

425+
if let Some(mut w) = self.write_state.into_writer() {
426+
// Try to shutdown the connection gracefully. This is a best effort
427+
// operation, and we don't care if it fails.
428+
let _ = w.shutdown().await;
429+
}
430+
424431
(final_next, final_result)
425432
}
426433

@@ -524,14 +531,17 @@ impl SessionManager {
524531
}
525532
};
526533

534+
let source = conn.source.clone();
535+
let dest = conn.dest.clone();
536+
527537
let next = session_var.take().await;
528538
let (next, res) = conn.process(session_id, tx, cancel_token, next).await;
529539
session_var.put(next).await;
530540

531541
if let Err(ref err) = res {
532542
tracing::info!(
533-
source = %conn.source,
534-
dest = %conn.dest,
543+
source = %source,
544+
dest = %dest,
535545
error = ?err,
536546
session_id = session_id,
537547
"process encountered an error"

0 commit comments

Comments
 (0)