diff --git a/vortex-scan/src/repeated_scan.rs b/vortex-scan/src/repeated_scan.rs index 40e4d769d22..0526c217265 100644 --- a/vortex-scan/src/repeated_scan.rs +++ b/vortex-scan/src/repeated_scan.rs @@ -5,8 +5,9 @@ use std::ops::Range; use std::sync::Arc; use std::{cmp, iter}; -use futures::Stream; use futures::future::BoxFuture; +use futures::stream::{FuturesOrdered, FuturesUnordered}; +use futures::{Stream, StreamExt}; use itertools::{Either, Itertools}; use vortex_array::ArrayRef; use vortex_array::iter::{ArrayIterator, ArrayIteratorAdapter}; @@ -166,23 +167,61 @@ impl RepeatedScan { &self, row_range: Option>, ) -> VortexResult> + Send + 'static + use> { - use futures::StreamExt; let num_workers = std::thread::available_parallelism() .map(|n| n.get()) .unwrap_or(1); let concurrency = self.concurrency * num_workers; let handle = self.handle.clone(); - let stream = - futures::stream::iter(self.execute(row_range)?).map(move |task| handle.spawn(task)); - - let stream = if self.ordered { - stream.buffered(concurrency).boxed() + let tasks = self.execute(row_range)?; + let ordered = self.ordered; + + let stream = if tasks.len() < concurrency { + let stream = futures::stream::iter(tasks).map(move |task| handle.spawn(task)); + let stream = if ordered { + stream.buffered(concurrency).boxed() + } else { + stream.buffer_unordered(concurrency).boxed() + }; + stream + .filter_map(|chunk| async move { chunk.transpose() }) + .boxed() } else { - stream.buffer_unordered(concurrency).boxed() + // Tasks are split into groups of `tasks.len() / concurrency` + // tasks and each group is spawned separately. Since individual + // tasks may be blocked awaiting IO results, grouping them allows + // polling other tasks in the meantime, which in turn triggers IO + // requests and therefore offers better IO pipelining. 
+ let chunk_size = tasks.len() / concurrency; + let task_chunks: Vec> = tasks + .into_iter() + .chunks(chunk_size) + .into_iter() + .map(|chunk| chunk.collect()) + .collect(); + + let stream = futures::stream::iter(task_chunks.into_iter().map(move |chunk| { + if ordered { + Either::Left(FuturesOrdered::from_iter(chunk).collect::>()) + } else { + Either::Right(FuturesUnordered::from_iter(chunk).collect::>()) + } + })) + .map(move |task| handle.spawn(task)); + + let stream = if ordered { + stream.buffered(concurrency).boxed() + } else { + stream.buffer_unordered(concurrency).boxed() + }; + + stream + .flat_map(futures::stream::iter) + .filter_map(|chunk| async move { chunk.transpose() }) + .boxed() }; - Ok(stream.filter_map(|chunk| async move { chunk.transpose() })) + Ok(stream) } }