@@ -40,11 +40,13 @@ use std::{
4040 path:: { Path , PathBuf } ,
4141 str:: FromStr ,
4242 sync:: Arc ,
43+ time:: Duration ,
4344} ;
4445use tokio:: {
4546 io:: { AsyncBufRead , AsyncBufReadExt } ,
4647 runtime,
47- sync:: RwLock ,
48+ sync:: Mutex ,
49+ time:: sleep,
4850} ;
4951use tracing:: { error, info_span, instrument, trace, warn} ;
5052use walkdir:: WalkDir ;
@@ -258,8 +260,8 @@ enum StorageBackend {
258260pub struct AsyncStorage {
259261 backend : StorageBackend ,
260262 config : Arc < Config > ,
261- /// Locks to synchronize access to the locally cached archive index files.
262- locks : DashMap < PathBuf , Arc < RwLock < ( ) > > > ,
263+ /// Locks to synchronize write- access to the locally cached archive index files.
264+ locks : DashMap < PathBuf , Arc < Mutex < ( ) > > > ,
263265}
264266
265267impl AsyncStorage {
@@ -279,8 +281,8 @@ impl AsyncStorage {
279281 StorageBackend :: S3 ( Box :: new ( S3Backend :: new ( & config, otel_metrics) . await ?) )
280282 }
281283 } ,
284+ locks : DashMap :: with_capacity ( config. local_archive_cache_expected_count ) ,
282285 config,
283- locks : DashMap :: new ( ) ,
284286 } )
285287 }
286288
@@ -454,12 +456,12 @@ impl AsyncStorage {
454456 Ok ( raw_stream. decompress ( ) . await ?)
455457 }
456458
457- fn local_index_cache_lock ( & self , local_index_path : impl AsRef < Path > ) -> Arc < RwLock < ( ) > > {
459+ fn local_index_cache_lock ( & self , local_index_path : impl AsRef < Path > ) -> Arc < Mutex < ( ) > > {
458460 let local_index_path = local_index_path. as_ref ( ) . to_path_buf ( ) ;
459461
460462 self . locks
461463 . entry ( local_index_path)
462- . or_insert_with ( || Arc :: new ( RwLock :: new ( ( ) ) ) )
464+ . or_insert_with ( || Arc :: new ( Mutex :: new ( ( ) ) ) )
463465 . downgrade ( )
464466 . clone ( )
465467 }
@@ -478,7 +480,7 @@ impl AsyncStorage {
478480
479481 let rwlock = self . local_index_cache_lock ( & local_index_path) ;
480482
481- let _write_guard = rwlock. write ( ) . await ;
483+ let _write_guard = rwlock. lock ( ) . await ;
482484
483485 if tokio:: fs:: try_exists ( & local_index_path) . await ? {
484486 tokio:: fs:: remove_file ( & local_index_path) . await ?;
@@ -487,10 +489,52 @@ impl AsyncStorage {
487489 Ok ( ( ) )
488490 }
489491
492+ #[ instrument( skip( self ) ) ]
493+ async fn download_archive_index (
494+ & self ,
495+ local_index_path : & Path ,
496+ remote_index_path : & str ,
497+ ) -> Result < ( ) > {
498+ let parent = local_index_path
499+ . parent ( )
500+ . ok_or_else ( || anyhow:: anyhow!( "index path without parent" ) ) ?
501+ . to_path_buf ( ) ;
502+ tokio:: fs:: create_dir_all ( & parent) . await ?;
503+
504+ // Create a unique temp file in the cache folder.
505+ let ( temp_file, mut temp_path) = spawn_blocking ( {
506+ let folder = self . config . local_archive_cache_path . clone ( ) ;
507+ move || -> Result < _ > { tempfile:: NamedTempFile :: new_in ( & folder) . map_err ( Into :: into) }
508+ } )
509+ . await ?
510+ . into_parts ( ) ;
511+
512+ // Download into temp file.
513+ let mut temp_file = tokio:: fs:: File :: from_std ( temp_file) ;
514+ let mut stream = self . get_stream ( remote_index_path) . await ?. content ;
515+ tokio:: io:: copy ( & mut stream, & mut temp_file) . await ?;
516+ temp_file. sync_all ( ) . await ?;
517+
518+ temp_path. disable_cleanup ( true ) ;
519+
520+ // Publish atomically.
521+ // Will replace any existing file.
522+ tokio:: fs:: rename ( & temp_path, local_index_path) . await ?;
523+
524+ // fsync parent dir to make rename durable
525+ spawn_blocking ( move || {
526+ let dir = std:: fs:: File :: open ( parent) ?;
527+ dir. sync_all ( ) . map_err ( Into :: into)
528+ } )
529+ . await ?;
530+
531+ Ok ( ( ) )
532+ }
533+
490534 /// Find the file info needed to fetch a certain path inside a remote archive.
491535 /// Will try to use a local cache of the index file, and otherwise download it
492536 /// from storage.
493- #[ instrument]
537+ #[ instrument( skip ( self ) ) ]
494538 async fn find_in_archive_index (
495539 & self ,
496540 archive_path : & str ,
@@ -504,57 +548,63 @@ impl AsyncStorage {
504548 latest_build_id. map( |id| id. 0 ) . unwrap_or( 0 )
505549 ) ) ;
506550
507- let rwlock = self . local_index_cache_lock ( & local_index_path) ;
551+ // fast path: try to use whatever is there, no locking
552+ match archive_index:: find_in_file ( & local_index_path, path_in_archive) . await {
553+ Ok ( res) => return Ok ( res) ,
554+ Err ( err) => {
555+ tracing:: debug!( ?err, "archive index lookup failed, will try repair." ) ;
556+ }
557+ }
508558
509- // directly acquire the read-lock, so the syscall (`path.exists()`) below is already
510- // protected.
511- let mut _read_guard = rwlock. read ( ) . await ;
512-
513- if !tokio:: fs:: try_exists ( & local_index_path) . await ? {
514- // upgrade the lock to a write-lock for downloading & storing the index.
515- drop ( _read_guard) ;
516- let _write_guard = rwlock. write ( ) . await ;
517-
518- // check existence again in case of Race Condition (TOCTOU)
519- if !tokio:: fs:: try_exists ( & local_index_path) . await ? {
520- // remote/folder/and/x.zip.index
521- let remote_index_path = format ! ( "{archive_path}.{ARCHIVE_INDEX_FILE_EXTENSION}" ) ;
522-
523- let parent = local_index_path
524- . parent ( )
525- . ok_or_else ( || anyhow ! ( "index path without parent" ) ) ?
526- . to_path_buf ( ) ;
527- tokio:: fs:: create_dir_all ( & parent) . await ?;
528-
529- let mut temp_path = spawn_blocking ( {
530- // this creates the tempfile and directly drops it again,
531- // just to return a valid temp-path.
532- // This could be optimized.
533- let folder = self . config . local_archive_cache_path . clone ( ) ;
534- move || Ok ( tempfile:: NamedTempFile :: new_in ( & folder) ?. into_temp_path ( ) )
535- } )
536- . await ?;
559+ let lock = self . local_index_cache_lock ( & local_index_path) ;
537560
538- let mut file = tokio:: fs:: File :: create ( & temp_path) . await ?;
539- let mut stream = self . get_stream ( & remote_index_path) . await ?. content ;
540- tokio:: io:: copy ( & mut stream, & mut file) . await ?;
541- file. sync_all ( ) . await ?;
561+ // At this point we know the index is missing or broken.
562+ // Try to become the "downloader" without queueing as a writer.
563+ if let Ok ( write_guard) = lock. try_lock ( ) {
564+ // Double-check: maybe someone fixed it between our first failure and now.
565+ if let Ok ( res) = archive_index:: find_in_file ( & local_index_path, path_in_archive) . await {
566+ return Ok ( res) ;
567+ }
542568
543- temp_path. disable_cleanup ( true ) ;
544- tokio:: fs:: rename ( & temp_path, & local_index_path) . await ?;
569+ let remote_index_path = format ! ( "{archive_path}.{ARCHIVE_INDEX_FILE_EXTENSION}" ) ;
545570
546- // fsync parent dir to make rename durable (blocking)
547- spawn_blocking ( move || {
548- let dir = std:: fs:: File :: open ( parent) ?;
549- dir. sync_all ( ) . map_err ( Into :: into)
550- } )
571+ // We are the repairer: download fresh index into place.
572+ self . download_archive_index ( & local_index_path, & remote_index_path)
551573 . await ?;
552- }
553574
554- _read_guard = _write_guard. downgrade ( ) ;
575+ // Write lock is dropped here (end of scope), so others can proceed.
576+ drop ( write_guard) ;
577+
578+ // Final attempt: if this still fails, bubble the error.
579+ return archive_index:: find_in_file ( local_index_path, path_in_archive) . await ;
580+ }
581+
582+ // Someone else is already downloading/repairing. Don't queue on write(); just wait
583+ // a bit and poll the fast path until it becomes readable or we give up.
584+ const STEP_MS : u64 = 10 ;
585+ const ATTEMPTS : u64 = 50 ; // = 500ms total wait
586+ const TOTAL_WAIT_MS : u64 = STEP_MS * ATTEMPTS ;
587+
588+ let mut last_err = None ;
589+
590+ for _ in 0 ..ATTEMPTS {
591+ sleep ( Duration :: from_millis ( STEP_MS ) ) . await ;
592+
593+ match archive_index:: find_in_file ( local_index_path. clone ( ) , path_in_archive) . await {
594+ Ok ( res) => return Ok ( res) ,
595+ Err ( err) => {
596+ // keep waiting; repair may still be in progress
597+ last_err = Some ( err) ;
598+ }
599+ }
555600 }
556601
557- archive_index:: find_in_file ( local_index_path, path_in_archive) . await
602+ // Still not usable after waiting: return the last error we saw.
603+ Err ( last_err
604+ . unwrap_or_else ( || anyhow ! ( "archive index unavailable after repair wait" ) )
605+ . context ( format ! (
606+ "no archive index after waiting for {TOTAL_WAIT_MS}ms"
607+ ) ) )
558608 }
559609
560610 #[ instrument]
0 commit comments