Skip to content

Commit 5a6466e

Browse files
pzhan9facebook-github-bot
authored andcommitted
Remove unused flag CHANNEL_MULTIPART (#1994)
Summary: `CHANNEL_MULTIPART` has been set to true for a long time. It is okay to remove it. Reviewed By: dulinriley, thomasywang Differential Revision: D87839991
1 parent f974b26 commit 5a6466e

File tree

7 files changed

+13
-102
lines changed

7 files changed

+13
-102
lines changed

hyperactor/src/channel/net.rs

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,6 @@ use super::*;
6464
use crate::RemoteMessage;
6565
use crate::clock::Clock;
6666
use crate::clock::RealClock;
67-
use crate::config;
68-
use crate::config::CHANNEL_MULTIPART;
6967

7068
mod client;
7169
mod framed;
@@ -120,18 +118,6 @@ fn deserialize_response(data: Bytes) -> Result<NetRxResponse, bincode::Error> {
120118
bincode::deserialize(&data)
121119
}
122120

123-
/// Serializes using the "illegal" multipart encoding whenever multipart
124-
/// is not enabled.
125-
fn serialize_bincode<S: ?Sized + serde::Serialize>(
126-
value: &S,
127-
) -> Result<serde_multipart::Message, bincode::Error> {
128-
if config::global::get(CHANNEL_MULTIPART) {
129-
serde_multipart::serialize_bincode(value)
130-
} else {
131-
serde_multipart::serialize_illegal_bincode(value)
132-
}
133-
}
134-
135121
/// A Tx implemented on top of a Link. The Tx manages the link state,
136122
/// reconnections, etc.
137123
#[derive(Debug)]
@@ -914,6 +900,7 @@ mod tests {
914900
use crate::channel::net::framed::FrameWrite;
915901
use crate::channel::net::server::ServerConn;
916902
use crate::channel::net::server::SessionManager;
903+
use crate::config;
917904
use crate::metrics;
918905
use crate::sync::mvar::MVar;
919906

hyperactor/src/channel/net/client.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@ use crate::channel::net::NetRxResponse;
4343
use crate::channel::net::NetTx;
4444
use crate::channel::net::Stream;
4545
use crate::channel::net::deserialize_response;
46-
use crate::channel::net::serialize_bincode;
4746
use crate::clock::Clock;
4847
use crate::clock::RealClock;
4948
use crate::config;
@@ -265,7 +264,8 @@ impl<'a, M: RemoteMessage> Outbox<'a, M> {
265264
);
266265

267266
let frame = Frame::Message(self.next_seq, message);
268-
let message = serialize_bincode(&frame).map_err(|e| format!("serialization error: {e}"))?;
267+
let message = serde_multipart::serialize_bincode(&frame)
268+
.map_err(|e| format!("serialization error: {e}"))?;
269269
let message_size = message.frame_len();
270270
metrics::REMOTE_MESSAGE_SEND_SIZE.record(message_size as f64, &[]);
271271

@@ -1128,7 +1128,9 @@ where
11281128
} else {
11291129
match link.connect().await {
11301130
Ok(stream) => {
1131-
let message = serialize_bincode(&Frame::<M>::Init(session_id)).unwrap();
1131+
let message =
1132+
serde_multipart::serialize_bincode(&Frame::<M>::Init(session_id))
1133+
.unwrap();
11321134

11331135
let mut write = FrameWrite::new(
11341136
stream,

hyperactor/src/config.rs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -179,13 +179,6 @@ declare_attrs! {
179179
})
180180
pub attr DEFAULT_ENCODING: Encoding = Encoding::Multipart;
181181

182-
/// Whether to use multipart encoding for network channel communications.
183-
@meta(CONFIG = ConfigAttr {
184-
env_name: Some("HYPERACTOR_CHANNEL_MULTIPART".to_string()),
185-
py_name: None,
186-
})
187-
pub attr CHANNEL_MULTIPART: bool = true;
188-
189182
/// How often to check for full MPSC channel on NetRx.
190183
@meta(CONFIG = ConfigAttr {
191184
env_name: Some("HYPERACTOR_CHANNEL_NET_RX_BUFFER_FULL_CHECK_INTERVAL".to_string()),
@@ -344,7 +337,6 @@ mod tests {
344337
let expected_lines: HashSet<&str> = indoc! {"
345338
# export HYPERACTOR_MESSAGE_LATENCY_SAMPLING_RATE=0.01
346339
# export HYPERACTOR_CHANNEL_NET_RX_BUFFER_FULL_CHECK_INTERVAL=5s
347-
# export HYPERACTOR_CHANNEL_MULTIPART=1
348340
# export HYPERACTOR_DEFAULT_ENCODING=serde_multipart
349341
# export HYPERACTOR_REMOTE_ALLOCATOR_HEARTBEAT_INTERVAL=5m
350342
# export HYPERACTOR_STOP_ACTOR_TIMEOUT=10s

hyperactor/src/config/global.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1107,12 +1107,12 @@ mod tests {
11071107
reset_to_defaults();
11081108

11091109
let mut env = Attrs::new();
1110-
env[CHANNEL_MULTIPART] = false;
1110+
env[MESSAGE_DELIVERY_TIMEOUT] = Duration::from_secs(120);
11111111
set(Source::Env, env);
11121112

1113-
assert!(!get(CHANNEL_MULTIPART));
1114-
let v = get_cloned(CHANNEL_MULTIPART);
1115-
assert!(!v);
1113+
assert_eq!(get(MESSAGE_DELIVERY_TIMEOUT), Duration::from_secs(120));
1114+
let v = get_cloned(MESSAGE_DELIVERY_TIMEOUT);
1115+
assert_eq!(v, Duration::from_secs(120));
11161116
}
11171117

11181118
#[test]

hyperactor/src/data.rs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -398,12 +398,7 @@ impl std::fmt::Debug for Encoded {
398398
Encoded::Bincode(data) => write!(f, "Encoded::Bincode({})", HexFmt(data)),
399399
Encoded::Json(data) => write!(f, "Encoded::Json({})", HexFmt(data)),
400400
Encoded::Multipart(message) => {
401-
write!(
402-
f,
403-
"Encoded::Multipart(illegal?={} body={}",
404-
message.is_illegal(),
405-
HexFmt(message.body())
406-
)?;
401+
write!(f, "Encoded::Multipart(body={}", HexFmt(message.body()))?;
407402
for (index, part) in message.parts().iter().enumerate() {
408403
write!(f, ", part[{}]={}", index, HexFmt(part))?;
409404
}

hyperactor_mesh/src/actor_mesh.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1526,7 +1526,7 @@ mod tests {
15261526
hyperactor::mailbox::headers::set_rust_message_type::<Payload>(&mut headers);
15271527
let envelope = MessageEnvelope::new(src.clone(), dst.clone(), serialized, headers);
15281528
let frame = Frame::Message(0u64, envelope);
1529-
let message = serde_multipart::serialize_illegal_bincode(&frame).unwrap();
1529+
let message = serde_multipart::serialize_bincode(&frame).unwrap();
15301530
message.frame_len()
15311531
}
15321532

@@ -1542,7 +1542,6 @@ mod tests {
15421542
};
15431543
let _guard3 =
15441544
config.override_key(hyperactor::config::DEFAULT_ENCODING, Encoding::Bincode);
1545-
let _guard4 = config.override_key(hyperactor::config::CHANNEL_MULTIPART, false);
15461545

15471546
let alloc = process_allocator()
15481547
.allocate(AllocSpec {

serde_multipart/src/lib.rs

Lines changed: 1 addition & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -57,17 +57,12 @@ use serde::Serialize;
5757
pub struct Message {
5858
body: Part,
5959
parts: Vec<Part>,
60-
is_illegal: bool,
6160
}
6261

6362
impl Message {
6463
/// Returns a new message with the given body and parts.
6564
pub fn from_body_and_parts(body: Part, parts: Vec<Part>) -> Self {
66-
Self {
67-
body,
68-
parts,
69-
is_illegal: false,
70-
}
65+
Self { body, parts }
7166
}
7267

7368
/// The body of the message.
@@ -118,17 +113,8 @@ impl Message {
118113
/// each part
119114
/// ```
120115
pub fn framed(self) -> Frame {
121-
let is_illegal = self.is_illegal;
122116
let (body, parts) = self.into_inner();
123117

124-
if is_illegal {
125-
assert!(parts.is_empty(), "illegal illegal message");
126-
return Frame::from_buffers(vec![
127-
Bytes::from_owner(u64::MAX.to_be_bytes()),
128-
body.into_inner(),
129-
]);
130-
}
131-
132118
let mut buffers = Vec::with_capacity(2 + 2 * parts.len());
133119

134120
let body = body.into_inner();
@@ -150,14 +136,6 @@ impl Message {
150136
return Err(std::io::ErrorKind::UnexpectedEof.into());
151137
}
152138
let body_len = buf.get_u64();
153-
if body_len == u64::MAX {
154-
return Ok(Self {
155-
body: buf.into(),
156-
parts: vec![],
157-
is_illegal: true,
158-
});
159-
}
160-
161139
let body = buf.split_to(body_len as usize);
162140
let mut parts = Vec::new();
163141
while !buf.is_empty() {
@@ -166,7 +144,6 @@ impl Message {
166144
Ok(Self {
167145
body: body.into(),
168146
parts,
169-
is_illegal: false,
170147
})
171148
}
172149

@@ -180,10 +157,6 @@ impl Message {
180157
}
181158
Ok(buf.split_to(at))
182159
}
183-
184-
pub fn is_illegal(&self) -> bool {
185-
self.is_illegal
186-
}
187160
}
188161

189162
/// An encoded [`Message`] frame. Implements [`bytes::Buf`],
@@ -325,7 +298,6 @@ pub fn serialize_bincode<S: ?Sized + serde::Serialize>(
325298
Ok(Message {
326299
body: Part(buffer.into_inner().freeze()),
327300
parts: serializer.into_parts(),
328-
is_illegal: false,
329301
})
330302
}
331303

@@ -335,14 +307,6 @@ pub fn deserialize_bincode<T>(message: Message) -> Result<T, bincode::Error>
335307
where
336308
T: serde::de::DeserializeOwned,
337309
{
338-
if message.is_illegal {
339-
let (body, parts) = message.into_inner();
340-
if !parts.is_empty() {
341-
return Err(bincode::ErrorKind::Custom("illegal illegal message".to_string()).into());
342-
}
343-
return bincode::deserialize_from(body.into_inner().reader());
344-
}
345-
346310
let (body, parts) = message.into_inner();
347311
let mut deserializer = part::BincodeDeserializer::new(
348312
bincode::Deserializer::with_reader(body.into_inner().reader(), options()),
@@ -354,22 +318,6 @@ where
354318
Ok(value)
355319
}
356320

357-
/// Serializes the provided value into an "illegal" Message that acts as an ordinary
358-
/// bincode-encoded buffer (bypassing multipart extraction and encoding). This is
359-
/// only used to support hyperactor's transition to multipart encoding, and will be
360-
/// removed after it is complete.
361-
///
362-
/// **YOU SHOULD NOT USE THIS**
363-
pub fn serialize_illegal_bincode<S: ?Sized + serde::Serialize>(
364-
value: &S,
365-
) -> Result<Message, bincode::Error> {
366-
Ok(Message {
367-
body: Part::from(bincode::serialize(value)?),
368-
parts: vec![],
369-
is_illegal: true,
370-
})
371-
}
372-
373321
/// Construct the set of options used by the specialized serializer and deserializer.
374322
fn options() -> part::BincodeOptionsType {
375323
bincode::DefaultOptions::new()
@@ -412,16 +360,6 @@ mod tests {
412360
let bincode_serialized = bincode::serialize(&value).unwrap();
413361
let bincode_deserialized = bincode::deserialize(&bincode_serialized).unwrap();
414362
assert_eq!(value, bincode_deserialized);
415-
416-
// Illegal encoding:
417-
let bincode_illegal = serialize_illegal_bincode(&value).unwrap();
418-
let mut bincode_illegal_framed = bincode_illegal.clone().framed();
419-
let bincode_illegal_framed =
420-
bincode_illegal_framed.copy_to_bytes(bincode_illegal_framed.remaining());
421-
let bincode_illegal_unframed = Message::from_framed(bincode_illegal_framed).unwrap();
422-
let bincode_illegal_deserialized_value =
423-
deserialize_bincode(bincode_illegal_unframed.clone()).unwrap();
424-
assert_eq!(value, bincode_illegal_deserialized_value);
425363
}
426364

427365
#[test]
@@ -507,7 +445,6 @@ mod tests {
507445
let message = Message {
508446
body: Part::from("hello"),
509447
parts: vec![Part::from("world")],
510-
is_illegal: false,
511448
};
512449
let err = deserialize_bincode::<String>(message).unwrap_err();
513450

@@ -565,7 +502,6 @@ mod tests {
565502
Part::from("xyz"),
566503
Part::from("xyzd"),
567504
],
568-
is_illegal: false,
569505
};
570506

571507
let mut framed = message.clone().framed();

0 commit comments

Comments
 (0)