Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions justfiles/testing.just
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# just commands for CI & local development

# Load-test a rustdoc page with ApacheBench (`ab`): wipes the local
# archive-index cache first so the run measures cold-cache behavior,
# then fires 10k requests at concurrency 500 against one docs page.
[group('testing')]
bench-rustdoc-page host="http://127.0.0.1:8888":
    rm -rf ignored/cratesfyi-prefix/archive_cache/*
    ab \
        -n 10000 \
        -c 500 \
        {{ host }}/rayon/1.11.0/rayon/

[group('testing')]
[group('sqlx')]
sqlx-prepare *args: _ensure_db_and_s3_are_running
Expand Down
19 changes: 19 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,21 @@ pub struct Config {
// for the remote archives?
pub(crate) local_archive_cache_path: PathBuf,

// expected number of entries in the local archive cache.
// Makes server restarts faster by preallocating some data structures.
// General numbers (as of 2025-12):
// * we have ~1.5 million releases with archive storage (and 400k without)
// * each release has on average 2 archive files (rustdoc, source)
// so, overall, ~3 million archive index files in S3.
//
// While due to crawlers we will download _all_ of them over time, the old
// metric "releases accessed in the last 10 minutes" was around 50k, if I
// recall correctly.
// We're using a local DashMap to store some locks for these indexes,
// and we already know in advance we need these 50k entries.
// So we can preallocate the DashMap with this number to avoid resizes.
pub(crate) local_archive_cache_expected_count: usize,

// Where to collect metrics for the metrics initiative.
// When empty, we won't collect metrics.
pub(crate) compiler_metrics_collection_path: Option<PathBuf>,
Expand Down Expand Up @@ -214,6 +229,10 @@ impl Config {
"DOCSRS_ARCHIVE_INDEX_CACHE_PATH",
prefix.join("archive_cache"),
)?)?)
.local_archive_cache_expected_count(env(
"DOCSRS_ARCHIVE_INDEX_EXPECTED_COUNT",
100_000usize,
)?)
.compiler_metrics_collection_path(maybe_env("DOCSRS_COMPILER_METRICS_PATH")?)
.temp_dir(temp_dir)
.rustwide_workspace(env(
Expand Down
154 changes: 102 additions & 52 deletions src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,15 @@ use std::{
path::{Path, PathBuf},
str::FromStr,
sync::Arc,
time::Duration,
};
use tokio::{
io::{AsyncBufRead, AsyncBufReadExt},
runtime,
sync::RwLock,
sync::Mutex,
time::sleep,
};
use tracing::{error, info_span, instrument, trace, warn};
use tracing::{debug, error, info_span, instrument, trace, warn};
use walkdir::WalkDir;

const ARCHIVE_INDEX_FILE_EXTENSION: &str = "index";
Expand Down Expand Up @@ -258,8 +260,8 @@ enum StorageBackend {
pub struct AsyncStorage {
backend: StorageBackend,
config: Arc<Config>,
/// Locks to synchronize access to the locally cached archive index files.
locks: DashMap<PathBuf, Arc<RwLock<()>>>,
/// Locks to synchronize write-access to the locally cached archive index files.
locks: DashMap<PathBuf, Arc<Mutex<()>>>,
}

impl AsyncStorage {
Expand All @@ -279,8 +281,8 @@ impl AsyncStorage {
StorageBackend::S3(Box::new(S3Backend::new(&config, otel_metrics).await?))
}
},
locks: DashMap::with_capacity(config.local_archive_cache_expected_count),
config,
locks: DashMap::new(),
})
}

Expand Down Expand Up @@ -454,12 +456,12 @@ impl AsyncStorage {
Ok(raw_stream.decompress().await?)
}

fn local_index_cache_lock(&self, local_index_path: impl AsRef<Path>) -> Arc<RwLock<()>> {
fn local_index_cache_lock(&self, local_index_path: impl AsRef<Path>) -> Arc<Mutex<()>> {
let local_index_path = local_index_path.as_ref().to_path_buf();

self.locks
.entry(local_index_path)
.or_insert_with(|| Arc::new(RwLock::new(())))
.or_insert_with(|| Arc::new(Mutex::new(())))
.downgrade()
.clone()
}
Expand All @@ -478,7 +480,7 @@ impl AsyncStorage {

let rwlock = self.local_index_cache_lock(&local_index_path);

let _write_guard = rwlock.write().await;
let _write_guard = rwlock.lock().await;

if tokio::fs::try_exists(&local_index_path).await? {
tokio::fs::remove_file(&local_index_path).await?;
Expand All @@ -487,10 +489,52 @@ impl AsyncStorage {
Ok(())
}

#[instrument(skip(self))]
/// Download a remote archive index into the local cache.
///
/// The index is first written to a unique temp file inside the cache
/// folder and then published with an atomic `rename`, so concurrent
/// readers either see the previous file or the complete new one —
/// never a partially downloaded index.
///
/// `local_index_path` is the final cache location; `remote_index_path`
/// is the storage key of the index object to fetch.
async fn download_archive_index(
    &self,
    local_index_path: &Path,
    remote_index_path: &str,
) -> Result<()> {
    // Ensure the cache directory for this index exists; we also keep the
    // parent path around for the durability fsync after the rename below.
    let parent = local_index_path
        .parent()
        .ok_or_else(|| anyhow::anyhow!("index path without parent"))?
        .to_path_buf();
    tokio::fs::create_dir_all(&parent).await?;

    // Create a unique temp file in the cache folder.
    // It must live on the same filesystem as the final path for the
    // `rename` below to be atomic. Tempfile creation is blocking I/O,
    // so it runs on the blocking thread pool.
    let (temp_file, mut temp_path) = spawn_blocking({
        let folder = self.config.local_archive_cache_path.clone();
        move || -> Result<_> { tempfile::NamedTempFile::new_in(&folder).map_err(Into::into) }
    })
    .await?
    .into_parts();

    // Download into temp file.
    let mut temp_file = tokio::fs::File::from_std(temp_file);
    let mut stream = self.get_stream(remote_index_path).await?.content;
    tokio::io::copy(&mut stream, &mut temp_file).await?;
    // Flush file contents to disk before publishing it under the final name.
    temp_file.sync_all().await?;

    // From here on the temp file must survive `temp_path` being dropped,
    // since we are about to move it to its final location.
    // NOTE(review): if the rename below fails, the temp file is leaked
    // into the cache folder — presumably acceptable for a cache dir.
    temp_path.disable_cleanup(true);

    // Publish atomically.
    // Will replace any existing file.
    tokio::fs::rename(&temp_path, local_index_path).await?;

    // fsync parent dir to make rename durable
    spawn_blocking(move || {
        let dir = std::fs::File::open(parent)?;
        dir.sync_all().map_err(Into::into)
    })
    .await?;

    Ok(())
}

/// Find the file info needed to fetch a certain path inside a remote archive.
/// Will try to use a local cache of the index file, and otherwise download it
/// from storage.
#[instrument]
#[instrument(skip(self))]
async fn find_in_archive_index(
&self,
archive_path: &str,
Expand All @@ -504,57 +548,63 @@ impl AsyncStorage {
latest_build_id.map(|id| id.0).unwrap_or(0)
));

let rwlock = self.local_index_cache_lock(&local_index_path);
// fast path: try to use whatever is there, no locking
match archive_index::find_in_file(&local_index_path, path_in_archive).await {
Ok(res) => return Ok(res),
Err(err) => {
debug!(?err, "archive index lookup failed, will try repair.");
}
}

// directly acquire the read-lock, so the syscall (`path.exists()`) below is already
// protected.
let mut _read_guard = rwlock.read().await;

if !tokio::fs::try_exists(&local_index_path).await? {
// upgrade the lock to a write-lock for downloading & storing the index.
drop(_read_guard);
let _write_guard = rwlock.write().await;

// check existence again in case of a race condition (TOCTOU)
if !tokio::fs::try_exists(&local_index_path).await? {
// remote/folder/and/x.zip.index
let remote_index_path = format!("{archive_path}.{ARCHIVE_INDEX_FILE_EXTENSION}");

let parent = local_index_path
.parent()
.ok_or_else(|| anyhow!("index path without parent"))?
.to_path_buf();
tokio::fs::create_dir_all(&parent).await?;

let mut temp_path = spawn_blocking({
// this creates the tempfile and directly drops it again,
// just to return a valid temp-path.
// This could be optimized.
let folder = self.config.local_archive_cache_path.clone();
move || Ok(tempfile::NamedTempFile::new_in(&folder)?.into_temp_path())
})
.await?;
let lock = self.local_index_cache_lock(&local_index_path);

let mut file = tokio::fs::File::create(&temp_path).await?;
let mut stream = self.get_stream(&remote_index_path).await?.content;
tokio::io::copy(&mut stream, &mut file).await?;
file.sync_all().await?;
// At this point we know the index is missing or broken.
// Try to become the "downloader" without queueing as a writer.
if let Ok(write_guard) = lock.try_lock() {
// Double-check: maybe someone fixed it between our first failure and now.
if let Ok(res) = archive_index::find_in_file(&local_index_path, path_in_archive).await {
return Ok(res);
}

temp_path.disable_cleanup(true);
tokio::fs::rename(&temp_path, &local_index_path).await?;
let remote_index_path = format!("{archive_path}.{ARCHIVE_INDEX_FILE_EXTENSION}");

// fsync parent dir to make rename durable (blocking)
spawn_blocking(move || {
let dir = std::fs::File::open(parent)?;
dir.sync_all().map_err(Into::into)
})
// We are the repairer: download fresh index into place.
self.download_archive_index(&local_index_path, &remote_index_path)
.await?;
}

_read_guard = _write_guard.downgrade();
// Write lock is dropped here (end of scope), so others can proceed.
drop(write_guard);

// Final attempt: if this still fails, bubble the error.
return archive_index::find_in_file(local_index_path, path_in_archive).await;
}

// Someone else is already downloading/repairing. Don't queue on the lock; just
// wait a bit and poll the fast path until it becomes readable or we give up.
const STEP_MS: u64 = 10;
const ATTEMPTS: u64 = 50; // = 500ms total wait
const TOTAL_WAIT_MS: u64 = STEP_MS * ATTEMPTS;

let mut last_err = None;

for _ in 0..ATTEMPTS {
sleep(Duration::from_millis(STEP_MS)).await;

match archive_index::find_in_file(local_index_path.clone(), path_in_archive).await {
Ok(res) => return Ok(res),
Err(err) => {
// keep waiting; repair may still be in progress
last_err = Some(err);
}
}
}

archive_index::find_in_file(local_index_path, path_in_archive).await
// Still not usable after waiting: return the last error we saw.
Err(last_err
.unwrap_or_else(|| anyhow!("archive index unavailable after repair wait"))
.context(format!(
"no archive index after waiting for {TOTAL_WAIT_MS}ms"
)))
}

#[instrument]
Expand Down
Loading