Skip to content

Commit df760ce

Browse files
committed
dist: make installation asynchronous
1 parent 3b25dca commit df760ce

File tree

2 files changed

+126
-22
lines changed

2 files changed

+126
-22
lines changed

src/diskio/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@ impl IncrementalFileState {
290290
/// Trait object for performing IO. At this point the overhead
291291
/// of trait invocation is not a bottleneck, but if it becomes
292292
/// one we could consider an enum variant based approach instead.
293-
pub(crate) trait Executor {
293+
pub(crate) trait Executor: Send {
294294
/// Perform a single operation.
295295
/// During overload situations previously queued items may
296296
/// need to be completed before the item is accepted:

src/dist/manifestation.rs

Lines changed: 125 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,17 @@
44
#[cfg(test)]
55
mod tests;
66

7+
use std::collections::VecDeque;
78
use std::path::Path;
9+
use std::pin::Pin;
810
use std::sync::Arc;
11+
use std::task::{Context, Poll};
12+
use std::vec;
913

10-
use anyhow::{Context, Result, anyhow, bail};
14+
use anyhow::{Context as _, Result, anyhow, bail};
15+
use futures_util::Stream;
1116
use futures_util::stream::{FuturesUnordered, StreamExt};
17+
use tokio::task::{JoinHandle, spawn_blocking};
1218
use tracing::{info, warn};
1319

1420
use crate::diskio::{Executor, IO_CHUNK_SIZE, get_executor, unpack_ram};
@@ -211,34 +217,44 @@ impl Manifestation {
211217
}
212218

213219
info!("downloading component(s)");
214-
let mut downloads = FuturesUnordered::new();
215-
let mut component_iter = components.into_iter();
216-
let mut cleanup_downloads = vec![];
217-
let manifestation = Arc::new(self);
218-
loop {
219-
if downloads.is_empty() && component_iter.len() == 0 {
220-
break;
221-
}
220+
let mut tx = if !components.is_empty() {
221+
let mut stream = InstallEvents::new(components.into_iter(), Arc::new(self));
222+
let mut transaction = Some(tx);
223+
let tx = loop {
224+
// Refill downloads when there's capacity
225+
// Must live outside of `InstallEvents` because we can't write the type of future
226+
while stream.components.len() > 0 && stream.downloads.len() < concurrent_downloads {
227+
if let Some(bin) = stream.components.next() {
228+
stream.downloads.push(bin.download(max_retries));
229+
}
230+
}
222231

223-
let installable = downloads.next().await.transpose()?;
224-
while component_iter.len() > 0 && downloads.len() < concurrent_downloads {
225-
if let Some(bin) = component_iter.next() {
226-
downloads.push(bin.download(max_retries));
232+
// Trigger another installation if no other installation is in progress, as evidenced
233+
// by whether `transaction` is `Some` (not held by another installation task).
234+
stream.try_install(&mut transaction);
235+
match stream.next().await {
236+
// Completed an installation, yielding the transaction back
237+
Some(Ok(tx)) => match stream.is_done() {
238+
true => break tx,
239+
false => transaction = Some(tx),
240+
},
241+
Some(Err(e)) => return Err(e),
242+
// A download completed, so we can trigger another one
243+
None => {}
227244
}
228-
}
245+
};
229246

230-
if let Some((installable, hash)) = installable {
231-
cleanup_downloads.push(hash);
232-
tx = installable.install(tx, manifestation.clone())?;
233-
}
234-
}
247+
download_cfg.clean(&stream.cleanup_downloads)?;
248+
drop(stream);
249+
tx
250+
} else {
251+
tx
252+
};
235253

236254
// Install new distribution manifest
237255
let new_manifest_str = new_manifest.clone().stringify()?;
238256
tx.modify_file(rel_installed_manifest_path)?;
239257
utils::write_file("manifest", &installed_manifest_path, &new_manifest_str)?;
240-
download_cfg.clean(&cleanup_downloads)?;
241-
drop(downloads);
242258

243259
// Write configuration.
244260
//
@@ -445,6 +461,94 @@ impl Manifestation {
445461
}
446462
}
447463

464+
struct InstallEvents<'a, F> {
465+
manifestation: Arc<Manifestation>,
466+
components: vec::IntoIter<ComponentBinary<'a>>,
467+
cleanup_downloads: Vec<&'a str>,
468+
install_queue: VecDeque<ComponentInstall>,
469+
installing: Option<JoinHandle<Result<Transaction>>>,
470+
downloads: FuturesUnordered<F>,
471+
}
472+
473+
impl<'a, F> InstallEvents<'a, F> {
474+
fn new(
475+
components: vec::IntoIter<ComponentBinary<'a>>,
476+
manifestation: Arc<Manifestation>,
477+
) -> Self {
478+
Self {
479+
manifestation,
480+
cleanup_downloads: Vec::with_capacity(components.len()),
481+
components,
482+
install_queue: VecDeque::new(),
483+
installing: None,
484+
downloads: FuturesUnordered::new(),
485+
}
486+
}
487+
488+
fn try_install(&mut self, tx: &mut Option<Transaction>) {
489+
let Some(installable) = self.install_queue.pop_front() else {
490+
return;
491+
};
492+
493+
if let Some(tx) = tx.take() {
494+
let manifestation = self.manifestation.clone();
495+
self.installing = Some(spawn_blocking(|| installable.install(tx, manifestation)));
496+
} else {
497+
self.install_queue.push_front(installable);
498+
}
499+
}
500+
501+
fn is_done(&self) -> bool {
502+
self.components.len() == 0 && self.downloads.is_empty() && self.install_queue.is_empty()
503+
}
504+
}
505+
506+
impl<'a, F: Future<Output = Result<(ComponentInstall, &'a str)>>> Stream for InstallEvents<'a, F> {
507+
type Item = Result<Transaction>;
508+
509+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
510+
loop {
511+
let mut this = self.as_mut();
512+
513+
// First, see if any of the downloads is complete; if so, yield `None`
514+
// to the caller so it can trigger another download.
515+
match Pin::new(&mut this.downloads).poll_next(cx) {
516+
Poll::Ready(Some(Ok((installable, hash)))) => {
517+
this.cleanup_downloads.push(hash);
518+
this.install_queue.push_back(installable);
519+
return Poll::Ready(None);
520+
}
521+
Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))),
522+
Poll::Ready(None) | Poll::Pending => {}
523+
}
524+
525+
let Some(handle) = &mut this.installing else {
526+
return match self.install_queue.is_empty() {
527+
// Nothing to do, yield control to the runtime
528+
true => Poll::Pending,
529+
// Can try to start the next installation
530+
false => Poll::Ready(None),
531+
};
532+
};
533+
534+
match Pin::new(handle).poll(cx) {
535+
Poll::Ready(Ok(Ok(tx))) => {
536+
// Current `handle` must not be polled again
537+
this.installing = None;
538+
return Poll::Ready(Some(Ok(tx)));
539+
}
540+
Poll::Ready(Ok(Err(e))) => return Poll::Ready(Some(Err(e))),
541+
Poll::Ready(Err(e)) => {
542+
return Poll::Ready(Some(Err(anyhow!(
543+
"internal error during installation: {e}"
544+
))));
545+
}
546+
Poll::Pending => return Poll::Pending,
547+
}
548+
}
549+
}
550+
}
551+
448552
#[derive(Debug, Default)]
449553
struct Update {
450554
components_to_uninstall: Vec<Component>,

0 commit comments

Comments
 (0)