1313 */
1414package io .trino .spiller ;
1515
16- import com .google .common .annotations .VisibleForTesting ;
1716import com .google .common .collect .AbstractIterator ;
1817import com .google .common .collect .ImmutableList ;
1918import com .google .common .io .Closer ;
2019import com .google .common .util .concurrent .Futures ;
2120import com .google .common .util .concurrent .ListenableFuture ;
2221import com .google .common .util .concurrent .ListeningExecutorService ;
23- import io .airlift .slice .OutputStreamSliceOutput ;
2422import io .airlift .slice .Slice ;
25- import io .airlift .slice .SliceOutput ;
2623import io .airlift .units .DataSize ;
2724import io .trino .annotation .NotThreadSafe ;
2825import io .trino .execution .buffer .PageDeserializer ;
4239import java .io .UncheckedIOException ;
4340import java .nio .file .Files ;
4441import java .nio .file .Path ;
42+ import java .util .ArrayList ;
4543import java .util .Iterator ;
4644import java .util .List ;
4745import java .util .Optional ;
4846import java .util .concurrent .atomic .AtomicBoolean ;
4947import java .util .concurrent .atomic .AtomicLong ;
5048
49+ import static com .google .common .base .Preconditions .checkArgument ;
5150import static com .google .common .base .Preconditions .checkState ;
51+ import static com .google .common .collect .Iterators .transform ;
5252import static com .google .common .util .concurrent .Futures .immediateFuture ;
5353import static io .trino .spi .StandardErrorCode .GENERIC_INTERNAL_ERROR ;
5454import static io .trino .spiller .FileSingleStreamSpillerFactory .SPILL_FILE_PREFIX ;
5555import static io .trino .spiller .FileSingleStreamSpillerFactory .SPILL_FILE_SUFFIX ;
56- import static java .nio .file .StandardOpenOption .APPEND ;
5756import static java .util .Objects .requireNonNull ;
5857
5958@ NotThreadSafe
6059public class FileSingleStreamSpiller
6160 implements SingleStreamSpiller
6261{
63- @ VisibleForTesting
64- static final int BUFFER_SIZE = 4 * 1024 ;
62+ private final List < SpillFile > spillFiles ;
63+ private volatile int currentFileIndex ;
6564
66- private final FileHolder targetFile ;
6765 private final Closer closer = Closer .create ();
6866 private final PagesSerdeFactory serdeFactory ;
6967 private volatile Optional <SecretKey > encryptionKey ;
@@ -84,12 +82,15 @@ public FileSingleStreamSpiller(
8482 PagesSerdeFactory serdeFactory ,
8583 Optional <SecretKey > encryptionKey ,
8684 ListeningExecutorService executor ,
87- Path spillPath ,
85+ List < Path > spillPaths ,
8886 SpillerStats spillerStats ,
8987 SpillContext spillContext ,
9088 LocalMemoryContext memoryContext ,
9189 Runnable fileSystemErrorHandler )
9290 {
91+ requireNonNull (spillPaths , "spillPaths is null" );
92+ checkArgument (!spillPaths .isEmpty (), "spillPaths is empty" );
93+
9394 this .serdeFactory = requireNonNull (serdeFactory , "serdeFactory is null" );
9495 this .encryptionKey = requireNonNull (encryptionKey , "encryptionKey is null" );
9596 this .encrypted = encryptionKey .isPresent ();
@@ -107,10 +108,14 @@ public FileSingleStreamSpiller(
107108 // This means we start accounting for the memory before the spiller thread allocates it, and we release the memory reservation
108109 // before/after the spiller thread allocates that memory -- -- whether before or after depends on whether writePages() is in the
109110 // middle of execution when close() is called (note that this applies to both readPages() and writePages() methods).
110- this .memoryContext .setBytes (BUFFER_SIZE );
111+ this .memoryContext .setBytes (( long ) SpillFile . BUFFER_SIZE * spillPaths . size () );
111112 this .fileSystemErrorHandler = requireNonNull (fileSystemErrorHandler , "filesystemErrorHandler is null" );
112113 try {
113- this .targetFile = closer .register (new FileHolder (Files .createTempFile (spillPath , SPILL_FILE_PREFIX , SPILL_FILE_SUFFIX )));
114+ ImmutableList .Builder <SpillFile > builder = ImmutableList .builderWithExpectedSize (spillPaths .size ());
115+ for (Path path : spillPaths ) {
116+ builder .add (closer .register (new SpillFile (Files .createTempFile (path , SPILL_FILE_PREFIX , SPILL_FILE_SUFFIX ))));
117+ }
118+ this .spillFiles = builder .build ();
114119 }
115120 catch (IOException e ) {
116121 this .fileSystemErrorHandler .run ();
@@ -137,61 +142,136 @@ public long getSpilledPagesInMemorySize()
137142 public Iterator <Page > getSpilledPages ()
138143 {
139144 checkNoSpillInProgress ();
140- return readPages ();
145+ checkState (writable .getAndSet (false ), "Repeated reads are disallowed to prevent potential resource leaks" );
146+
147+ try {
148+ Optional <SecretKey > encryptionKey = this .encryptionKey ;
149+ checkState (encrypted == encryptionKey .isPresent (), "encryptionKey has been discarded" );
150+
151+ PageDeserializer deserializer = serdeFactory .createDeserializer (encryptionKey );
152+ this .encryptionKey = Optional .empty ();
153+
154+ int fileCount = spillFiles .size ();
155+ List <Iterator <Page >> iterators = new ArrayList <>(fileCount );
156+ for (SpillFile file : spillFiles ) {
157+ iterators .add (readFilePages (deserializer , file , closer ));
158+ }
159+
160+ return new AbstractIterator <>()
161+ {
162+ int fileIndex ;
163+
164+ @ Override
165+ protected Page computeNext ()
166+ {
167+ Iterator <Page > iterator = iterators .get (fileIndex );
168+ if (!iterator .hasNext ()) {
169+ checkAllIteratorsExhausted (iterators );
170+ return endOfData ();
171+ }
172+
173+ Page page = iterator .next ();
174+ fileIndex = (fileIndex + 1 ) % fileCount ;
175+ return page ;
176+ }
177+ };
178+ }
179+ catch (IOException e ) {
180+ fileSystemErrorHandler .run ();
181+ throw new TrinoException (GENERIC_INTERNAL_ERROR , "Failed to read spilled pages" , e );
182+ }
141183 }
142184
143185 @ Override
144186 public ListenableFuture <List <Page >> getAllSpilledPages ()
145187 {
146- return executor .submit (() -> ImmutableList .copyOf (getSpilledPages ()));
188+ checkNoSpillInProgress ();
189+ checkState (writable .getAndSet (false ), "Repeated reads are disallowed to prevent potential resource leaks" );
190+
191+ Optional <SecretKey > encryptionKey = this .encryptionKey ;
192+ checkState (encrypted == encryptionKey .isPresent (), "encryptionKey has been discarded" );
193+
194+ this .encryptionKey = Optional .empty ();
195+
196+ List <ListenableFuture <List <Page >>> futures = new ArrayList <>();
197+ for (SpillFile file : spillFiles ) {
198+ futures .add (executor .submit (() -> {
199+ PageDeserializer deserializer = serdeFactory .createDeserializer (encryptionKey );
200+ ImmutableList .Builder <Page > pages = ImmutableList .builder ();
201+ try (Closer closer = Closer .create ()) {
202+ readFilePages (deserializer , file , closer ).forEachRemaining (pages ::add );
203+ }
204+ return pages .build ();
205+ }));
206+ }
207+
208+ // Combine pages from all spill files according to the round-robin order.
209+ return Futures .transform (Futures .allAsList (futures ), pagesPerFile -> {
210+ ImmutableList .Builder <Page > builder = ImmutableList .builderWithExpectedSize (pagesPerFile .stream ().mapToInt (List ::size ).sum ());
211+ int fileCount = spillFiles .size ();
212+
213+ List <Iterator <Page >> iterators = new ArrayList <>(fileCount );
214+ for (List <Page > pages : pagesPerFile ) {
215+ iterators .add (pages .iterator ());
216+ }
217+
218+ int fileIndex = 0 ;
219+ while (iterators .get (fileIndex ).hasNext ()) {
220+ builder .add (iterators .get (fileIndex ).next ());
221+ fileIndex = (fileIndex + 1 ) % fileCount ;
222+ }
223+ checkAllIteratorsExhausted (iterators );
224+ return builder .build ();
225+ }, executor );
226+ }
227+
228+ private static void checkAllIteratorsExhausted (List <Iterator <Page >> iterators )
229+ {
230+ iterators .forEach (iterator -> checkState (!iterator .hasNext (), "spill file iterator not fully consumed" ));
147231 }
148232
149- private DataSize writePages (Iterator <Page > pageIterator )
233+ private DataSize writePages (Iterator <Page > pages )
150234 {
151235 checkState (writable .get (), "Spilling no longer allowed. The spiller has been made non-writable on first read for subsequent reads to be consistent" );
152236
153237 Optional <SecretKey > encryptionKey = this .encryptionKey ;
154238 checkState (encrypted == encryptionKey .isPresent (), "encryptionKey has been discarded" );
155239 PageSerializer serializer = serdeFactory .createSerializer (encryptionKey );
240+
156241 long spilledPagesBytes = 0 ;
157- try (SliceOutput output = new OutputStreamSliceOutput (targetFile .newOutputStream (APPEND ), BUFFER_SIZE )) {
158- while (pageIterator .hasNext ()) {
159- Page page = pageIterator .next ();
242+ int fileIndex = currentFileIndex ;
243+ int fileCount = spillFiles .size ();
244+
245+ try {
246+ while (pages .hasNext ()) {
247+ Page page = pages .next ();
160248 long pageSizeInBytes = page .getSizeInBytes ();
249+ Slice serialized = serializer .serialize (page );
250+ long serializedPageSize = serialized .length ();
251+
252+ spillFiles .get (fileIndex ).writeBytes (serialized );
253+
161254 spilledPagesBytes += pageSizeInBytes ;
255+
162256 spilledPagesInMemorySize .addAndGet (pageSizeInBytes );
163- Slice serializedPage = serializer .serialize (page );
164- long pageSize = serializedPage .length ();
165- localSpillContext .updateBytes (pageSize );
166- spillerStats .addToTotalSpilledBytes (pageSize );
167- output .writeBytes (serializedPage );
257+ localSpillContext .updateBytes (serializedPageSize );
258+ spillerStats .addToTotalSpilledBytes (serializedPageSize );
259+
260+ fileIndex = (fileIndex + 1 ) % fileCount ;
261+ }
262+
263+ currentFileIndex = fileIndex ;
264+
265+ for (SpillFile file : spillFiles ) {
266+ file .closeOutput ();
168267 }
169268 }
170269 catch (UncheckedIOException | IOException e ) {
171270 fileSystemErrorHandler .run ();
172271 throw new TrinoException (GENERIC_INTERNAL_ERROR , "Failed to spill pages" , e );
173272 }
174- return DataSize .ofBytes (spilledPagesBytes );
175- }
176273
177- private Iterator <Page > readPages ()
178- {
179- checkState (writable .getAndSet (false ), "Repeated reads are disallowed to prevent potential resource leaks" );
180-
181- try {
182- Optional <SecretKey > encryptionKey = this .encryptionKey ;
183- checkState (encrypted == encryptionKey .isPresent (), "encryptionKey has been discarded" );
184- PageDeserializer deserializer = serdeFactory .createDeserializer (encryptionKey );
185- // encryption key is safe to discard since it now belongs to the PageDeserializer and repeated reads are disallowed
186- this .encryptionKey = Optional .empty ();
187- InputStream input = closer .register (targetFile .newInputStream ());
188- Iterator <Page > pages = PagesSerdeUtil .readPages (deserializer , input );
189- return closeWhenExhausted (pages , input );
190- }
191- catch (IOException e ) {
192- fileSystemErrorHandler .run ();
193- throw new TrinoException (GENERIC_INTERNAL_ERROR , "Failed to read spilled pages" , e );
194- }
274+ return DataSize .ofBytes (spilledPagesBytes );
195275 }
196276
197277 @ Override
@@ -215,6 +295,17 @@ private void checkNoSpillInProgress()
215295 checkState (spillInProgress .isDone (), "spill in progress" );
216296 }
217297
298+ /**
299+ * Returns an iterator that exposes all pages stored in the given file.
300+ * Pages are lazily deserialized as the iterator is consumed.
301+ */
302+ private Iterator <Page > readFilePages (PageDeserializer deserializer , SpillFile file , Closer closer )
303+ throws IOException
304+ {
305+ InputStream input = closer .register (file .newInputStream ());
306+ return transform (closeWhenExhausted (PagesSerdeUtil .readSerializedPages (input ), input ), deserializer ::deserialize );
307+ }
308+
218309 private static <T > Iterator <T > closeWhenExhausted (Iterator <T > iterator , Closeable resource )
219310 {
220311 requireNonNull (iterator , "iterator is null" );
0 commit comments