33
44use std:: fmt:: Debug ;
55use std:: iter;
6- use std:: sync:: LazyLock ;
7-
8- use tokio:: fs:: File ;
9- use tokio:: runtime:: { self , Runtime } ;
10- use tokio:: sync:: mpsc;
11- use tokio:: sync:: mpsc:: Sender ;
12- use tokio:: task:: JoinHandle ;
13- use tokio_stream:: wrappers:: ReceiverStream ;
6+
7+ use futures:: channel:: mpsc;
8+ use futures:: channel:: mpsc:: Sender ;
9+ use futures:: { SinkExt , TryStreamExt } ;
10+ use parking_lot:: Mutex ;
1411use vortex:: ArrayRef ;
1512use vortex:: dtype:: Nullability :: { NonNullable , Nullable } ;
1613use vortex:: dtype:: { DType , StructFields } ;
1714use vortex:: error:: { VortexExpect , VortexResult , vortex_err} ;
1815use vortex:: file:: { VortexWriteOptions , WriteSummary } ;
16+ use vortex:: io:: runtime:: current:: CurrentThreadWorkerPool ;
17+ use vortex:: io:: runtime:: { BlockingRuntime , Task } ;
1918use vortex:: stream:: ArrayStreamAdapter ;
2019
20+ use crate :: RUNTIME ;
2121use crate :: convert:: { data_chunk_to_arrow, from_duckdb_table} ;
2222use crate :: duckdb:: { CopyFunction , DataChunk , LogicalType } ;
2323
@@ -29,21 +29,19 @@ pub struct BindData {
2929 fields : StructFields ,
3030}
3131
32- static COPY_RUNTIME : LazyLock < Runtime > = LazyLock :: new ( || {
33- runtime:: Builder :: new_current_thread ( )
34- . enable_all ( )
35- . build ( )
36- . vortex_expect ( "Cannot start runtime" )
37- } ) ;
38-
3932/// Write to a file has two phases, writing data chunks and then closing the file.
4033/// We use a spawned tokio task to actually compress arrays are write it to disk.
4134/// Each chunk is pushed into the sink and read from the task.
4235/// Once finished we can close all sinks and then the task can be awaited and the file
4336/// flushed to disk.
4437pub struct GlobalState {
45- write_task : Option < JoinHandle < VortexResult < WriteSummary > > > ,
38+ write_task : Mutex < Option < Task < VortexResult < WriteSummary > > > > ,
4639 sink : Option < Sender < VortexResult < ArrayRef > > > ,
40+ // Pool of background workers helping to drive the write task.
41+ // Note that this is optional and without it, we would only drive the task when DuckDB calls
42+ // into us, and we call `RUNTIME.block_on`.
43+ #[ allow( dead_code) ]
44+ worker_pool : CurrentThreadWorkerPool ,
4745}
4846
4947impl CopyFunction for VortexCopyFunction {
@@ -76,10 +74,10 @@ impl CopyFunction for VortexCopyFunction {
7674 chunk : & mut DataChunk ,
7775 ) -> VortexResult < ( ) > {
7876 let chunk = data_chunk_to_arrow ( bind_data. fields . names ( ) , chunk) ;
79- COPY_RUNTIME . block_on ( async {
77+ RUNTIME . block_on ( |_h| async {
8078 init_global
8179 . sink
82- . as_ref ( )
80+ . as_mut ( )
8381 . vortex_expect ( "sink closed early" )
8482 . send ( chunk)
8583 . await
@@ -93,15 +91,16 @@ impl CopyFunction for VortexCopyFunction {
9391 _bind_data : & Self :: BindData ,
9492 init_global : & mut Self :: GlobalState ,
9593 ) -> VortexResult < ( ) > {
96- COPY_RUNTIME . block_on ( async {
94+ RUNTIME . block_on ( |_h| async {
9795 if let Some ( sink) = init_global. sink . take ( ) {
9896 drop ( sink)
9997 }
100- init_global
98+ let task = init_global
10199 . write_task
100+ . lock ( )
102101 . take ( )
103- . vortex_expect ( "no file to close" )
104- . await ? ?;
102+ . vortex_expect ( "no file to close" ) ;
103+ task . await ?;
105104 Ok ( ( ) )
106105 } )
107106 }
@@ -112,18 +111,22 @@ impl CopyFunction for VortexCopyFunction {
112111 ) -> VortexResult < Self :: GlobalState > {
113112 // The channel size 32 was chosen arbitrarily.
114113 let ( sink, rx) = mpsc:: channel ( 32 ) ;
115- let array_stream =
116- ArrayStreamAdapter :: new ( bind_data. dtype . clone ( ) , ReceiverStream :: new ( rx) ) ;
114+ let array_stream = ArrayStreamAdapter :: new ( bind_data. dtype . clone ( ) , rx. into_stream ( ) ) ;
117115
118- let writer = COPY_RUNTIME . spawn ( async move {
119- let mut file = File :: create ( file_path) . await ?;
116+ let writer = RUNTIME . handle ( ) . spawn_nested ( |h| async move {
117+ let mut file = async_fs :: File :: create ( file_path) . await ?;
120118 VortexWriteOptions :: default ( )
119+ . with_handle ( h)
121120 . write ( & mut file, array_stream)
122121 . await
123122 } ) ;
124123
124+ let worker_pool = RUNTIME . new_pool ( ) ;
125+ worker_pool. set_workers_to_available_parallelism ( ) ;
126+
125127 Ok ( GlobalState {
126- write_task : Some ( writer) ,
128+ worker_pool,
129+ write_task : Mutex :: new ( Some ( writer) ) ,
127130 sink : Some ( sink) ,
128131 } )
129132 }
0 commit comments