Skip to content

Commit 36c0949

Browse files
committed
dist: make installation asynchronous
1 parent 3b25dca commit 36c0949

File tree

2 files changed

+117
-21
lines changed

2 files changed

+117
-21
lines changed

src/diskio/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@ impl IncrementalFileState {
290290
/// Trait object for performing IO. At this point the overhead
291291
/// of trait invocation is not a bottleneck, but if it becomes
292292
/// one we could consider an enum variant based approach instead.
293-
pub(crate) trait Executor {
293+
pub(crate) trait Executor: Send {
294294
/// Perform a single operation.
295295
/// During overload situations previously queued items may
296296
/// need to be completed before the item is accepted:

src/dist/manifestation.rs

Lines changed: 116 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,17 @@
44
#[cfg(test)]
55
mod tests;
66

7+
use std::collections::VecDeque;
78
use std::path::Path;
9+
use std::pin::Pin;
810
use std::sync::Arc;
11+
use std::task::{Context, Poll};
12+
use std::vec;
913

10-
use anyhow::{Context, Result, anyhow, bail};
14+
use anyhow::{Context as _, Result, anyhow, bail};
15+
use futures_util::Stream;
1116
use futures_util::stream::{FuturesUnordered, StreamExt};
17+
use tokio::task::{JoinHandle, spawn_blocking};
1218
use tracing::{info, warn};
1319

1420
use crate::diskio::{Executor, IO_CHUNK_SIZE, get_executor, unpack_ram};
@@ -211,34 +217,38 @@ impl Manifestation {
211217
}
212218

213219
info!("downloading component(s)");
214-
let mut downloads = FuturesUnordered::new();
215-
let mut component_iter = components.into_iter();
216-
let mut cleanup_downloads = vec![];
217-
let manifestation = Arc::new(self);
218-
loop {
219-
if downloads.is_empty() && component_iter.len() == 0 {
220-
break;
221-
}
222-
223-
let installable = downloads.next().await.transpose()?;
224-
while component_iter.len() > 0 && downloads.len() < concurrent_downloads {
225-
if let Some(bin) = component_iter.next() {
226-
downloads.push(bin.download(max_retries));
220+
let mut stream = InstallEvents::new(components.into_iter(), Arc::new(self));
221+
let mut transaction = Some(tx);
222+
let mut tx = loop {
223+
// Refill downloads when there's capacity
224+
// Must live outside of `InstallEvents` because we can't write the type of future
225+
while stream.components.len() > 0 && stream.downloads.len() < concurrent_downloads {
226+
if let Some(bin) = stream.components.next() {
227+
stream.downloads.push(bin.download(max_retries));
227228
}
228229
}
229230

230-
if let Some((installable, hash)) = installable {
231-
cleanup_downloads.push(hash);
232-
tx = installable.install(tx, manifestation.clone())?;
231+
// Trigger another installation if no other installation is in progress, as evidenced
232+
// by whether `transaction` is `Some` (not held by another installation task).
233+
stream.try_install(&mut transaction);
234+
match stream.next().await {
235+
// Completed an installation, yielding the transaction back
236+
Some(Ok(tx)) => match stream.is_done() {
237+
true => break tx,
238+
false => transaction = Some(tx),
239+
},
240+
Some(Err(e)) => return Err(e),
241+
// A download completed, so we can trigger another one
242+
None => {}
233243
}
234-
}
244+
};
235245

236246
// Install new distribution manifest
237247
let new_manifest_str = new_manifest.clone().stringify()?;
238248
tx.modify_file(rel_installed_manifest_path)?;
239249
utils::write_file("manifest", &installed_manifest_path, &new_manifest_str)?;
240-
download_cfg.clean(&cleanup_downloads)?;
241-
drop(downloads);
250+
download_cfg.clean(&stream.cleanup_downloads)?;
251+
drop(stream);
242252

243253
// Write configuration.
244254
//
@@ -445,6 +455,92 @@ impl Manifestation {
445455
}
446456
}
447457

458+
struct InstallEvents<'a, F> {
459+
manifestation: Arc<Manifestation>,
460+
components: vec::IntoIter<ComponentBinary<'a>>,
461+
cleanup_downloads: Vec<&'a str>,
462+
install_queue: VecDeque<ComponentInstall>,
463+
installing: Option<JoinHandle<Result<Transaction>>>,
464+
downloads: FuturesUnordered<F>,
465+
}
466+
467+
impl<'a, F> InstallEvents<'a, F> {
468+
fn new(
469+
components: vec::IntoIter<ComponentBinary<'a>>,
470+
manifestation: Arc<Manifestation>,
471+
) -> Self {
472+
Self {
473+
manifestation,
474+
cleanup_downloads: Vec::with_capacity(components.len()),
475+
components,
476+
install_queue: VecDeque::new(),
477+
installing: None,
478+
downloads: FuturesUnordered::new(),
479+
}
480+
}
481+
482+
fn try_install(&mut self, tx: &mut Option<Transaction>) {
483+
if let Some(installable) = self.install_queue.pop_front() {
484+
if let Some(tx) = tx.take() {
485+
let manifestation = self.manifestation.clone();
486+
self.installing = Some(spawn_blocking(|| installable.install(tx, manifestation)));
487+
} else {
488+
self.install_queue.push_front(installable);
489+
}
490+
}
491+
}
492+
493+
fn is_done(&self) -> bool {
494+
self.components.len() == 0 && self.downloads.is_empty() && self.install_queue.is_empty()
495+
}
496+
}
497+
498+
impl<'a, F: Future<Output = Result<(ComponentInstall, &'a str)>>> Stream for InstallEvents<'a, F> {
499+
type Item = Result<Transaction>;
500+
501+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
502+
loop {
503+
let mut this = self.as_mut();
504+
505+
// First, see if any of the downloads is complete; if so, yield `None`
506+
// to the caller so it can trigger another download.
507+
match Pin::new(&mut this.downloads).poll_next(cx) {
508+
Poll::Ready(Some(Ok((installable, hash)))) => {
509+
this.cleanup_downloads.push(hash);
510+
this.install_queue.push_back(installable);
511+
return Poll::Ready(None);
512+
}
513+
Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))),
514+
Poll::Ready(None) | Poll::Pending => {}
515+
}
516+
517+
let Some(handle) = &mut this.installing else {
518+
return match self.install_queue.is_empty() {
519+
// Nothing to do, yield control to the runtime
520+
true => Poll::Pending,
521+
// Can try to start the next installation
522+
false => Poll::Ready(None),
523+
};
524+
};
525+
526+
match Pin::new(handle).poll(cx) {
527+
Poll::Ready(Ok(Ok(tx))) => {
528+
// Current `handle` must not be polled again
529+
this.installing = None;
530+
return Poll::Ready(Some(Ok(tx)));
531+
}
532+
Poll::Ready(Ok(Err(e))) => return Poll::Ready(Some(Err(e))),
533+
Poll::Ready(Err(e)) => {
534+
return Poll::Ready(Some(Err(anyhow!(
535+
"internal error during installation: {e}"
536+
))));
537+
}
538+
Poll::Pending => return Poll::Pending,
539+
}
540+
}
541+
}
542+
}
543+
448544
#[derive(Debug, Default)]
449545
struct Update {
450546
components_to_uninstall: Vec<Component>,

0 commit comments

Comments
 (0)