Skip to content

Commit 63cb800

Browse files
committed
common: Add hashing support for REE arrays
1 parent b990987 commit 63cb800

File tree

2 files changed

+133
-2
lines changed

2 files changed

+133
-2
lines changed

datafusion/common/src/cast.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,16 @@ use arrow::array::{
2727
Int16Array, Int8Array, LargeBinaryArray, LargeListViewArray, LargeStringArray,
2828
ListViewArray, StringViewArray, UInt16Array,
2929
};
30+
use arrow::datatypes::RunEndIndexType;
3031
use arrow::{
3132
array::{
3233
Array, BinaryArray, BooleanArray, Date32Array, Date64Array, Decimal128Array,
3334
Decimal256Array, DictionaryArray, FixedSizeBinaryArray, FixedSizeListArray,
3435
Float32Array, Float64Array, GenericBinaryArray, GenericListArray,
3536
GenericStringArray, Int32Array, Int64Array, IntervalDayTimeArray,
3637
IntervalMonthDayNanoArray, IntervalYearMonthArray, LargeListArray, ListArray,
37-
MapArray, NullArray, OffsetSizeTrait, PrimitiveArray, StringArray, StructArray,
38-
Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray,
38+
MapArray, NullArray, OffsetSizeTrait, PrimitiveArray, RunArray, StringArray,
39+
StructArray, Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray,
3940
Time64NanosecondArray, TimestampMicrosecondArray, TimestampMillisecondArray,
4041
TimestampNanosecondArray, TimestampSecondArray, UInt32Array, UInt64Array,
4142
UInt8Array, UnionArray,
@@ -334,3 +335,8 @@ pub fn as_list_view_array(array: &dyn Array) -> Result<&ListViewArray> {
334335
pub fn as_large_list_view_array(array: &dyn Array) -> Result<&LargeListViewArray> {
335336
Ok(downcast_value!(array, LargeListViewArray))
336337
}
338+
339+
// Downcast ArrayRef to RunArray
340+
pub fn as_run_array<R: RunEndIndexType>(array: &dyn Array) -> Result<&RunArray<R>> {
341+
Ok(downcast_value!(array, RunArray, R))
342+
}

datafusion/common/src/hash_utils.rs

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ use crate::cast::{
3333
use crate::error::Result;
3434
use crate::error::{_internal_datafusion_err, _internal_err};
3535
use std::cell::RefCell;
36+
use std::sync::Arc;
3637

3738
// Combines two hashes into one hash
3839
#[inline]
@@ -484,6 +485,41 @@ fn hash_fixed_list_array(
484485
Ok(())
485486
}
486487

488+
#[cfg(not(feature = "force_hash_collisions"))]
489+
fn hash_run_array<R: RunEndIndexType>(
490+
array: &RunArray<R>,
491+
random_state: &RandomState,
492+
hashes_buffer: &mut [u64],
493+
rehash: bool,
494+
) -> Result<()> {
495+
let values = array.values();
496+
let values_len = values.len();
497+
let mut values_hashes = vec![0u64; values_len];
498+
create_hashes(&[Arc::clone(values)], random_state, &mut values_hashes)?;
499+
500+
let run_ends = array.run_ends();
501+
let mut prev_run_end = 0;
502+
503+
for i in 0..values_len {
504+
let run_end = run_ends.values()[i].as_usize();
505+
let value_hash = values_hashes[i];
506+
507+
if rehash {
508+
for pos in prev_run_end..run_end {
509+
hashes_buffer[pos] = combine_hashes(value_hash, hashes_buffer[pos]);
510+
}
511+
} else {
512+
for pos in prev_run_end..run_end {
513+
hashes_buffer[pos] = value_hash;
514+
}
515+
}
516+
517+
prev_run_end = run_end;
518+
}
519+
520+
Ok(())
521+
}
522+
487523
/// Internal helper function that hashes a single array and either initializes or combines
488524
/// the hash values in the buffer.
489525
#[cfg(not(feature = "force_hash_collisions"))]
@@ -535,6 +571,10 @@ fn hash_single_array(
535571
let array = as_union_array(array)?;
536572
hash_union_array(array, random_state, hashes_buffer)?;
537573
}
574+
DataType::RunEndEncoded(_, _) => downcast_run_array! {
575+
array => hash_run_array(array, random_state, hashes_buffer, rehash)?,
576+
_ => unreachable!()
577+
}
538578
_ => {
539579
// This is internal because we should have caught this before.
540580
return _internal_err!(
@@ -803,6 +843,91 @@ mod tests {
803843
create_hash_string!(string_view_array, StringArray);
804844
create_hash_string!(dict_string_array, DictionaryArray<Int8Type>);
805845

846+
#[test]
#[cfg(not(feature = "force_hash_collisions"))]
fn create_hashes_for_run_array() -> Result<()> {
    // Three runs -- 10 x2, 20 x3, 30 x2 -- giving seven logical rows.
    let run_ends = Int32Array::from(vec![2, 5, 7]);
    let run_values = Int32Array::from(vec![10, 20, 30]);
    let ree: ArrayRef = Arc::new(RunArray::try_new(&run_ends, &run_values).unwrap());

    let random_state = RandomState::with_seeds(0, 0, 0, 0);
    let mut buffer = vec![0; ree.len()];
    let hashes = create_hashes(&[ree], &random_state, &mut buffer)?;

    // One hash per logical row (the last run end is 7).
    assert_eq!(hashes.len(), 7);

    // Every row inside a run must carry the same hash.
    for run in [0..2, 2..5, 5..7] {
        assert!(hashes[run].windows(2).all(|pair| pair[0] == pair[1]));
    }

    // Rows taken from different runs hold different values, so hashes differ.
    for (left, right) in [(0, 2), (2, 5), (0, 5)] {
        assert_ne!(hashes[left], hashes[right]);
    }

    Ok(())
}
882+
883+
#[test]
#[cfg(not(feature = "force_hash_collisions"))]
fn create_multi_column_hash_with_run_array() -> Result<()> {
    // A plain integer column with seven distinct rows.
    let int_col: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7]));

    // A run-end encoded string column with the same logical length:
    // "foo" x2, "bar" x3, "baz" x2.
    let run_ends = Int32Array::from(vec![2, 5, 7]);
    let run_values = StringArray::from(vec!["foo", "bar", "baz"]);
    let ree_col: ArrayRef =
        Arc::new(RunArray::try_new(&run_ends, &run_values).unwrap());

    let random_state = RandomState::with_seeds(0, 0, 0, 0);
    let num_rows = int_col.len();

    // Hash the integer column on its own.
    let mut one_col_hashes = vec![0; num_rows];
    create_hashes(&[Arc::clone(&int_col)], &random_state, &mut one_col_hashes)?;

    // Hash the integer column followed by the run-end encoded column.
    let mut two_col_hashes = vec![0; num_rows];
    create_hashes(&[int_col, ree_col], &random_state, &mut two_col_hashes)?;

    assert_eq!(one_col_hashes.len(), 7);
    assert_eq!(two_col_hashes.len(), 7);

    // Adding the run-end encoded column must change the combined hashes.
    assert_ne!(one_col_hashes, two_col_hashes);

    // Rows 0/1 share "foo" and rows 2/3 share "bar": whether such a pair of
    // rows differs must be the same with and without the run-end encoded column.
    for (a, b) in [(0, 1), (2, 3)] {
        let differs_one_col = one_col_hashes[a] != one_col_hashes[b];
        let differs_two_col = two_col_hashes[a] != two_col_hashes[b];
        assert_eq!(differs_one_col, differs_two_col);
    }

    Ok(())
}
930+
806931
#[test]
807932
// Tests actual values of hashes, which are different if forcing collisions
808933
#[cfg(not(feature = "force_hash_collisions"))]

0 commit comments

Comments
 (0)