Skip to content

Commit a43f33d

Browse files
authored
Allow repartitioning on files with ranges (#18948)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Fixes #18940 ## Rationale for this change <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> When any partitioned file had ranges selected (even if the range contained the whole file), repartitioning was being skipped. There was no good reason to disallow this. ## What changes are included in this PR? <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> - I have now allowed this and change the implementation to respect file ranges. - Also did a very small refactor to make the code easier to read: replace `state` tuple and `state.0`/`state.1` with proper variable names using destructing ## Are these changes tested? <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> Added a couple of unit tests. Let me know if more are needed. ## Are there any user-facing changes? <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. -->
1 parent 7b4593f commit a43f33d

File tree

2 files changed

+213
-48
lines changed

2 files changed

+213
-48
lines changed

datafusion/datasource/src/file_groups.rs

Lines changed: 195 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -189,15 +189,6 @@ impl FileGroupPartitioner {
189189
return None;
190190
}
191191

192-
// Perform redistribution only in case all files should be read from beginning to end
193-
let has_ranges = file_groups
194-
.iter()
195-
.flat_map(FileGroup::iter)
196-
.any(|f| f.range.is_some());
197-
if has_ranges {
198-
return None;
199-
}
200-
201192
// special case when order must be preserved
202193
if self.preserve_order_within_groups {
203194
self.repartition_preserving_order(file_groups)
@@ -218,14 +209,13 @@ impl FileGroupPartitioner {
218209

219210
let total_size = flattened_files
220211
.iter()
221-
.map(|f| f.object_meta.size as i64)
222-
.sum::<i64>();
223-
if total_size < (repartition_file_min_size as i64) || total_size == 0 {
212+
.map(|f| f.effective_size())
213+
.sum::<u64>();
214+
if total_size < (repartition_file_min_size as u64) || total_size == 0 {
224215
return None;
225216
}
226217

227-
let target_partition_size =
228-
(total_size as u64).div_ceil(target_partitions as u64);
218+
let target_partition_size = total_size.div_ceil(target_partitions as u64);
229219

230220
let current_partition_index: usize = 0;
231221
let current_partition_size: u64 = 0;
@@ -235,27 +225,30 @@ impl FileGroupPartitioner {
235225
.into_iter()
236226
.scan(
237227
(current_partition_index, current_partition_size),
238-
|state, source_file| {
228+
|(current_partition_index, current_partition_size), source_file| {
239229
let mut produced_files = vec![];
240-
let mut range_start = 0;
241-
while range_start < source_file.object_meta.size {
230+
let (mut range_start, file_end) = source_file.range();
231+
while range_start < file_end {
242232
let range_end = min(
243-
range_start + (target_partition_size - state.1),
244-
source_file.object_meta.size,
233+
range_start
234+
+ (target_partition_size - *current_partition_size),
235+
file_end,
245236
);
246237

247238
let mut produced_file = source_file.clone();
248239
produced_file.range = Some(FileRange {
249240
start: range_start as i64,
250241
end: range_end as i64,
251242
});
252-
produced_files.push((state.0, produced_file));
243+
produced_files.push((*current_partition_index, produced_file));
253244

254-
if state.1 + (range_end - range_start) >= target_partition_size {
255-
state.0 += 1;
256-
state.1 = 0;
245+
if *current_partition_size + (range_end - range_start)
246+
>= target_partition_size
247+
{
248+
*current_partition_index += 1;
249+
*current_partition_size = 0;
257250
} else {
258-
state.1 += range_end - range_start;
251+
*current_partition_size += range_end - range_start;
259252
}
260253
range_start = range_end;
261254
}
@@ -297,7 +290,7 @@ impl FileGroupPartitioner {
297290
if group.len() == 1 {
298291
Some(ToRepartition {
299292
source_index: group_index,
300-
file_size: group[0].object_meta.size,
293+
file_size: group[0].effective_size(),
301294
new_groups: vec![group_index],
302295
})
303296
} else {
@@ -333,28 +326,31 @@ impl FileGroupPartitioner {
333326

334327
// Distribute files to their newly assigned groups
335328
while let Some(to_repartition) = heap.pop() {
336-
let range_size = to_repartition.range_size() as i64;
329+
let range_size = to_repartition.range_size();
337330
let ToRepartition {
338331
source_index,
339-
file_size,
332+
file_size: _,
340333
new_groups,
341334
} = to_repartition.into_inner();
342335
assert_eq!(file_groups[source_index].len(), 1);
343336
let original_file = file_groups[source_index].pop().unwrap();
344337

345338
let last_group = new_groups.len() - 1;
346-
let mut range_start: i64 = 0;
347-
let mut range_end: i64 = range_size;
339+
let (mut range_start, file_end) = original_file.range();
340+
let mut range_end = range_start + range_size;
348341
for (i, group_index) in new_groups.into_iter().enumerate() {
349342
let target_group = &mut file_groups[group_index];
350343
assert!(target_group.is_empty());
351344

352345
// adjust last range to include the entire file
353346
if i == last_group {
354-
range_end = file_size as i64;
347+
range_end = file_end;
355348
}
356-
target_group
357-
.push(original_file.clone().with_range(range_start, range_end));
349+
target_group.push(
350+
original_file
351+
.clone()
352+
.with_range(range_start as i64, range_end as i64),
353+
);
358354
range_start = range_end;
359355
range_end += range_size;
360356
}
@@ -645,6 +641,68 @@ mod test {
645641
assert_partitioned_files(expected, actual);
646642
}
647643

644+
#[test]
645+
fn repartition_single_file_with_range() {
646+
// Single file, single partition into multiple partitions
647+
let single_partition =
648+
vec![FileGroup::new(vec![pfile("a", 123).with_range(0, 123)])];
649+
650+
let actual = FileGroupPartitioner::new()
651+
.with_target_partitions(4)
652+
.with_repartition_file_min_size(10)
653+
.repartition_file_groups(&single_partition);
654+
655+
let expected = Some(vec![
656+
FileGroup::new(vec![pfile("a", 123).with_range(0, 31)]),
657+
FileGroup::new(vec![pfile("a", 123).with_range(31, 62)]),
658+
FileGroup::new(vec![pfile("a", 123).with_range(62, 93)]),
659+
FileGroup::new(vec![pfile("a", 123).with_range(93, 123)]),
660+
]);
661+
assert_partitioned_files(expected, actual);
662+
}
663+
664+
#[test]
665+
fn repartition_single_file_with_incomplete_range() {
666+
// Single file, single partition into multiple partitions
667+
let single_partition =
668+
vec![FileGroup::new(vec![pfile("a", 123).with_range(10, 100)])];
669+
670+
let actual = FileGroupPartitioner::new()
671+
.with_target_partitions(4)
672+
.with_repartition_file_min_size(10)
673+
.repartition_file_groups(&single_partition);
674+
675+
let expected = Some(vec![
676+
FileGroup::new(vec![pfile("a", 123).with_range(10, 33)]),
677+
FileGroup::new(vec![pfile("a", 123).with_range(33, 56)]),
678+
FileGroup::new(vec![pfile("a", 123).with_range(56, 79)]),
679+
FileGroup::new(vec![pfile("a", 123).with_range(79, 100)]),
680+
]);
681+
assert_partitioned_files(expected, actual);
682+
}
683+
684+
#[test]
685+
fn repartition_single_file_duplicated_with_range() {
686+
// Single file, two partitions into multiple partitions
687+
let single_partition = vec![FileGroup::new(vec![
688+
pfile("a", 100).with_range(0, 50),
689+
pfile("a", 100).with_range(50, 100),
690+
])];
691+
692+
let actual = FileGroupPartitioner::new()
693+
.with_target_partitions(4)
694+
.with_repartition_file_min_size(10)
695+
.repartition_file_groups(&single_partition);
696+
697+
let expected = Some(vec![
698+
FileGroup::new(vec![pfile("a", 100).with_range(0, 25)]),
699+
FileGroup::new(vec![pfile("a", 100).with_range(25, 50)]),
700+
FileGroup::new(vec![pfile("a", 100).with_range(50, 75)]),
701+
FileGroup::new(vec![pfile("a", 100).with_range(75, 100)]),
702+
]);
703+
assert_partitioned_files(expected, actual);
704+
}
705+
648706
#[test]
649707
fn repartition_too_much_partitions() {
650708
// Single file, single partition into 96 partitions
@@ -717,22 +775,6 @@ mod test {
717775
assert_partitioned_files(expected, actual);
718776
}
719777

720-
#[test]
721-
fn repartition_no_action_ranges() {
722-
// No action due to Some(range) in second file
723-
let source_partitions = vec![
724-
FileGroup::new(vec![pfile("a", 123)]),
725-
FileGroup::new(vec![pfile("b", 144).with_range(1, 50)]),
726-
];
727-
728-
let actual = FileGroupPartitioner::new()
729-
.with_target_partitions(65)
730-
.with_repartition_file_min_size(10)
731-
.repartition_file_groups(&source_partitions);
732-
733-
assert_partitioned_files(None, actual)
734-
}
735-
736778
#[test]
737779
fn repartition_no_action_min_size() {
738780
// No action due to target_partition_size
@@ -809,6 +851,26 @@ mod test {
809851
assert_partitioned_files(expected, actual);
810852
}
811853

854+
#[test]
855+
fn repartition_ordered_one_large_file_with_range() {
856+
// "Rebalance" the single large file across partitions
857+
let source_partitions =
858+
vec![FileGroup::new(vec![pfile("a", 100).with_range(0, 100)])];
859+
860+
let actual = FileGroupPartitioner::new()
861+
.with_preserve_order_within_groups(true)
862+
.with_target_partitions(3)
863+
.with_repartition_file_min_size(10)
864+
.repartition_file_groups(&source_partitions);
865+
866+
let expected = Some(vec![
867+
FileGroup::new(vec![pfile("a", 100).with_range(0, 34)]),
868+
FileGroup::new(vec![pfile("a", 100).with_range(34, 68)]),
869+
FileGroup::new(vec![pfile("a", 100).with_range(68, 100)]),
870+
]);
871+
assert_partitioned_files(expected, actual);
872+
}
873+
812874
#[test]
813875
fn repartition_ordered_one_large_one_small_file() {
814876
// "Rebalance" the single large file across empty partitions, but can't split
@@ -837,6 +899,91 @@ mod test {
837899
assert_partitioned_files(expected, actual);
838900
}
839901

902+
#[test]
903+
fn repartition_ordered_one_large_one_small_file_with_full_range() {
904+
// "Rebalance" the single large file across empty partitions, but can't split
905+
// small file
906+
let source_partitions = vec![
907+
FileGroup::new(vec![pfile("a", 100).with_range(0, 100)]),
908+
FileGroup::new(vec![pfile("b", 30)]),
909+
];
910+
911+
let actual = FileGroupPartitioner::new()
912+
.with_preserve_order_within_groups(true)
913+
.with_target_partitions(4)
914+
.with_repartition_file_min_size(10)
915+
.repartition_file_groups(&source_partitions);
916+
917+
let expected = Some(vec![
918+
// scan first third of "a"
919+
FileGroup::new(vec![pfile("a", 100).with_range(0, 33)]),
920+
// only b in this group (can't do this)
921+
FileGroup::new(vec![pfile("b", 30).with_range(0, 30)]),
922+
// second third of "a"
923+
FileGroup::new(vec![pfile("a", 100).with_range(33, 66)]),
924+
// final third of "a"
925+
FileGroup::new(vec![pfile("a", 100).with_range(66, 100)]),
926+
]);
927+
assert_partitioned_files(expected, actual);
928+
}
929+
930+
#[test]
931+
fn repartition_ordered_one_large_one_small_file_with_split_range() {
932+
// "Rebalance" the single large file across empty partitions, but can't split
933+
// small file
934+
let source_partitions = vec![
935+
FileGroup::new(vec![pfile("a", 100).with_range(0, 50)]),
936+
FileGroup::new(vec![pfile("a", 100).with_range(50, 100)]),
937+
FileGroup::new(vec![pfile("b", 30)]),
938+
];
939+
940+
let actual = FileGroupPartitioner::new()
941+
.with_preserve_order_within_groups(true)
942+
.with_target_partitions(4)
943+
.with_repartition_file_min_size(10)
944+
.repartition_file_groups(&source_partitions);
945+
946+
let expected = Some(vec![
947+
// scan first half of first "a"
948+
FileGroup::new(vec![pfile("a", 100).with_range(0, 25)]),
949+
// second "a" fully (not split)
950+
FileGroup::new(vec![pfile("a", 100).with_range(50, 100)]),
951+
// only b in this group (can't do this)
952+
FileGroup::new(vec![pfile("b", 30).with_range(0, 30)]),
953+
// second half of first "a"
954+
FileGroup::new(vec![pfile("a", 100).with_range(25, 50)]),
955+
]);
956+
assert_partitioned_files(expected, actual);
957+
}
958+
959+
#[test]
960+
fn repartition_ordered_one_large_one_small_file_with_non_full_range() {
961+
// "Rebalance" the single large file across empty partitions, but can't split
962+
// small file
963+
let source_partitions = vec![
964+
FileGroup::new(vec![pfile("a", 100).with_range(20, 80)]),
965+
FileGroup::new(vec![pfile("b", 30).with_range(5, 25)]),
966+
];
967+
968+
let actual = FileGroupPartitioner::new()
969+
.with_preserve_order_within_groups(true)
970+
.with_target_partitions(4)
971+
.with_repartition_file_min_size(10)
972+
.repartition_file_groups(&source_partitions);
973+
974+
let expected = Some(vec![
975+
// scan first third of "a"
976+
FileGroup::new(vec![pfile("a", 100).with_range(20, 40)]),
977+
// only b in this group (can't split this)
978+
FileGroup::new(vec![pfile("b", 30).with_range(5, 25)]),
979+
// second third of "a"
980+
FileGroup::new(vec![pfile("a", 100).with_range(40, 60)]),
981+
// final third of "a"
982+
FileGroup::new(vec![pfile("a", 100).with_range(60, 80)]),
983+
]);
984+
assert_partitioned_files(expected, actual);
985+
}
986+
840987
#[test]
841988
fn repartition_ordered_two_large_files() {
842989
// "Rebalance" two large files across empty partitions, but can't mix them

datafusion/datasource/src/mod.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,24 @@ impl PartitionedFile {
159159
.with_range(start, end)
160160
}
161161

162+
/// Size of the file to be scanned (taking into account the range, if present).
163+
pub fn effective_size(&self) -> u64 {
164+
if let Some(range) = &self.range {
165+
(range.end - range.start) as u64
166+
} else {
167+
self.object_meta.size
168+
}
169+
}
170+
171+
/// Effective range of the file to be scanned.
172+
pub fn range(&self) -> (u64, u64) {
173+
if let Some(range) = &self.range {
174+
(range.start as u64, range.end as u64)
175+
} else {
176+
(0, self.object_meta.size)
177+
}
178+
}
179+
162180
/// Provide a hint to the size of the file metadata. If a hint is provided
163181
/// the reader will try and fetch the last `size_hint` bytes of the parquet file optimistically.
164182
/// Without an appropriate hint, two read may be required to fetch the metadata.

0 commit comments

Comments
 (0)