From 703315d1213bbe34ed2a17237d466bd8eedc7f2f Mon Sep 17 00:00:00 2001
From: Alfonso Subiotto Marques
Date: Sun, 19 Oct 2025 14:23:47 +0200
Subject: [PATCH] performance[vortex-scan]: improve IO pipelining

The issue with the current approach of using buffered(concurrency) is
that all of the `concurrency` tasks currently executing might be
awaiting the same coalesced IO operation, which stops other split_exec
tasks from making progress by issuing and awaiting their own IO
requests.

The approach proposed in this commit is to group all split_exec tasks
into `concurrency` Futures{Ordered,Unordered} groups, so that
`concurrency` groups execute in parallel while the remaining futures in
each group can make progress whenever another future in the group is
awaiting IO. This results in better pipelining of IO operations, at the
cost of possibly increased memory use.

This patch was motivated by the observation that drive_send seems to
expect to execute CONCURRENCY (192 by default) concurrent requests,
while in practice far fewer requests are in flight because of the
split_exec scheduling strategy.

Signed-off-by: Alfonso Subiotto Marques
---
 vortex-scan/src/repeated_scan.rs | 57 +++++++++++++++++++++++++++-----
 1 file changed, 48 insertions(+), 9 deletions(-)

diff --git a/vortex-scan/src/repeated_scan.rs b/vortex-scan/src/repeated_scan.rs
index 40e4d769d22..0526c217265 100644
--- a/vortex-scan/src/repeated_scan.rs
+++ b/vortex-scan/src/repeated_scan.rs
@@ -5,8 +5,9 @@ use std::ops::Range;
 use std::sync::Arc;
 use std::{cmp, iter};
 
-use futures::Stream;
 use futures::future::BoxFuture;
+use futures::stream::{FuturesOrdered, FuturesUnordered};
+use futures::{Stream, StreamExt};
 use itertools::{Either, Itertools};
 use vortex_array::ArrayRef;
 use vortex_array::iter::{ArrayIterator, ArrayIteratorAdapter};
@@ -166,23 +167,61 @@ impl RepeatedScan {
         &self,
         row_range: Option<Range<u64>>,
     ) -> VortexResult<impl Stream<Item = VortexResult<ArrayRef>> + Send + 'static + use<>> {
-        use futures::StreamExt;
         let num_workers = std::thread::available_parallelism()
             .map(|n| n.get())
             .unwrap_or(1);
         let concurrency = self.concurrency * num_workers;
         let handle = self.handle.clone();
-        let stream =
-            futures::stream::iter(self.execute(row_range)?).map(move |task| handle.spawn(task));
-
-        let stream = if self.ordered {
-            stream.buffered(concurrency).boxed()
+        let tasks = self.execute(row_range)?;
+        let ordered = self.ordered;
+
+        let stream = if tasks.len() < concurrency {
+            let stream = futures::stream::iter(tasks).map(move |task| handle.spawn(task));
+            let stream = if ordered {
+                stream.buffered(concurrency).boxed()
+            } else {
+                stream.buffer_unordered(concurrency).boxed()
+            };
+            stream
+                .filter_map(|chunk| async move { chunk.transpose() })
+                .boxed()
         } else {
-            stream.buffer_unordered(concurrency).boxed()
+            // Each group of tasks.len() / concurrency tasks will be spawned
+            // separately. Since individual tasks may be blocked awaiting IO
+            // results, grouping them allows polling the other tasks in a
+            // group in the meantime, which in turn triggers further IO
+            // requests and therefore offers better IO pipelining.
+            let chunk_size = tasks.len() / concurrency;
+            let task_chunks: Vec<Vec<_>> = tasks
+                .into_iter()
+                .chunks(chunk_size)
+                .into_iter()
+                .map(|chunk| chunk.collect())
+                .collect();
+
+            let stream = futures::stream::iter(task_chunks.into_iter().map(move |chunk| {
+                if ordered {
+                    Either::Left(FuturesOrdered::from_iter(chunk).collect::<Vec<_>>())
+                } else {
+                    Either::Right(FuturesUnordered::from_iter(chunk).collect::<Vec<_>>())
+                }
+            }))
+            .map(move |task| handle.spawn(task));
+
+            let stream = if ordered {
+                stream.buffered(concurrency).boxed()
+            } else {
+                stream.buffer_unordered(concurrency).boxed()
+            };
+
+            stream
+                .flat_map(futures::stream::iter)
+                .filter_map(|chunk| async move { chunk.transpose() })
+                .boxed()
         };
-
-        Ok(stream.filter_map(|chunk| async move { chunk.transpose() }))
+        Ok(stream)
     }
 }
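
Note for reviewers (not part of the patch): the sketch below is a
minimal, self-contained illustration of the grouping idea, using only
the futures crate. fake_io_task, N_TASKS and CONCURRENCY are
hypothetical stand-ins for the split_exec tasks and the configured
concurrency; the real code additionally spawns each group on a runtime
handle and uses FuturesOrdered when ordering must be preserved.

    // Requires the `futures` crate (0.3).
    use futures::stream::{self, FuturesUnordered, StreamExt};

    // Stand-in for a split_exec task that awaits a (coalesced) IO request.
    async fn fake_io_task(id: usize) -> usize {
        futures::future::ready(id).await
    }

    fn main() {
        const N_TASKS: usize = 16;
        const CONCURRENCY: usize = 4;

        let tasks: Vec<_> = (0..N_TASKS).map(fake_io_task).collect();

        // The patch only takes the grouped path when
        // tasks.len() >= concurrency, so chunk_size is always at least 1.
        let chunk_size = tasks.len() / CONCURRENCY;
        let mut chunks: Vec<Vec<_>> = Vec::new();
        let mut it = tasks.into_iter();
        loop {
            let chunk: Vec<_> = it.by_ref().take(chunk_size).collect();
            if chunk.is_empty() {
                break;
            }
            chunks.push(chunk);
        }

        let results: Vec<usize> = futures::executor::block_on(
            stream::iter(chunks)
                // Each group polls all of its tasks concurrently, so while
                // one task awaits IO the others can issue their requests.
                .map(|chunk| FuturesUnordered::from_iter(chunk).collect::<Vec<_>>())
                // Up to CONCURRENCY groups are driven at the same time.
                .buffer_unordered(CONCURRENCY)
                // Flatten each completed group back into a stream of items.
                .flat_map(stream::iter)
                .collect(),
        );
        assert_eq!(results.len(), N_TASKS);
    }

The trade-off is the one called out in the commit message: a whole
group's results are buffered before being flattened, so peak memory
grows with chunk_size in exchange for keeping more IO requests in
flight.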