Skip to content

Commit 63a8c65

Browse files
authored
common: Add hashing support for REE arrays (#18981)
## Which issue does this PR close?

Part of #16011

## What changes are included in this PR?

Support for hashing of REE (run-end encoded) arrays.

## Are these changes tested?

Unit tests are added.

## Are there any user-facing changes?

No; the change is strictly additive (a new feature).

@alamb @vegarsti
1 parent 71fdad0 commit 63a8c65

File tree

1 file changed

+238
-0
lines changed

1 file changed

+238
-0
lines changed

datafusion/common/src/hash_utils.rs

Lines changed: 238 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -484,6 +484,76 @@ fn hash_fixed_list_array(
484484
Ok(())
485485
}
486486

487+
#[cfg(not(feature = "force_hash_collisions"))]
488+
fn hash_run_array<R: RunEndIndexType>(
489+
array: &RunArray<R>,
490+
random_state: &RandomState,
491+
hashes_buffer: &mut [u64],
492+
rehash: bool,
493+
) -> Result<()> {
494+
// We find the relevant runs that cover potentially sliced arrays, so we can only hash those
495+
// values. Then we find the runs that refer to the original runs and ensure that we apply
496+
// hashes correctly to the sliced, whether sliced at the start, end, or both.
497+
let array_offset = array.offset();
498+
let array_len = array.len();
499+
500+
if array_len == 0 {
501+
return Ok(());
502+
}
503+
504+
let run_ends = array.run_ends();
505+
let run_ends_values = run_ends.values();
506+
let values = array.values();
507+
508+
let start_physical_index = array.get_start_physical_index();
509+
// get_end_physical_index returns the inclusive last index, but we need the exclusive range end
510+
// for the operations we use below.
511+
let end_physical_index = array.get_end_physical_index() + 1;
512+
513+
let sliced_values = values.slice(
514+
start_physical_index,
515+
end_physical_index - start_physical_index,
516+
);
517+
let mut values_hashes = vec![0u64; sliced_values.len()];
518+
create_hashes(
519+
std::slice::from_ref(&sliced_values),
520+
random_state,
521+
&mut values_hashes,
522+
)?;
523+
524+
let mut start_in_slice = 0;
525+
for (adjusted_physical_index, &absolute_run_end) in run_ends_values
526+
[start_physical_index..end_physical_index]
527+
.iter()
528+
.enumerate()
529+
{
530+
let is_null_value = sliced_values.is_null(adjusted_physical_index);
531+
let absolute_run_end = absolute_run_end.as_usize();
532+
533+
let end_in_slice = (absolute_run_end - array_offset).min(array_len);
534+
535+
if rehash {
536+
if !is_null_value {
537+
let value_hash = values_hashes[adjusted_physical_index];
538+
for hash in hashes_buffer
539+
.iter_mut()
540+
.take(end_in_slice)
541+
.skip(start_in_slice)
542+
{
543+
*hash = combine_hashes(value_hash, *hash);
544+
}
545+
}
546+
} else {
547+
let value_hash = values_hashes[adjusted_physical_index];
548+
hashes_buffer[start_in_slice..end_in_slice].fill(value_hash);
549+
}
550+
551+
start_in_slice = end_in_slice;
552+
}
553+
554+
Ok(())
555+
}
556+
487557
/// Internal helper function that hashes a single array and either initializes or combines
488558
/// the hash values in the buffer.
489559
#[cfg(not(feature = "force_hash_collisions"))]
@@ -535,6 +605,10 @@ fn hash_single_array(
535605
let array = as_union_array(array)?;
536606
hash_union_array(array, random_state, hashes_buffer)?;
537607
}
608+
DataType::RunEndEncoded(_, _) => downcast_run_array! {
609+
array => hash_run_array(array, random_state, hashes_buffer, rehash)?,
610+
_ => unreachable!()
611+
}
538612
_ => {
539613
// This is internal because we should have caught this before.
540614
return _internal_err!(
@@ -803,6 +877,74 @@ mod tests {
803877
create_hash_string!(string_view_array, StringArray);
804878
create_hash_string!(dict_string_array, DictionaryArray<Int8Type>);
805879

880+
/// Rows that belong to the same run must hash identically, while rows from runs with
/// different values must hash differently.
#[test]
#[cfg(not(feature = "force_hash_collisions"))]
fn create_hashes_for_run_array() -> Result<()> {
    // Logical contents: [10, 10, 20, 20, 20, 30, 30]
    let run_ends = Int32Array::from(vec![2, 5, 7]);
    let values = Int32Array::from(vec![10, 20, 30]);
    let array: ArrayRef = Arc::new(RunArray::try_new(&run_ends, &values).unwrap());

    let random_state = RandomState::with_seeds(0, 0, 0, 0);
    let mut buffer = vec![0; array.len()];
    let hashes = create_hashes(std::slice::from_ref(&array), &random_state, &mut buffer)?;

    assert_eq!(hashes.len(), 7);

    // Pairs of rows inside the same run.
    for (a, b) in [(0, 1), (2, 3), (3, 4), (5, 6)] {
        assert_eq!(hashes[a], hashes[b]);
    }

    // Pairs of rows taken from different runs.
    for (a, b) in [(0, 2), (2, 5), (0, 5)] {
        assert_ne!(hashes[a], hashes[b]);
    }

    Ok(())
}
906+
907+
/// Hashing a primitive column together with a run array column must produce hashes that
/// differ from hashing the primitive column alone, while keeping the same
/// equal/not-equal relationship between rows that fall in the same run.
#[test]
#[cfg(not(feature = "force_hash_collisions"))]
fn create_multi_column_hash_with_run_array() -> Result<()> {
    let int_array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7]));

    // Logical contents: ["foo", "foo", "bar", "bar", "bar", "baz", "baz"]
    let run_ends = Int32Array::from(vec![2, 5, 7]);
    let values = StringArray::from(vec!["foo", "bar", "baz"]);
    let run_array: ArrayRef =
        Arc::new(RunArray::try_new(&run_ends, &values).unwrap());

    let random_state = RandomState::with_seeds(0, 0, 0, 0);

    let mut one_col_hashes = vec![0; int_array.len()];
    create_hashes(
        std::slice::from_ref(&int_array),
        &random_state,
        &mut one_col_hashes,
    )?;

    let mut two_col_hashes = vec![0; int_array.len()];
    create_hashes(
        &[Arc::clone(&int_array), Arc::clone(&run_array)],
        &random_state,
        &mut two_col_hashes,
    )?;

    assert_eq!(one_col_hashes.len(), 7);
    assert_eq!(two_col_hashes.len(), 7);
    assert_ne!(one_col_hashes, two_col_hashes);

    // Rows 0/1 and rows 2/3 each share a run value, so adding the run column must not
    // change whether the two rows of a pair hash differently.
    for (a, b) in [(0, 1), (2, 3)] {
        let differs_one_col = one_col_hashes[a] != one_col_hashes[b];
        let differs_two_col = two_col_hashes[a] != two_col_hashes[b];
        assert_eq!(differs_one_col, differs_two_col);
    }

    Ok(())
}
947+
806948
#[test]
807949
// Tests actual values of hashes, which are different if forcing collisions
808950
#[cfg(not(feature = "force_hash_collisions"))]
@@ -1323,4 +1465,100 @@ mod tests {
13231465
// 67 vs 67
13241466
assert_eq!(hashes[0], hashes[4]);
13251467
}
1468+
1469+
/// Hashing a slice of a run array must yield exactly the hashes of the corresponding
/// rows of the unsliced array.
#[test]
#[cfg(not(feature = "force_hash_collisions"))]
fn create_hashes_for_sliced_run_array() -> Result<()> {
    // Logical contents: [10, 10, 20, 20, 20, 30, 30]
    let run_ends = Int32Array::from(vec![2, 5, 7]);
    let values = Int32Array::from(vec![10, 20, 30]);
    let array: ArrayRef = Arc::new(RunArray::try_new(&run_ends, &values).unwrap());

    let random_state = RandomState::with_seeds(0, 0, 0, 0);

    let mut full_hashes = vec![0; array.len()];
    create_hashes(
        std::slice::from_ref(&array),
        &random_state,
        &mut full_hashes,
    )?;

    // Rows 2..5 are exactly the middle run (value 20).
    let sliced_array = array.slice(2, 3);
    let mut sliced_hashes = vec![0; sliced_array.len()];
    create_hashes(
        std::slice::from_ref(&sliced_array),
        &random_state,
        &mut sliced_hashes,
    )?;

    assert_eq!(sliced_hashes.len(), 3);
    for (a, b) in [(0, 1), (1, 2)] {
        assert_eq!(sliced_hashes[a], sliced_hashes[b]);
    }
    assert_eq!(sliced_hashes.as_slice(), &full_hashes[2..5]);

    Ok(())
}
1501+
1502+
/// A run whose value is null must leave a zero hash for its rows, while non-null runs
/// produce non-zero hashes that are shared within a run and distinct across runs.
#[test]
#[cfg(not(feature = "force_hash_collisions"))]
fn test_run_array_with_nulls() -> Result<()> {
    // Logical contents: [10, 10, NULL, NULL, 20, 20]
    let run_ends = Int32Array::from(vec![2, 4, 6]);
    let values = Int32Array::from(vec![Some(10), None, Some(20)]);
    let array: ArrayRef = Arc::new(RunArray::try_new(&run_ends, &values).unwrap());

    let random_state = RandomState::with_seeds(0, 0, 0, 0);
    let mut hashes = vec![0; array.len()];
    create_hashes(std::slice::from_ref(&array), &random_state, &mut hashes)?;

    // First run: value 10.
    assert_eq!(hashes[0], hashes[1]);
    assert_ne!(hashes[0], 0);

    // Second run: null value.
    assert_eq!(hashes[2], hashes[3]);
    assert_eq!(hashes[2], 0);

    // Third run: value 20.
    assert_eq!(hashes[4], hashes[5]);
    assert_ne!(hashes[4], 0);

    // Distinct non-null values hash differently.
    assert_ne!(hashes[0], hashes[4]);

    Ok(())
}
1527+
1528+
/// A run array with nulls, hashed as the first of two columns, must produce the same
/// hashes as the equivalent plain primitive array in that position.
#[test]
#[cfg(not(feature = "force_hash_collisions"))]
fn test_run_array_with_nulls_multicolumn() -> Result<()> {
    // The same logical data, [10, NULL, 20], once as a plain array and once run-end
    // encoded with runs of length one.
    let primitive_array: ArrayRef =
        Arc::new(Int32Array::from(vec![Some(10), None, Some(20)]));
    let run_ends = Int32Array::from(vec![1, 2, 3]);
    let run_values = Int32Array::from(vec![Some(10), None, Some(20)]);
    let run_array: ArrayRef =
        Arc::new(RunArray::try_new(&run_ends, &run_values).unwrap());
    let second_col: ArrayRef = Arc::new(Int32Array::from(vec![100, 200, 300]));

    let random_state = RandomState::with_seeds(0, 0, 0, 0);

    let mut primitive_hashes = vec![0; 3];
    create_hashes(
        &[Arc::clone(&primitive_array), Arc::clone(&second_col)],
        &random_state,
        &mut primitive_hashes,
    )?;

    let mut run_hashes = vec![0; 3];
    create_hashes(
        &[Arc::clone(&run_array), Arc::clone(&second_col)],
        &random_state,
        &mut run_hashes,
    )?;

    assert_eq!(primitive_hashes, run_hashes);

    Ok(())
}
13261564
}

0 commit comments

Comments
 (0)