Skip to content

Commit 0aa7d35

Browse files
thomasywangmeta-codesync[bot]
authored andcommitted
Create hyperactor_config crate
Summary: We want this to be reusable from any crate without taking a dependency on `hyperactor` to avoid circular dependencies. Differential Revision: D87890698
1 parent 379679a commit 0aa7d35

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

57 files changed

+656
-582
lines changed

hyperactor/Cargo.toml

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ async-trait = "0.1.86"
4242
backoff = { version = "0.4.0", features = ["futures", "tokio"] }
4343
bincode = "1.3.3"
4444
bytes = { version = "1.10", features = ["serde"] }
45-
chrono = { version = "0.4.41", features = ["clock", "serde", "std"], default-features = false }
4645
cityhasher = "0.1.0"
4746
clap = { version = "4.5.42", features = ["derive", "env", "string", "unicode", "wrap_help"] }
4847
crc32fast = "1.4"
@@ -55,6 +54,7 @@ fastrand = "2.1.1"
5554
futures = { version = "0.3.31", features = ["async-await", "compat"] }
5655
hostname = "0.3"
5756
humantime = "2.1"
57+
hyperactor_config = { version = "0.0.0", path = "../hyperactor_config" }
5858
hyperactor_macros = { version = "0.0.0", path = "../hyperactor_macros" }
5959
hyperactor_named = { version = "0.0.0", path = "../hyperactor_named" }
6060
hyperactor_telemetry = { version = "0.0.0", path = "../hyperactor_telemetry" }
@@ -73,8 +73,6 @@ rustls-pemfile = "1.0.0"
7373
serde = { version = "1.0.219", features = ["derive", "rc"] }
7474
serde_json = { version = "1.0.140", features = ["alloc", "float_roundtrip", "raw_value", "unbounded_depth"] }
7575
serde_multipart = { version = "0.0.0", path = "../serde_multipart" }
76-
serde_yaml = "0.9.25"
77-
shell-quote = "0.7.2"
7876
signal-hook-tokio = { version = "0.3", features = ["futures-v0_3"] }
7977
strum = { version = "0.27.1", features = ["derive"] }
8078
thiserror = "2.0.12"
@@ -94,6 +92,7 @@ indoc = "2.0.2"
9492
maplit = "1.0"
9593
proptest = "1.5"
9694
serde_bytes = "0.11"
95+
serde_yaml = "0.9.25"
9796
tempfile = "3.22"
9897
timed_test = { version = "0.0.0", path = "../timed_test" }
9998
tokio-test = "0.4.4"

hyperactor/src/accum.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ pub struct ReducerOpts {
5858
impl ReducerOpts {
5959
pub(crate) fn max_update_interval(&self) -> Duration {
6060
self.max_update_interval
61-
.unwrap_or(config::global::get(config::SPLIT_MAX_BUFFER_AGE))
61+
.unwrap_or(hyperactor_config::global::get(config::SPLIT_MAX_BUFFER_AGE))
6262
}
6363
}
6464

hyperactor/src/channel.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use std::str::FromStr;
2222

2323
use async_trait::async_trait;
2424
use enum_as_inner::EnumAsInner;
25+
use hyperactor_config::attrs::AttrValue;
2526
use lazy_static::lazy_static;
2627
use local_ip_address::local_ipv6;
2728
use serde::Deserialize;
@@ -33,7 +34,6 @@ use tokio::sync::watch;
3334
use crate as hyperactor;
3435
use crate::Named;
3536
use crate::RemoteMessage;
36-
use crate::attrs::AttrValue;
3737
use crate::channel::sim::SimAddr;
3838
use crate::simnet::SimNetError;
3939

@@ -1158,7 +1158,7 @@ mod tests {
11581158
// TODO: OSS: called `Result::unwrap()` on an `Err` value: Server(Listen(Tcp([::1]:0), Os { code: 99, kind: AddrNotAvailable, message: "Cannot assign requested address" }))
11591159
#[cfg_attr(not(fbcode_build), ignore)]
11601160
async fn test_send() {
1161-
let config = crate::config::global::lock();
1161+
let config = hyperactor_config::global::lock();
11621162

11631163
// Use temporary config for this test
11641164
let _guard1 = config.override_key(

hyperactor/src/channel/net.rs

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ fn deserialize_response(data: Bytes) -> Result<NetRxResponse, bincode::Error> {
125125
fn serialize_bincode<S: ?Sized + serde::Serialize>(
126126
value: &S,
127127
) -> Result<serde_multipart::Message, bincode::Error> {
128-
if config::global::get(CHANNEL_MULTIPART) {
128+
if hyperactor_config::global::get(CHANNEL_MULTIPART) {
129129
serde_multipart::serialize_bincode(value)
130130
} else {
131131
serde_multipart::serialize_illegal_bincode(value)
@@ -1040,7 +1040,7 @@ mod tests {
10401040
async fn test_tcp_message_size() {
10411041
let default_size_in_bytes = 100 * 1024 * 1024;
10421042
// Use temporary config for this test
1043-
let config = config::global::lock();
1043+
let config = hyperactor_config::global::lock();
10441044
let _guard1 = config.override_key(config::MESSAGE_DELIVERY_TIMEOUT, Duration::from_secs(1));
10451045
let _guard2 = config.override_key(config::CODEC_MAX_FRAME_LENGTH, default_size_in_bytes);
10461046

@@ -1068,7 +1068,7 @@ mod tests {
10681068
// TODO: OSS: called `Result::unwrap()` on an `Err` value: Listen(Tcp([::1]:0), Os { code: 99, kind: AddrNotAvailable, message: "Cannot assign requested address" })
10691069
#[cfg_attr(not(fbcode_build), ignore)]
10701070
async fn test_ack_flush() {
1071-
let config = config::global::lock();
1071+
let config = hyperactor_config::global::lock();
10721072
// Set a large value to effectively prevent acks from being sent except
10731073
// during shutdown flush.
10741074
let _guard_message_ack =
@@ -1351,7 +1351,7 @@ mod tests {
13511351
}
13521352
}
13531353
}
1354-
let mut fw = FrameWrite::new(writer, data, config::global::get(config::CODEC_MAX_FRAME_LENGTH)).unwrap();
1354+
let mut fw = FrameWrite::new(writer, data, hyperactor_config::global::get(config::CODEC_MAX_FRAME_LENGTH)).unwrap();
13551355
if fw.send().await.is_err() {
13561356
break;
13571357
}
@@ -1376,7 +1376,7 @@ mod tests {
13761376
let (server_r, server_writer) = tokio::io::split(server_relay);
13771377
let (client_r, client_writer) = tokio::io::split(client_relay);
13781378

1379-
let max_len = config::global::get(config::CODEC_MAX_FRAME_LENGTH);
1379+
let max_len = hyperactor_config::global::get(config::CODEC_MAX_FRAME_LENGTH);
13801380
let server_reader = FrameReader::new(server_r, max_len);
13811381
let client_reader = FrameReader::new(client_r, max_len);
13821382

@@ -1480,7 +1480,10 @@ mod tests {
14801480
let join_handle =
14811481
tokio::spawn(async move { manager1.serve(conn, tx, cancel_token_1).await });
14821482
let (r, writer) = tokio::io::split(sender);
1483-
let reader = FrameReader::new(r, config::global::get(config::CODEC_MAX_FRAME_LENGTH));
1483+
let reader = FrameReader::new(
1484+
r,
1485+
hyperactor_config::global::get(config::CODEC_MAX_FRAME_LENGTH),
1486+
);
14841487
(join_handle, reader, writer, rx, cancel_token)
14851488
}
14861489

@@ -1500,7 +1503,7 @@ mod tests {
15001503
let mut fw = FrameWrite::new(
15011504
writer,
15021505
message.framed(),
1503-
config::global::get(config::CODEC_MAX_FRAME_LENGTH),
1506+
hyperactor_config::global::get(config::CODEC_MAX_FRAME_LENGTH),
15041507
)
15051508
.map_err(|(_w, e)| e)
15061509
.unwrap();
@@ -1515,7 +1518,7 @@ mod tests {
15151518
let mut fw = FrameWrite::new(
15161519
writer,
15171520
message.framed(),
1518-
config::global::get(config::CODEC_MAX_FRAME_LENGTH),
1521+
hyperactor_config::global::get(config::CODEC_MAX_FRAME_LENGTH),
15191522
)
15201523
.map_err(|(_w, e)| e)
15211524
.unwrap();
@@ -1529,7 +1532,7 @@ mod tests {
15291532
#[async_timed_test(timeout_secs = 60)]
15301533
async fn test_persistent_server_session() {
15311534
// Use temporary config for this test
1532-
let config = config::global::lock();
1535+
let config = hyperactor_config::global::lock();
15331536
let _guard = config.override_key(config::MESSAGE_ACK_EVERY_N_MESSAGES, 1);
15341537

15351538
async fn verify_ack(reader: &mut FrameReader<ReadHalf<DuplexStream>>, expected_last: u64) {
@@ -1632,7 +1635,7 @@ mod tests {
16321635

16331636
#[async_timed_test(timeout_secs = 60)]
16341637
async fn test_ack_from_server_session() {
1635-
let config = config::global::lock();
1638+
let config = hyperactor_config::global::lock();
16361639
let _guard = config.override_key(config::MESSAGE_ACK_EVERY_N_MESSAGES, 1);
16371640
let manager = SessionManager::new();
16381641
let session_id = 123u64;
@@ -1694,7 +1697,7 @@ mod tests {
16941697
let link = MockLink::<u64>::fail_connects();
16951698
let tx = super::dial::<u64>(link);
16961699
// Override the default (1m) for the purposes of this test.
1697-
let config = config::global::lock();
1700+
let config = hyperactor_config::global::lock();
16981701
let _guard = config.override_key(config::MESSAGE_DELIVERY_TIMEOUT, Duration::from_secs(1));
16991702
let mut tx_receiver = tx.status().clone();
17001703
let (return_channel, _return_receiver) = oneshot::channel();
@@ -1707,7 +1710,10 @@ mod tests {
17071710
) -> (FrameReader<ReadHalf<DuplexStream>>, WriteHalf<DuplexStream>) {
17081711
let receiver = receiver_storage.take().await;
17091712
let (r, writer) = tokio::io::split(receiver);
1710-
let reader = FrameReader::new(r, config::global::get(config::CODEC_MAX_FRAME_LENGTH));
1713+
let reader = FrameReader::new(
1714+
r,
1715+
hyperactor_config::global::get(config::CODEC_MAX_FRAME_LENGTH),
1716+
);
17111717
(reader, writer)
17121718
}
17131719

@@ -2062,7 +2068,7 @@ mod tests {
20622068

20632069
async fn verify_ack_exceeded_limit(disconnect_before_ack: bool) {
20642070
// Use temporary config for this test
2065-
let config = config::global::lock();
2071+
let config = hyperactor_config::global::lock();
20662072
let _guard = config.override_key(config::MESSAGE_DELIVERY_TIMEOUT, Duration::from_secs(2));
20672073

20682074
let link: MockLink<u64> = MockLink::<u64>::new();
@@ -2080,7 +2086,7 @@ mod tests {
20802086
let _ = FrameWrite::write_frame(
20812087
writer,
20822088
serialize_response(NetRxResponse::Ack(0)).unwrap(),
2083-
config::global::get(config::CODEC_MAX_FRAME_LENGTH),
2089+
hyperactor_config::global::get(config::CODEC_MAX_FRAME_LENGTH),
20842090
)
20852091
.await
20862092
.map_err(|(_, e)| e)
@@ -2197,7 +2203,7 @@ mod tests {
21972203

21982204
#[async_timed_test(timeout_secs = 60)]
21992205
async fn test_ack_every_n_messages() {
2200-
let config = config::global::lock();
2206+
let config = hyperactor_config::global::lock();
22012207
let _guard_message_ack = config.override_key(config::MESSAGE_ACK_EVERY_N_MESSAGES, 600);
22022208
let _guard_time_interval =
22032209
config.override_key(config::MESSAGE_ACK_TIME_INTERVAL, Duration::from_secs(1000));
@@ -2206,7 +2212,7 @@ mod tests {
22062212

22072213
#[async_timed_test(timeout_secs = 60)]
22082214
async fn test_ack_every_time_interval() {
2209-
let config = config::global::lock();
2215+
let config = hyperactor_config::global::lock();
22102216
let _guard_message_ack =
22112217
config.override_key(config::MESSAGE_ACK_EVERY_N_MESSAGES, 100000000);
22122218
let _guard_time_interval = config.override_key(
@@ -2297,7 +2303,7 @@ mod tests {
22972303
// TODO: OSS: called `Result::unwrap()` on an `Err` value: Listen(Tcp([::1]:0), Os { code: 99, kind: AddrNotAvailable, message: "Cannot assign requested address" })
22982304
#[cfg_attr(not(fbcode_build), ignore)]
22992305
async fn test_tcp_throughput() {
2300-
let config = config::global::lock();
2306+
let config = hyperactor_config::global::lock();
23012307
let _guard =
23022308
config.override_key(config::MESSAGE_DELIVERY_TIMEOUT, Duration::from_secs(300));
23032309

@@ -2374,7 +2380,7 @@ mod tests {
23742380

23752381
#[async_timed_test(timeout_secs = 60)]
23762382
async fn test_server_rejects_conn_on_out_of_sequence_message() {
2377-
let config = config::global::lock();
2383+
let config = hyperactor_config::global::lock();
23782384
let _guard = config.override_key(config::MESSAGE_ACK_EVERY_N_MESSAGES, 1);
23792385
let manager = SessionManager::new();
23802386
let session_id = 123u64;

hyperactor/src/channel/net/client.rs

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,8 @@ impl<'a, M: RemoteMessage> Outbox<'a, M> {
229229
match self.deque.front() {
230230
None => false,
231231
Some(msg) => {
232-
msg.received_at.elapsed() > config::global::get(config::MESSAGE_DELIVERY_TIMEOUT)
232+
msg.received_at.elapsed()
233+
> hyperactor_config::global::get(config::MESSAGE_DELIVERY_TIMEOUT)
233234
}
234235
}
235236
}
@@ -436,7 +437,7 @@ impl<'a, M: RemoteMessage> Unacked<'a, M> {
436437
fn is_expired(&self) -> bool {
437438
matches!(
438439
self.deque.front(),
439-
Some(msg) if msg.received_at.elapsed() > config::global::get(config::MESSAGE_DELIVERY_TIMEOUT)
440+
Some(msg) if msg.received_at.elapsed() > hyperactor_config::global::get(config::MESSAGE_DELIVERY_TIMEOUT)
440441
)
441442
}
442443

@@ -448,7 +449,8 @@ impl<'a, M: RemoteMessage> Unacked<'a, M> {
448449
Some(msg) => {
449450
RealClock
450451
.sleep_until(
451-
msg.received_at + config::global::get(config::MESSAGE_DELIVERY_TIMEOUT),
452+
msg.received_at
453+
+ hyperactor_config::global::get(config::MESSAGE_DELIVERY_TIMEOUT),
452454
)
453455
.await
454456
}
@@ -873,7 +875,7 @@ where
873875
..
874876
},
875877
) if !outbox.is_empty() => {
876-
let max = config::global::get(config::CODEC_MAX_FRAME_LENGTH);
878+
let max = hyperactor_config::global::get(config::CODEC_MAX_FRAME_LENGTH);
877879
let len = outbox.front_size().expect("not empty");
878880
let message = outbox.front_message().expect("not empty");
879881

@@ -995,7 +997,7 @@ where
995997
_ = unacked.wait_for_timeout(), if !unacked.is_empty() => {
996998
let error_msg = format!(
997999
"failed to receive ack within timeout {:?}; link is currently connected",
998-
config::global::get(config::MESSAGE_DELIVERY_TIMEOUT),
1000+
hyperactor_config::global::get(config::MESSAGE_DELIVERY_TIMEOUT),
9991001
);
10001002
tracing::error!(
10011003
dest = %link.dest(),
@@ -1094,7 +1096,7 @@ where
10941096
if outbox.is_expired() {
10951097
let error_msg = format!(
10961098
"failed to deliver message within timeout {:?}",
1097-
config::global::get(config::MESSAGE_DELIVERY_TIMEOUT)
1099+
hyperactor_config::global::get(config::MESSAGE_DELIVERY_TIMEOUT)
10981100
);
10991101
tracing::error!(
11001102
dest = %link.dest(),
@@ -1111,7 +1113,7 @@ where
11111113
} else if unacked.is_expired() {
11121114
let error_msg = format!(
11131115
"failed to receive ack within timeout {:?}; link is currently broken",
1114-
config::global::get(config::MESSAGE_DELIVERY_TIMEOUT),
1116+
hyperactor_config::global::get(config::MESSAGE_DELIVERY_TIMEOUT),
11151117
);
11161118
tracing::error!(
11171119
dest = %link.dest(),
@@ -1133,7 +1135,7 @@ where
11331135
let mut write = FrameWrite::new(
11341136
stream,
11351137
message.framed(),
1136-
config::global::get(config::CODEC_MAX_FRAME_LENGTH),
1138+
hyperactor_config::global::get(config::CODEC_MAX_FRAME_LENGTH),
11371139
)
11381140
.expect("enough length");
11391141
let initialized = write.send().await.is_ok();
@@ -1176,7 +1178,9 @@ where
11761178
Conn::Connected {
11771179
reader: FrameReader::new(
11781180
reader,
1179-
config::global::get(config::CODEC_MAX_FRAME_LENGTH),
1181+
hyperactor_config::global::get(
1182+
config::CODEC_MAX_FRAME_LENGTH,
1183+
),
11801184
),
11811185
write_state: WriteState::Idle(writer),
11821186
}

hyperactor/src/channel/net/server.rs

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,10 @@ impl<S: AsyncRead + AsyncWrite> ServerConn<S> {
5757
pub(super) fn new(stream: S, source: ChannelAddr, dest: ChannelAddr) -> Self {
5858
let (reader, writer) = tokio::io::split(stream);
5959
Self {
60-
reader: FrameReader::new(reader, config::global::get(config::CODEC_MAX_FRAME_LENGTH)),
60+
reader: FrameReader::new(
61+
reader,
62+
hyperactor_config::global::get(config::CODEC_MAX_FRAME_LENGTH),
63+
),
6164
write_state: WriteState::Idle(writer),
6265
source,
6366
dest,
@@ -97,8 +100,8 @@ impl<S: AsyncRead + AsyncWrite + Send + 'static + Unpin> ServerConn<S> {
97100
let mut rcv_raw_frame_count = 0u64;
98101
let mut last_ack_time = RealClock.now();
99102

100-
let ack_time_interval = config::global::get(config::MESSAGE_ACK_TIME_INTERVAL);
101-
let ack_msg_interval = config::global::get(config::MESSAGE_ACK_EVERY_N_MESSAGES);
103+
let ack_time_interval = hyperactor_config::global::get(config::MESSAGE_ACK_TIME_INTERVAL);
104+
let ack_msg_interval = hyperactor_config::global::get(config::MESSAGE_ACK_EVERY_N_MESSAGES);
102105

103106
let (mut final_next, final_result, reject_conn) = loop {
104107
if self.write_state.is_idle()
@@ -123,7 +126,7 @@ impl<S: AsyncRead + AsyncWrite + Send + 'static + Unpin> ServerConn<S> {
123126
match FrameWrite::new(
124127
writer,
125128
ack,
126-
config::global::get(config::CODEC_MAX_FRAME_LENGTH),
129+
hyperactor_config::global::get(config::CODEC_MAX_FRAME_LENGTH),
127130
) {
128131
Ok(fw) => {
129132
self.write_state = WriteState::Writing(fw, next.seq);
@@ -366,7 +369,7 @@ impl<S: AsyncRead + AsyncWrite + Send + 'static + Unpin> ServerConn<S> {
366369
let ack = serialize_response(NetRxResponse::Ack(final_next.seq - 1))
367370
.map_err(anyhow::Error::from)?;
368371

369-
let max = config::global::get(config::CODEC_MAX_FRAME_LENGTH);
372+
let max = hyperactor_config::global::get(config::CODEC_MAX_FRAME_LENGTH);
370373
let fw =
371374
FrameWrite::new(writer, ack, max).map_err(|(_, e)| anyhow::Error::from(e))?;
372375
self.write_state = WriteState::Writing(fw, final_next.seq);
@@ -399,7 +402,7 @@ impl<S: AsyncRead + AsyncWrite + Send + 'static + Unpin> ServerConn<S> {
399402
match FrameWrite::new(
400403
writer,
401404
data,
402-
config::global::get(config::CODEC_MAX_FRAME_LENGTH),
405+
hyperactor_config::global::get(config::CODEC_MAX_FRAME_LENGTH),
403406
) {
404407
Ok(fw) => {
405408
self.write_state = WriteState::Writing(fw, 0);
@@ -445,7 +448,7 @@ impl<S: AsyncRead + AsyncWrite + Send + 'static + Unpin> ServerConn<S> {
445448
permit_result?.send(message);
446449
return Ok(())
447450
}
448-
_ = RealClock.sleep(config::global::get(config::CHANNEL_NET_RX_BUFFER_FULL_CHECK_INTERVAL)) => {
451+
_ = RealClock.sleep(hyperactor_config::global::get(config::CHANNEL_NET_RX_BUFFER_FULL_CHECK_INTERVAL)) => {
449452
// When buffer is full too long, we log it.
450453
metrics::CHANNEL_NET_RX_BUFFER_FULL.add(
451454
1,

hyperactor/src/channel/sim.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -397,11 +397,11 @@ impl<M: RemoteMessage> Rx<M> for SimRx<M> {
397397
mod tests {
398398
use std::iter::zip;
399399

400+
use hyperactor_config::attrs::Attrs;
400401
use ndslice::extent;
401402

402403
use super::*;
403404
use crate::PortId;
404-
use crate::attrs::Attrs;
405405
use crate::clock::Clock;
406406
use crate::clock::RealClock;
407407
use crate::clock::SimClock;

0 commit comments

Comments
 (0)