Skip to content

Commit 2d8e465

Browse files
authored
chore(pegboard): only refresh serverless runner metadata when new or udpated endpoint (#3227)
1 parent f8e7b2a commit 2d8e465

File tree

4 files changed

+91
-39
lines changed

4 files changed

+91
-39
lines changed

out/openapi.json

Lines changed: 9 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/core/api-peer/src/runner_configs.rs

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,9 @@ pub struct UpsertRequest(pub rivet_api_types::namespaces::runner_configs::Runner
9393

9494
#[derive(Deserialize, Serialize, ToSchema)]
9595
#[schema(as = RunnerConfigsUpsertResponse)]
96-
pub struct UpsertResponse {}
96+
pub struct UpsertResponse {
97+
pub endpoint_config_changed: bool,
98+
}
9799

98100
#[tracing::instrument(skip_all)]
99101
pub async fn upsert(
@@ -109,14 +111,17 @@ pub async fn upsert(
109111
.await?
110112
.ok_or_else(|| namespace::errors::Namespace::NotFound.build())?;
111113

112-
ctx.op(namespace::ops::runner_config::upsert::Input {
113-
namespace_id: namespace.namespace_id,
114-
name: path.runner_name,
115-
config: body.0.into(),
116-
})
117-
.await?;
114+
let endpoint_config_changed = ctx
115+
.op(namespace::ops::runner_config::upsert::Input {
116+
namespace_id: namespace.namespace_id,
117+
name: path.runner_name,
118+
config: body.0.into(),
119+
})
120+
.await?;
118121

119-
Ok(UpsertResponse {})
122+
Ok(UpsertResponse {
123+
endpoint_config_changed,
124+
})
120125
}
121126

122127
#[derive(Debug, Serialize, Deserialize, IntoParams)]

packages/core/api-public/src/runner_configs/upsert.rs

Lines changed: 35 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,19 @@ async fn upsert_inner(
5959

6060
tracing::debug!(runner_name = ?path.runner_name, datacenters_count = body.datacenters.len(), "starting upsert");
6161

62+
// Resolve namespace
63+
let namespace = ctx
64+
.op(namespace::ops::resolve_for_name_global::Input {
65+
name: query.namespace.clone(),
66+
})
67+
.await?
68+
.ok_or_else(|| namespace::errors::Namespace::NotFound.build())?;
69+
6270
// Store serverless config before processing (since we'll remove from body.datacenters)
6371
let serverless_config = body
6472
.datacenters
6573
.iter()
66-
.filter_map(|(dc_name, runner_config)| {
74+
.filter_map(|(_dc_name, runner_config)| {
6775
if let rivet_api_types::namespaces::runner_configs::RunnerConfigKind::Serverless {
6876
url,
6977
headers,
@@ -78,16 +86,17 @@ async fn upsert_inner(
7886
.next();
7987

8088
// Apply config
89+
let mut any_endpoint_config_changed = false;
8190
for dc in &ctx.config().topology().datacenters {
8291
if let Some(runner_config) = body.datacenters.remove(&dc.name) {
83-
if ctx.config().dc_label() == dc.datacenter_label {
92+
let response = if ctx.config().dc_label() == dc.datacenter_label {
8493
rivet_api_peer::runner_configs::upsert(
8594
ctx.clone().into(),
8695
path.clone(),
8796
query.clone(),
8897
rivet_api_peer::runner_configs::UpsertRequest(runner_config),
8998
)
90-
.await?;
99+
.await?
91100
} else {
92101
request_remote_datacenter::<UpsertResponse>(
93102
ctx.config(),
@@ -97,7 +106,11 @@ async fn upsert_inner(
97106
Some(&query),
98107
Some(&runner_config),
99108
)
100-
.await?;
109+
.await?
110+
};
111+
112+
if response.endpoint_config_changed {
113+
any_endpoint_config_changed = true;
101114
}
102115
} else {
103116
if ctx.config().dc_label() == dc.datacenter_label {
@@ -125,28 +138,25 @@ async fn upsert_inner(
125138
}
126139
}
127140

128-
// Resolve namespace
129-
let namespace = ctx
130-
.op(namespace::ops::resolve_for_name_global::Input {
131-
name: query.namespace.clone(),
132-
})
133-
.await?
134-
.ok_or_else(|| namespace::errors::Namespace::NotFound.build())?;
135-
136141
// Update runner metadata
137142
//
138143
// This allows us to populate the actor names immediately upon configuring a serverless runner
139144
if let Some((url, metadata_headers)) = serverless_config {
140-
if let Err(err) = utils::refresh_runner_config_metadata(
141-
ctx.clone(),
142-
namespace.namespace_id,
143-
path.runner_name.clone(),
144-
url,
145-
metadata_headers,
146-
)
147-
.await
148-
{
149-
tracing::warn!(?err, runner_name = ?path.runner_name, "failed to refresh runner config metadata");
145+
if any_endpoint_config_changed {
146+
tracing::debug!("endpoint config changed, refreshing metadata");
147+
if let Err(err) = utils::refresh_runner_config_metadata(
148+
ctx.clone(),
149+
namespace.namespace_id,
150+
path.runner_name.clone(),
151+
url,
152+
metadata_headers,
153+
)
154+
.await
155+
{
156+
tracing::warn!(?err, runner_name = ?path.runner_name, "failed to refresh runner config metadata");
157+
}
158+
} else {
159+
tracing::debug!("endpoint config unchanged, skipping metadata refresh");
150160
}
151161
}
152162

@@ -160,5 +170,7 @@ async fn upsert_inner(
160170
)
161171
.await?;
162172

163-
Ok(UpsertResponse {})
173+
Ok(UpsertResponse {
174+
endpoint_config_changed: any_endpoint_config_changed,
175+
})
164176
}

packages/services/namespace/src/ops/runner_config/upsert.rs

Lines changed: 34 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,22 +12,49 @@ pub struct Input {
1212
}
1313

1414
#[operation]
15-
pub async fn namespace_runner_config_upsert(ctx: &OperationCtx, input: &Input) -> Result<()> {
16-
ctx.udb()?
15+
pub async fn namespace_runner_config_upsert(ctx: &OperationCtx, input: &Input) -> Result<bool> {
16+
let endpoint_config_changed = ctx
17+
.udb()?
1718
.run(|tx| async move {
1819
let tx = tx.with_subspace(keys::subspace());
1920

2021
let runner_config_key =
2122
keys::runner_config::DataKey::new(input.namespace_id, input.name.clone());
2223

23-
// Delete previous index
24-
if let Some(existing_config) = tx.read_opt(&runner_config_key, Serializable).await? {
24+
// Check if config changed (for serverless, compare URL and headers)
25+
let endpoint_config_changed = if let Some(existing_config) =
26+
tx.read_opt(&runner_config_key, Serializable).await?
27+
{
28+
// Delete previous index
2529
tx.delete(&keys::runner_config::ByVariantKey::new(
2630
input.namespace_id,
2731
runner_config_variant(&existing_config),
2832
input.name.clone(),
2933
));
30-
}
34+
35+
// Check if serverless endpoint config changed
36+
match (&existing_config.kind, &input.config.kind) {
37+
(
38+
RunnerConfigKind::Serverless {
39+
url: old_url,
40+
headers: old_headers,
41+
..
42+
},
43+
RunnerConfigKind::Serverless {
44+
url: new_url,
45+
headers: new_headers,
46+
..
47+
},
48+
) => old_url != new_url || old_headers != new_headers,
49+
_ => {
50+
// Config type changed or is not serverless
51+
true
52+
}
53+
}
54+
} else {
55+
// New config
56+
true
57+
};
3158

3259
// Write new config
3360
tx.write(&runner_config_key, input.config.clone())?;
@@ -104,7 +131,7 @@ pub async fn namespace_runner_config_upsert(ctx: &OperationCtx, input: &Input) -
104131
}
105132
}
106133

107-
Ok(Ok(()))
134+
Ok(Ok(endpoint_config_changed))
108135
})
109136
.custom_instrument(tracing::info_span!("runner_config_upsert_tx"))
110137
.await?
@@ -117,5 +144,5 @@ pub async fn namespace_runner_config_upsert(ctx: &OperationCtx, input: &Input) -
117144
.await?;
118145
}
119146

120-
Ok(())
147+
Ok(endpoint_config_changed)
121148
}

0 commit comments

Comments
 (0)