Skip to content

Commit b9d36b6

Browse files
committed
dist: make installation asynchronous
1 parent 3b25dca commit b9d36b6

File tree

2 files changed

+111
-21
lines changed

2 files changed

+111
-21
lines changed

src/diskio/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@ impl IncrementalFileState {
290290
/// Trait object for performing IO. At this point the overhead
291291
/// of trait invocation is not a bottleneck, but if it becomes
292292
/// one we could consider an enum variant based approach instead.
293-
pub(crate) trait Executor {
293+
pub(crate) trait Executor: Send {
294294
/// Perform a single operation.
295295
/// During overload situations previously queued items may
296296
/// need to be completed before the item is accepted:

src/dist/manifestation.rs

Lines changed: 110 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,17 @@
44
#[cfg(test)]
55
mod tests;
66

7+
use std::collections::VecDeque;
78
use std::path::Path;
9+
use std::pin::Pin;
810
use std::sync::Arc;
11+
use std::task::{Context, Poll, ready};
12+
use std::vec;
913

10-
use anyhow::{Context, Result, anyhow, bail};
14+
use anyhow::{Context as _, Result, anyhow, bail};
15+
use futures_util::Stream;
1116
use futures_util::stream::{FuturesUnordered, StreamExt};
17+
use tokio::task::{JoinHandle, spawn_blocking};
1218
use tracing::{info, warn};
1319

1420
use crate::diskio::{Executor, IO_CHUNK_SIZE, get_executor, unpack_ram};
@@ -211,34 +217,38 @@ impl Manifestation {
211217
}
212218

213219
info!("downloading component(s)");
214-
let mut downloads = FuturesUnordered::new();
215-
let mut component_iter = components.into_iter();
216-
let mut cleanup_downloads = vec![];
217-
let manifestation = Arc::new(self);
218-
loop {
219-
if downloads.is_empty() && component_iter.len() == 0 {
220-
break;
221-
}
222-
223-
let installable = downloads.next().await.transpose()?;
224-
while component_iter.len() > 0 && downloads.len() < concurrent_downloads {
225-
if let Some(bin) = component_iter.next() {
226-
downloads.push(bin.download(max_retries));
220+
let mut stream = InstallEvents::new(components.into_iter(), Arc::new(self));
221+
let mut transaction = Some(tx);
222+
let mut tx = loop {
223+
// Refill downloads when there's capacity
224+
// Must live outside of `InstallEvents` because we can't write the type of future
225+
while stream.components.len() > 0 && stream.downloads.len() < concurrent_downloads {
226+
if let Some(bin) = stream.components.next() {
227+
stream.downloads.push(bin.download(max_retries));
227228
}
228229
}
229230

230-
if let Some((installable, hash)) = installable {
231-
cleanup_downloads.push(hash);
232-
tx = installable.install(tx, manifestation.clone())?;
231+
// Trigger another installation if no other installation is in progress, as evidenced
232+
// by whether `transaction` is `Some` (not held by another installation task).
233+
stream.try_install(&mut transaction);
234+
match stream.next().await {
235+
// Completed an installation, yielding the transaction back
236+
Some(Ok(tx)) => match stream.is_done() {
237+
true => break tx,
238+
false => transaction = Some(tx),
239+
},
240+
Some(Err(e)) => return Err(e),
241+
// A download completed, so we can trigger another one
242+
None => {}
233243
}
234-
}
244+
};
235245

236246
// Install new distribution manifest
237247
let new_manifest_str = new_manifest.clone().stringify()?;
238248
tx.modify_file(rel_installed_manifest_path)?;
239249
utils::write_file("manifest", &installed_manifest_path, &new_manifest_str)?;
240-
download_cfg.clean(&cleanup_downloads)?;
241-
drop(downloads);
250+
download_cfg.clean(&stream.cleanup_downloads)?;
251+
drop(stream);
242252

243253
// Write configuration.
244254
//
@@ -445,6 +455,86 @@ impl Manifestation {
445455
}
446456
}
447457

458+
struct InstallEvents<'a, F> {
459+
manifestation: Arc<Manifestation>,
460+
components: vec::IntoIter<ComponentBinary<'a>>,
461+
cleanup_downloads: Vec<&'a str>,
462+
install_queue: VecDeque<ComponentInstall>,
463+
installing: Option<JoinHandle<Result<Transaction>>>,
464+
downloads: FuturesUnordered<F>,
465+
}
466+
467+
impl<'a, F> InstallEvents<'a, F> {
468+
fn new(
469+
components: vec::IntoIter<ComponentBinary<'a>>,
470+
manifestation: Arc<Manifestation>,
471+
) -> Self {
472+
Self {
473+
manifestation,
474+
cleanup_downloads: Vec::with_capacity(components.len()),
475+
components,
476+
install_queue: VecDeque::new(),
477+
installing: None,
478+
downloads: FuturesUnordered::new(),
479+
}
480+
}
481+
482+
fn try_install(&mut self, tx: &mut Option<Transaction>) {
483+
if let Some(installable) = self.install_queue.pop_front() {
484+
if let Some(tx) = tx.take() {
485+
let manifestation = self.manifestation.clone();
486+
self.installing = Some(spawn_blocking(|| installable.install(tx, manifestation)));
487+
} else {
488+
self.install_queue.push_front(installable);
489+
}
490+
}
491+
}
492+
493+
fn is_done(&self) -> bool {
494+
self.components.len() == 0 && self.downloads.is_empty() && self.install_queue.is_empty()
495+
}
496+
}
497+
498+
impl<'a, F: Future<Output = Result<(ComponentInstall, &'a str)>>> Stream for InstallEvents<'a, F> {
499+
type Item = Result<Transaction>;
500+
501+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
502+
// First, see if any of the downloads is complete; if so, yield `None`
503+
// to the caller so it can trigger another download.
504+
let mut this = self.as_mut();
505+
match Pin::new(&mut this.downloads).poll_next(cx) {
506+
Poll::Ready(Some(Ok((installable, hash)))) => {
507+
this.cleanup_downloads.push(hash);
508+
this.install_queue.push_back(installable);
509+
return Poll::Ready(None);
510+
}
511+
Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))),
512+
Poll::Ready(None) | Poll::Pending => {}
513+
}
514+
515+
let Some(handle) = &mut this.installing else {
516+
return match self.install_queue.is_empty() {
517+
// Nothing to do, yield control to the runtime
518+
true => Poll::Pending,
519+
// Can try to start the next installation
520+
false => Poll::Ready(None),
521+
};
522+
};
523+
524+
match ready!(Pin::new(handle).poll(cx)) {
525+
Ok(Ok(tx)) => {
526+
// Current `handle` must not be polled again
527+
this.installing = None;
528+
Poll::Ready(Some(Ok(tx)))
529+
}
530+
Ok(Err(e)) => Poll::Ready(Some(Err(e))),
531+
Err(e) => Poll::Ready(Some(Err(anyhow!(
532+
"internal error during installation: {e}"
533+
)))),
534+
}
535+
}
536+
}
537+
448538
#[derive(Debug, Default)]
449539
struct Update {
450540
components_to_uninstall: Vec<Component>,

0 commit comments

Comments
 (0)