Skip to content

Commit 5b42f3a

Browse files
committed
common: Add hashing support for REE arrays
1 parent b990987 commit 5b42f3a

File tree

1 file changed

+238
-0
lines changed

1 file changed

+238
-0
lines changed

datafusion/common/src/hash_utils.rs

Lines changed: 238 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -484,6 +484,76 @@ fn hash_fixed_list_array(
484484
Ok(())
485485
}
486486

487+
#[cfg(not(feature = "force_hash_collisions"))]
488+
fn hash_run_array<R: RunEndIndexType>(
489+
array: &RunArray<R>,
490+
random_state: &RandomState,
491+
hashes_buffer: &mut [u64],
492+
rehash: bool,
493+
) -> Result<()> {
494+
// We find the relevant runs that cover potentially sliced arrays, so we can only hash those
495+
// values. Then we find the runs that refer to the original runs and ensure that we apply
496+
// hashes correctly to the sliced, whether sliced at the start, end, or both.
497+
let array_offset = array.offset();
498+
let array_len = array.len();
499+
500+
if array_len == 0 {
501+
return Ok(());
502+
}
503+
504+
let run_ends = array.run_ends();
505+
let run_ends_values = run_ends.values();
506+
let values = array.values();
507+
508+
let start_physical_index = array.get_start_physical_index();
509+
// get_end_physical_index returns the inclusive last index, but we need the exclusive range end
510+
// for the operations we use below.
511+
let end_physical_index = array.get_end_physical_index() + 1;
512+
513+
let sliced_values = values.slice(
514+
start_physical_index,
515+
end_physical_index - start_physical_index,
516+
);
517+
let mut values_hashes = vec![0u64; sliced_values.len()];
518+
create_hashes(
519+
std::slice::from_ref(&sliced_values),
520+
random_state,
521+
&mut values_hashes,
522+
)?;
523+
524+
let mut start_in_slice = 0;
525+
for (adjusted_physical_index, &absolute_run_end) in run_ends_values
526+
[start_physical_index..end_physical_index]
527+
.iter()
528+
.enumerate()
529+
{
530+
let is_null_value = sliced_values.is_null(adjusted_physical_index);
531+
let absolute_run_end = absolute_run_end.as_usize();
532+
533+
let end_in_slice = (absolute_run_end - array_offset).min(array_len);
534+
535+
if rehash {
536+
if !is_null_value {
537+
let value_hash = values_hashes[adjusted_physical_index];
538+
for hash in hashes_buffer
539+
.iter_mut()
540+
.take(end_in_slice)
541+
.skip(start_in_slice)
542+
{
543+
*hash = combine_hashes(value_hash, *hash);
544+
}
545+
}
546+
} else {
547+
let value_hash = values_hashes[adjusted_physical_index];
548+
hashes_buffer[start_in_slice..end_in_slice].fill(value_hash);
549+
}
550+
551+
start_in_slice = end_in_slice;
552+
}
553+
554+
Ok(())
555+
}
556+
487557
/// Internal helper function that hashes a single array and either initializes or combines
488558
/// the hash values in the buffer.
489559
#[cfg(not(feature = "force_hash_collisions"))]
@@ -535,6 +605,10 @@ fn hash_single_array(
535605
let array = as_union_array(array)?;
536606
hash_union_array(array, random_state, hashes_buffer)?;
537607
}
608+
DataType::RunEndEncoded(_, _) => downcast_run_array! {
609+
array => hash_run_array(array, random_state, hashes_buffer, rehash)?,
610+
_ => unreachable!()
611+
}
538612
_ => {
539613
// This is internal because we should have caught this before.
540614
return _internal_err!(
@@ -803,6 +877,74 @@ mod tests {
803877
create_hash_string!(string_view_array, StringArray);
804878
create_hash_string!(dict_string_array, DictionaryArray<Int8Type>);
805879

880+
#[test]
#[cfg(not(feature = "force_hash_collisions"))]
fn create_hashes_for_run_array() -> Result<()> {
    // Run ends [2, 5, 7] over values [10, 20, 30]: the assertions below treat rows 0-1,
    // 2-4 and 5-6 as the three runs.
    let run_ends = Arc::new(Int32Array::from(vec![2, 5, 7]));
    let values = Arc::new(Int32Array::from(vec![10, 20, 30]));
    let array = Arc::new(RunArray::try_new(&run_ends, values.as_ref()).unwrap());

    let random_state = RandomState::with_seeds(0, 0, 0, 0);
    let hashes_buff = &mut vec![0; array.len()];
    let hashes = create_hashes(
        &[Arc::clone(&array) as ArrayRef],
        &random_state,
        hashes_buff,
    )?;

    assert_eq!(hashes.len(), 7);

    // Rows inside the same run share a hash.
    for (left, right) in [(0usize, 1usize), (2, 3), (3, 4), (5, 6)] {
        assert_eq!(hashes[left], hashes[right]);
    }

    // Rows taken from different runs hash differently.
    for (left, right) in [(0usize, 2usize), (2, 5), (0, 5)] {
        assert_ne!(hashes[left], hashes[right]);
    }

    Ok(())
}
906+
907+
#[test]
#[cfg(not(feature = "force_hash_collisions"))]
fn create_multi_column_hash_with_run_array() -> Result<()> {
    // A plain seven-row integer column, plus a run-end-encoded string column of the same
    // logical length (run ends [2, 5, 7]).
    let int_array = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7]));
    let run_ends = Arc::new(Int32Array::from(vec![2, 5, 7]));
    let values = Arc::new(StringArray::from(vec!["foo", "bar", "baz"]));
    let run_array = Arc::new(RunArray::try_new(&run_ends, values.as_ref()).unwrap());

    let random_state = RandomState::with_seeds(0, 0, 0, 0);

    // Hash the integer column on its own...
    let mut one_col_hashes = vec![0; int_array.len()];
    create_hashes(
        &[Arc::clone(&int_array) as ArrayRef],
        &random_state,
        &mut one_col_hashes,
    )?;

    // ...and again with the run array combined in as a second column.
    let mut two_col_hashes = vec![0; int_array.len()];
    create_hashes(
        &[
            Arc::clone(&int_array) as ArrayRef,
            Arc::clone(&run_array) as ArrayRef,
        ],
        &random_state,
        &mut two_col_hashes,
    )?;

    assert_eq!(one_col_hashes.len(), 7);
    assert_eq!(two_col_hashes.len(), 7);
    assert_ne!(one_col_hashes, two_col_hashes);

    // For each checked pair of rows, "do these two rows differ?" must have the same answer
    // with one column as with two.
    for (i, j) in [(0usize, 1usize), (2, 3)] {
        let differs_one_col = one_col_hashes[i] != one_col_hashes[j];
        let differs_two_col = two_col_hashes[i] != two_col_hashes[j];
        assert_eq!(differs_one_col, differs_two_col);
    }

    Ok(())
}
947+
806948
#[test]
807949
// Tests actual values of hashes, which are different if forcing collisions
808950
#[cfg(not(feature = "force_hash_collisions"))]
@@ -1321,4 +1463,100 @@ mod tests {
13211463
// 67 vs 67
13221464
assert_eq!(hashes[0], hashes[4]);
13231465
}
1466+
1467+
#[test]
#[cfg(not(feature = "force_hash_collisions"))]
fn create_hashes_for_sliced_run_array() -> Result<()> {
    let run_ends = Arc::new(Int32Array::from(vec![2, 5, 7]));
    let values = Arc::new(Int32Array::from(vec![10, 20, 30]));
    let array = Arc::new(RunArray::try_new(&run_ends, values.as_ref()).unwrap());

    let random_state = RandomState::with_seeds(0, 0, 0, 0);

    // Reference hashes for the unsliced array.
    let mut full_hashes = vec![0; array.len()];
    create_hashes(
        &[Arc::clone(&array) as ArrayRef],
        &random_state,
        &mut full_hashes,
    )?;

    // Slice out logical rows 2..5 and hash the slice with the same random state.
    let sliced_array = (Arc::clone(&array) as ArrayRef).slice(2, 3);
    let mut sliced_hashes = vec![0; sliced_array.len()];
    create_hashes(
        std::slice::from_ref(&sliced_array),
        &random_state,
        &mut sliced_hashes,
    )?;

    assert_eq!(sliced_hashes.len(), 3);

    // All rows of the slice must carry the same hash...
    for pair in sliced_hashes.windows(2) {
        assert_eq!(pair[0], pair[1]);
    }
    // ...and must agree with the matching rows of the unsliced result.
    assert_eq!(sliced_hashes.as_slice(), &full_hashes[2..5]);

    Ok(())
}
1499+
1500+
#[test]
1501+
#[cfg(not(feature = "force_hash_collisions"))]
1502+
fn test_run_array_with_nulls() -> Result<()> {
1503+
let values = Arc::new(Int32Array::from(vec![Some(10), None, Some(20)]));
1504+
let run_ends = Arc::new(Int32Array::from(vec![2, 4, 6]));
1505+
let array = Arc::new(RunArray::try_new(&run_ends, values.as_ref()).unwrap());
1506+
1507+
let random_state = RandomState::with_seeds(0, 0, 0, 0);
1508+
let mut hashes = vec![0; array.len()];
1509+
create_hashes(
1510+
&[Arc::clone(&array) as ArrayRef],
1511+
&random_state,
1512+
&mut hashes,
1513+
)?;
1514+
1515+
assert_eq!(hashes[0], hashes[1]);
1516+
assert_ne!(hashes[0], 0);
1517+
assert_eq!(hashes[2], hashes[3]);
1518+
assert_eq!(hashes[2], 0);
1519+
assert_eq!(hashes[4], hashes[5]);
1520+
assert_ne!(hashes[4], 0);
1521+
assert_ne!(hashes[0], hashes[4]);
1522+
1523+
Ok(())
1524+
}
1525+
1526+
#[test]
#[cfg(not(feature = "force_hash_collisions"))]
fn test_run_array_with_nulls_multicolumn() -> Result<()> {
    // The same logical column twice: once as a plain primitive array, once run-end encoded
    // with one row per run.
    let primitive_array = Arc::new(Int32Array::from(vec![Some(10), None, Some(20)]));
    let run_ends = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let run_values = Arc::new(Int32Array::from(vec![Some(10), None, Some(20)]));
    let run_array =
        Arc::new(RunArray::try_new(&run_ends, run_values.as_ref()).unwrap());
    let second_col = Arc::new(Int32Array::from(vec![100, 200, 300]));

    let random_state = RandomState::with_seeds(0, 0, 0, 0);

    // Hashes `first_col` together with `second_col` into a fresh three-row buffer.
    let hash_with_second_col = |first_col: ArrayRef| -> Result<Vec<u64>> {
        let mut buffer = vec![0; 3];
        create_hashes(
            &[first_col, Arc::clone(&second_col) as ArrayRef],
            &random_state,
            &mut buffer,
        )?;
        Ok(buffer)
    };

    let primitive_hashes = hash_with_second_col(Arc::clone(&primitive_array) as ArrayRef)?;
    let run_hashes = hash_with_second_col(Arc::clone(&run_array) as ArrayRef)?;

    // The encoding of the first column must not influence the combined hashes.
    assert_eq!(primitive_hashes, run_hashes);

    Ok(())
}
13241562
}

0 commit comments

Comments
 (0)