Skip to content

Commit 4288e26

Browse files
committed
common: Add hashing support for REE arrays
1 parent b990987 commit 4288e26

File tree

2 files changed

+287
-2
lines changed

2 files changed

+287
-2
lines changed

datafusion/common/src/cast.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,16 @@ use arrow::array::{
2727
Int16Array, Int8Array, LargeBinaryArray, LargeListViewArray, LargeStringArray,
2828
ListViewArray, StringViewArray, UInt16Array,
2929
};
30+
use arrow::datatypes::RunEndIndexType;
3031
use arrow::{
3132
array::{
3233
Array, BinaryArray, BooleanArray, Date32Array, Date64Array, Decimal128Array,
3334
Decimal256Array, DictionaryArray, FixedSizeBinaryArray, FixedSizeListArray,
3435
Float32Array, Float64Array, GenericBinaryArray, GenericListArray,
3536
GenericStringArray, Int32Array, Int64Array, IntervalDayTimeArray,
3637
IntervalMonthDayNanoArray, IntervalYearMonthArray, LargeListArray, ListArray,
37-
MapArray, NullArray, OffsetSizeTrait, PrimitiveArray, StringArray, StructArray,
38-
Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray,
38+
MapArray, NullArray, OffsetSizeTrait, PrimitiveArray, RunArray, StringArray,
39+
StructArray, Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray,
3940
Time64NanosecondArray, TimestampMicrosecondArray, TimestampMillisecondArray,
4041
TimestampNanosecondArray, TimestampSecondArray, UInt32Array, UInt64Array,
4142
UInt8Array, UnionArray,
@@ -334,3 +335,8 @@ pub fn as_list_view_array(array: &dyn Array) -> Result<&ListViewArray> {
334335
pub fn as_large_list_view_array(array: &dyn Array) -> Result<&LargeListViewArray> {
335336
Ok(downcast_value!(array, LargeListViewArray))
336337
}
338+
339+
// Downcast ArrayRef to RunArray
340+
pub fn as_run_array<R: RunEndIndexType>(array: &dyn Array) -> Result<&RunArray<R>> {
341+
Ok(downcast_value!(array, RunArray, R))
342+
}

datafusion/common/src/hash_utils.rs

Lines changed: 279 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -484,6 +484,90 @@ fn hash_fixed_list_array(
484484
Ok(())
485485
}
486486

487+
#[cfg(not(feature = "force_hash_collisions"))]
488+
fn hash_run_array<R: RunEndIndexType>(
489+
array: &RunArray<R>,
490+
random_state: &RandomState,
491+
hashes_buffer: &mut [u64],
492+
rehash: bool,
493+
) -> Result<()> {
494+
// We find the relevant runs that cover potentially sliced arrays, so we can only hash those
495+
// values. Then we find the runs refer to the original runs and ensure that we apply hashes
496+
// correctly to the sliced, whether sliced at the start, end, or both.
497+
let array_offset = array.offset();
498+
let array_len = array.len();
499+
500+
if array_len == 0 {
501+
return Ok(());
502+
}
503+
504+
let run_ends = array.run_ends();
505+
let run_ends_values = run_ends.values();
506+
let values = array.values();
507+
508+
let start_physical_index = array.get_start_physical_index();
509+
// get_end_physical_index returns the inclusive last index, but we need the exclusive range end
510+
// for the operations we use below.
511+
let end_physical_index = array.get_end_physical_index() + 1;
512+
513+
let sliced_values = values.slice(
514+
start_physical_index,
515+
end_physical_index - start_physical_index,
516+
);
517+
let mut values_hashes = vec![0u64; sliced_values.len()];
518+
create_hashes(
519+
std::slice::from_ref(&sliced_values),
520+
random_state,
521+
&mut values_hashes,
522+
)?;
523+
524+
let mut logical_position = 0;
525+
for (adjusted_physical_index, &absolute_run_end) in run_ends_values
526+
[start_physical_index..end_physical_index]
527+
.iter()
528+
.enumerate()
529+
{
530+
let is_null_value = sliced_values.is_null(adjusted_physical_index);
531+
let absolute_run_end = absolute_run_end.as_usize();
532+
533+
let start_in_slice = if absolute_run_end > array_offset {
534+
logical_position
535+
} else {
536+
continue;
537+
};
538+
539+
let end_in_slice = (absolute_run_end - array_offset).min(array_len);
540+
541+
if start_in_slice >= array_len {
542+
break;
543+
}
544+
545+
if rehash {
546+
if !is_null_value {
547+
let value_hash = values_hashes[adjusted_physical_index];
548+
for hash in hashes_buffer
549+
.iter_mut()
550+
.take(end_in_slice)
551+
.skip(start_in_slice)
552+
{
553+
*hash = combine_hashes(value_hash, *hash);
554+
}
555+
}
556+
} else {
557+
let value_hash = values_hashes[adjusted_physical_index];
558+
hashes_buffer[start_in_slice..end_in_slice].fill(value_hash);
559+
}
560+
561+
logical_position = end_in_slice;
562+
563+
if logical_position >= array_len {
564+
break;
565+
}
566+
}
567+
568+
Ok(())
569+
}
570+
487571
/// Internal helper function that hashes a single array and either initializes or combines
488572
/// the hash values in the buffer.
489573
#[cfg(not(feature = "force_hash_collisions"))]
@@ -535,6 +619,10 @@ fn hash_single_array(
535619
let array = as_union_array(array)?;
536620
hash_union_array(array, random_state, hashes_buffer)?;
537621
}
622+
DataType::RunEndEncoded(_, _) => downcast_run_array! {
623+
array => hash_run_array(array, random_state, hashes_buffer, rehash)?,
624+
_ => unreachable!()
625+
}
538626
_ => {
539627
// This is internal because we should have caught this before.
540628
return _internal_err!(
@@ -803,6 +891,74 @@ mod tests {
803891
create_hash_string!(string_view_array, StringArray);
804892
create_hash_string!(dict_string_array, DictionaryArray<Int8Type>);
805893

894+
#[test]
895+
#[cfg(not(feature = "force_hash_collisions"))]
896+
fn create_hashes_for_run_array() -> Result<()> {
897+
let values = Arc::new(Int32Array::from(vec![10, 20, 30]));
898+
let run_ends = Arc::new(Int32Array::from(vec![2, 5, 7]));
899+
let array = Arc::new(RunArray::try_new(&run_ends, values.as_ref()).unwrap());
900+
901+
let random_state = RandomState::with_seeds(0, 0, 0, 0);
902+
let hashes_buff = &mut vec![0; array.len()];
903+
let hashes = create_hashes(
904+
&[Arc::clone(&array) as ArrayRef],
905+
&random_state,
906+
hashes_buff,
907+
)?;
908+
909+
assert_eq!(hashes.len(), 7);
910+
assert_eq!(hashes[0], hashes[1]);
911+
assert_eq!(hashes[2], hashes[3]);
912+
assert_eq!(hashes[3], hashes[4]);
913+
assert_eq!(hashes[5], hashes[6]);
914+
assert_ne!(hashes[0], hashes[2]);
915+
assert_ne!(hashes[2], hashes[5]);
916+
assert_ne!(hashes[0], hashes[5]);
917+
918+
Ok(())
919+
}
920+
921+
#[test]
922+
#[cfg(not(feature = "force_hash_collisions"))]
923+
fn create_multi_column_hash_with_run_array() -> Result<()> {
924+
let int_array = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7]));
925+
let values = Arc::new(StringArray::from(vec!["foo", "bar", "baz"]));
926+
let run_ends = Arc::new(Int32Array::from(vec![2, 5, 7]));
927+
let run_array = Arc::new(RunArray::try_new(&run_ends, values.as_ref()).unwrap());
928+
929+
let random_state = RandomState::with_seeds(0, 0, 0, 0);
930+
let mut one_col_hashes = vec![0; int_array.len()];
931+
create_hashes(
932+
&[Arc::clone(&int_array) as ArrayRef],
933+
&random_state,
934+
&mut one_col_hashes,
935+
)?;
936+
937+
let mut two_col_hashes = vec![0; int_array.len()];
938+
create_hashes(
939+
&[
940+
Arc::clone(&int_array) as ArrayRef,
941+
Arc::clone(&run_array) as ArrayRef,
942+
],
943+
&random_state,
944+
&mut two_col_hashes,
945+
)?;
946+
947+
assert_eq!(one_col_hashes.len(), 7);
948+
assert_eq!(two_col_hashes.len(), 7);
949+
assert_ne!(one_col_hashes, two_col_hashes);
950+
951+
let diff_0_vs_1_one_col = one_col_hashes[0] != one_col_hashes[1];
952+
let diff_0_vs_1_two_col = two_col_hashes[0] != two_col_hashes[1];
953+
assert_eq!(diff_0_vs_1_one_col, diff_0_vs_1_two_col);
954+
955+
let diff_2_vs_3_one_col = one_col_hashes[2] != one_col_hashes[3];
956+
let diff_2_vs_3_two_col = two_col_hashes[2] != two_col_hashes[3];
957+
assert_eq!(diff_2_vs_3_one_col, diff_2_vs_3_two_col);
958+
959+
Ok(())
960+
}
961+
806962
#[test]
807963
// Tests actual values of hashes, which are different if forcing collisions
808964
#[cfg(not(feature = "force_hash_collisions"))]
@@ -1321,4 +1477,127 @@ mod tests {
13211477
// 67 vs 67
13221478
assert_eq!(hashes[0], hashes[4]);
13231479
}
1480+
1481+
#[test]
1482+
#[cfg(not(feature = "force_hash_collisions"))]
1483+
fn create_hashes_for_sliced_run_array() -> Result<()> {
1484+
let values = Arc::new(Int32Array::from(vec![10, 20, 30]));
1485+
let run_ends = Arc::new(Int32Array::from(vec![2, 5, 7]));
1486+
let array = Arc::new(RunArray::try_new(&run_ends, values.as_ref()).unwrap());
1487+
1488+
let random_state = RandomState::with_seeds(0, 0, 0, 0);
1489+
let mut full_hashes = vec![0; array.len()];
1490+
create_hashes(
1491+
&[Arc::clone(&array) as ArrayRef],
1492+
&random_state,
1493+
&mut full_hashes,
1494+
)?;
1495+
1496+
let array_ref: ArrayRef = Arc::clone(&array) as ArrayRef;
1497+
let sliced_array = array_ref.slice(2, 3);
1498+
1499+
let mut sliced_hashes = vec![0; sliced_array.len()];
1500+
create_hashes(
1501+
std::slice::from_ref(&sliced_array),
1502+
&random_state,
1503+
&mut sliced_hashes,
1504+
)?;
1505+
1506+
assert_eq!(sliced_hashes.len(), 3);
1507+
assert_eq!(sliced_hashes[0], sliced_hashes[1]);
1508+
assert_eq!(sliced_hashes[1], sliced_hashes[2]);
1509+
assert_eq!(sliced_hashes[0], full_hashes[2]);
1510+
assert_eq!(sliced_hashes[1], full_hashes[3]);
1511+
assert_eq!(sliced_hashes[2], full_hashes[4]);
1512+
1513+
Ok(())
1514+
}
1515+
1516+
#[test]
1517+
#[cfg(not(feature = "force_hash_collisions"))]
1518+
fn test_sliced_run_array_only_hashes_needed_values() -> Result<()> {
1519+
let values_vec: Vec<i32> = (1..=1000).collect();
1520+
let run_ends_vec: Vec<i32> = (1..=1000).map(|i| i * 2).collect();
1521+
1522+
let values = Arc::new(Int32Array::from(values_vec));
1523+
let run_ends = Arc::new(Int32Array::from(run_ends_vec));
1524+
let array = Arc::new(RunArray::try_new(&run_ends, values.as_ref()).unwrap());
1525+
1526+
let array_ref: ArrayRef = Arc::clone(&array) as ArrayRef;
1527+
let sliced_array = array_ref.slice(100, 5);
1528+
1529+
let random_state = RandomState::with_seeds(0, 0, 0, 0);
1530+
let mut sliced_hashes = vec![0; sliced_array.len()];
1531+
create_hashes(&[sliced_array], &random_state, &mut sliced_hashes)?;
1532+
1533+
assert_eq!(sliced_hashes.len(), 5);
1534+
assert_eq!(sliced_hashes[0], sliced_hashes[1]);
1535+
assert_eq!(sliced_hashes[2], sliced_hashes[3]);
1536+
assert_ne!(sliced_hashes[0], sliced_hashes[2]);
1537+
1538+
Ok(())
1539+
}
1540+
1541+
#[test]
1542+
#[cfg(not(feature = "force_hash_collisions"))]
1543+
fn test_run_array_with_nulls() -> Result<()> {
1544+
let values = Arc::new(Int32Array::from(vec![Some(10), None, Some(20)]));
1545+
let run_ends = Arc::new(Int32Array::from(vec![2, 4, 6]));
1546+
let array = Arc::new(RunArray::try_new(&run_ends, values.as_ref()).unwrap());
1547+
1548+
let random_state = RandomState::with_seeds(0, 0, 0, 0);
1549+
let mut hashes = vec![0; array.len()];
1550+
create_hashes(
1551+
&[Arc::clone(&array) as ArrayRef],
1552+
&random_state,
1553+
&mut hashes,
1554+
)?;
1555+
1556+
assert_eq!(hashes[0], hashes[1]);
1557+
assert_ne!(hashes[0], 0);
1558+
assert_eq!(hashes[2], hashes[3]);
1559+
assert_eq!(hashes[2], 0);
1560+
assert_eq!(hashes[4], hashes[5]);
1561+
assert_ne!(hashes[4], 0);
1562+
assert_ne!(hashes[0], hashes[4]);
1563+
1564+
Ok(())
1565+
}
1566+
1567+
#[test]
1568+
#[cfg(not(feature = "force_hash_collisions"))]
1569+
fn test_run_array_with_nulls_multicolumn() -> Result<()> {
1570+
let primitive_array = Arc::new(Int32Array::from(vec![Some(10), None, Some(20)]));
1571+
let run_values = Arc::new(Int32Array::from(vec![Some(10), None, Some(20)]));
1572+
let run_ends = Arc::new(Int32Array::from(vec![1, 2, 3]));
1573+
let run_array =
1574+
Arc::new(RunArray::try_new(&run_ends, run_values.as_ref()).unwrap());
1575+
let second_col = Arc::new(Int32Array::from(vec![100, 200, 300]));
1576+
1577+
let random_state = RandomState::with_seeds(0, 0, 0, 0);
1578+
1579+
let mut primitive_hashes = vec![0; 3];
1580+
create_hashes(
1581+
&[
1582+
Arc::clone(&primitive_array) as ArrayRef,
1583+
Arc::clone(&second_col) as ArrayRef,
1584+
],
1585+
&random_state,
1586+
&mut primitive_hashes,
1587+
)?;
1588+
1589+
let mut run_hashes = vec![0; 3];
1590+
create_hashes(
1591+
&[
1592+
Arc::clone(&run_array) as ArrayRef,
1593+
Arc::clone(&second_col) as ArrayRef,
1594+
],
1595+
&random_state,
1596+
&mut run_hashes,
1597+
)?;
1598+
1599+
assert_eq!(primitive_hashes, run_hashes);
1600+
1601+
Ok(())
1602+
}
13241603
}

0 commit comments

Comments
 (0)