Skip to content

Commit 1e37fce

Browse files
committed
common: Add hashing support for REE arrays
1 parent b990987 commit 1e37fce

File tree

2 files changed

+138
-2
lines changed

2 files changed

+138
-2
lines changed

datafusion/common/src/cast.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,16 @@ use arrow::array::{
2727
Int16Array, Int8Array, LargeBinaryArray, LargeListViewArray, LargeStringArray,
2828
ListViewArray, StringViewArray, UInt16Array,
2929
};
30+
use arrow::datatypes::RunEndIndexType;
3031
use arrow::{
3132
array::{
3233
Array, BinaryArray, BooleanArray, Date32Array, Date64Array, Decimal128Array,
3334
Decimal256Array, DictionaryArray, FixedSizeBinaryArray, FixedSizeListArray,
3435
Float32Array, Float64Array, GenericBinaryArray, GenericListArray,
3536
GenericStringArray, Int32Array, Int64Array, IntervalDayTimeArray,
3637
IntervalMonthDayNanoArray, IntervalYearMonthArray, LargeListArray, ListArray,
37-
MapArray, NullArray, OffsetSizeTrait, PrimitiveArray, StringArray, StructArray,
38-
Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray,
38+
MapArray, NullArray, OffsetSizeTrait, PrimitiveArray, RunArray, StringArray,
39+
StructArray, Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray,
3940
Time64NanosecondArray, TimestampMicrosecondArray, TimestampMillisecondArray,
4041
TimestampNanosecondArray, TimestampSecondArray, UInt32Array, UInt64Array,
4142
UInt8Array, UnionArray,
@@ -334,3 +335,8 @@ pub fn as_list_view_array(array: &dyn Array) -> Result<&ListViewArray> {
334335
pub fn as_large_list_view_array(array: &dyn Array) -> Result<&LargeListViewArray> {
335336
// The cast itself is delegated to `downcast_value!`; `Ok` only wraps the borrowed
// result. How a type mismatch is reported depends on that macro (presumably an
// early `Err` return rather than a panic — verify against its definition).
Ok(downcast_value!(array, LargeListViewArray))
336337
}
338+
339+
// Downcast a `&dyn Array` to `RunArray<R>`.
// `R` is the run-end index type; it appears only in the return type, so the
// caller has to pick it (explicitly or through inference at the call site).
340+
pub fn as_run_array<R: RunEndIndexType>(array: &dyn Array) -> Result<&RunArray<R>> {
341+
// The cast is delegated to `downcast_value!`, passing `R` as the generic argument.
Ok(downcast_value!(array, RunArray, R))
342+
}

datafusion/common/src/hash_utils.rs

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ use crate::cast::{
3333
use crate::error::Result;
3434
use crate::error::{_internal_datafusion_err, _internal_err};
3535
use std::cell::RefCell;
36+
use std::sync::Arc;
3637

3738
// Combines two hashes into one hash
3839
#[inline]
@@ -484,6 +485,40 @@ fn hash_fixed_list_array(
484485
Ok(())
485486
}
486487

488+
/// Hashes a run-end encoded array into `hashes_buffer`, one slot per logical row.
///
/// The values child array is hashed once with `create_hashes`; each value's hash is
/// then applied to every buffer slot covered by its run, i.e. the half-open range
/// `[previous run end, run end)`.
///
/// When `rehash` is true the value hash is merged into the existing slot with
/// `combine_hashes`; otherwise the slot is overwritten with the value hash.
///
/// NOTE(review): run ends are read straight from `run_ends.values()` and used as
/// buffer indices starting at 0. If `array` can be a slice with a non-zero offset,
/// confirm these indices still line up with `hashes_buffer` — TODO verify against
/// arrow's `RunArray` slicing semantics.
#[cfg(not(feature = "force_hash_collisions"))]
489+
fn hash_run_array<R: RunEndIndexType>(
490+
array: &RunArray<R>,
491+
random_state: &RandomState,
492+
hashes_buffer: &mut [u64],
493+
rehash: bool,
494+
) -> Result<()> {
495+
let values = array.values();
496+
let values_len = values.len();
497+
let mut values_hashes = vec![0u64; values_len];
498+
// Hash each run value once, into a scratch buffer with one slot per value,
// instead of hashing once per logical row.
create_hashes(&[Arc::clone(values)], random_state, &mut values_hashes)?;
499+
500+
let run_ends = array.run_ends();
501+
// Inclusive start of the current run inside `hashes_buffer`; the first run starts at 0.
let mut prev_run_end = 0;
502+
503+
// `values_hashes` holds exactly `values_len` entries, so `take(values_len)` does not
// shorten the iteration.
for (i, value_hash) in values_hashes.iter().enumerate().take(values_len) {
504+
// Indexing assumes there is one run end per value — presumably guaranteed by
// `RunArray` construction; verify.
let run_end = run_ends.values()[i].as_usize();
505+
506+
// `take(run_end).skip(prev_run_end)` visits slots `prev_run_end..run_end`. Slots
// beyond the end of the buffer are simply never yielded, so this does not panic
// when `run_end` exceeds `hashes_buffer.len()`.
if rehash {
507+
for hash in hashes_buffer.iter_mut().take(run_end).skip(prev_run_end) {
508+
*hash = combine_hashes(*value_hash, *hash);
509+
}
510+
} else {
511+
for hash in hashes_buffer.iter_mut().take(run_end).skip(prev_run_end) {
512+
*hash = *value_hash;
513+
}
514+
}
515+
516+
// The next run starts where this one ended.
prev_run_end = run_end;
517+
}
518+
519+
Ok(())
520+
}
521+
487522
/// Internal helper function that hashes a single array and either initializes or combines
488523
/// the hash values in the buffer.
489524
#[cfg(not(feature = "force_hash_collisions"))]
@@ -535,6 +570,10 @@ fn hash_single_array(
535570
let array = as_union_array(array)?;
536571
hash_union_array(array, random_state, hashes_buffer)?;
537572
}
573+
DataType::RunEndEncoded(_, _) => downcast_run_array! {
574+
array => hash_run_array(array, random_state, hashes_buffer, rehash)?,
575+
_ => unreachable!()
576+
}
538577
_ => {
539578
// This is internal because we should have caught this before.
540579
return _internal_err!(
@@ -803,6 +842,97 @@ mod tests {
803842
create_hash_string!(string_view_array, StringArray);
804843
create_hash_string!(dict_string_array, DictionaryArray<Int8Type>);
805844

845+
// Single-column case: every row inside a run must get the same hash, and the three
// distinct run values must get different hashes.
// NOTE(review): the array is used unsliced here; a sliced RunArray is not covered.
#[test]
846+
#[cfg(not(feature = "force_hash_collisions"))]
847+
fn create_hashes_for_run_array() -> Result<()> {
848+
// Three runs: 10 covers rows 0..2, 20 covers rows 2..5, 30 covers rows 5..7
849+
let values = Arc::new(Int32Array::from(vec![10, 20, 30]));
850+
let run_ends = Arc::new(Int32Array::from(vec![2, 5, 7]));
851+
852+
// Create the RunArray
853+
let array = Arc::new(RunArray::try_new(&run_ends, values.as_ref()).unwrap());
854+
855+
// Create hashes with fixed seeds, using one buffer slot per logical row
856+
let random_state = RandomState::with_seeds(0, 0, 0, 0);
857+
let hashes_buff = &mut vec![0; array.len()];
858+
let hashes = create_hashes(
859+
&[Arc::clone(&array) as ArrayRef],
860+
&random_state,
861+
hashes_buff,
862+
)?;
863+
864+
// The length should be 7 (last run end)
865+
assert_eq!(hashes.len(), 7);
866+
867+
// Values at indices 0,1 should have the same hash (value 10)
868+
assert_eq!(hashes[0], hashes[1]);
869+
870+
// Values at indices 2,3,4 should have the same hash (value 20)
871+
assert_eq!(hashes[2], hashes[3]);
872+
assert_eq!(hashes[3], hashes[4]);
873+
874+
// Values at indices 5,6 should have the same hash (value 30)
875+
assert_eq!(hashes[5], hashes[6]);
876+
877+
// Hashes for different values should be different
878+
assert_ne!(hashes[0], hashes[2]);
879+
assert_ne!(hashes[2], hashes[5]);
880+
assert_ne!(hashes[0], hashes[5]);
881+
882+
Ok(())
883+
}
884+
885+
// Multi-column case: hashes an Int32 column alone, then the same column followed by
// a run-end encoded string column, and checks that adding the second column changes
// the resulting hashes.
#[test]
886+
#[cfg(not(feature = "force_hash_collisions"))]
887+
fn create_multi_column_hash_with_run_array() -> Result<()> {
888+
// Create a regular Int32Array
889+
let int_array = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7]));
890+
891+
// Create a RunArray with same logical length:
// "foo" covers rows 0..2, "bar" covers rows 2..5, "baz" covers rows 5..7
892+
let values = Arc::new(StringArray::from(vec!["foo", "bar", "baz"]));
893+
let run_ends = Arc::new(Int32Array::from(vec![2, 5, 7]));
894+
let run_array = Arc::new(RunArray::try_new(&run_ends, values.as_ref()).unwrap());
895+
896+
// Create hashes for single column (int array)
897+
let random_state = RandomState::with_seeds(0, 0, 0, 0);
898+
let mut one_col_hashes = vec![0; int_array.len()];
899+
create_hashes(
900+
&[Arc::clone(&int_array) as ArrayRef],
901+
&random_state,
902+
&mut one_col_hashes,
903+
)?;
904+
905+
// Create hashes for both columns (same random state, fresh zeroed buffer)
906+
let mut two_col_hashes = vec![0; int_array.len()];
907+
create_hashes(
908+
&[
909+
Arc::clone(&int_array) as ArrayRef,
910+
Arc::clone(&run_array) as ArrayRef,
911+
],
912+
&random_state,
913+
&mut two_col_hashes,
914+
)?;
915+
916+
// Verify lengths
917+
assert_eq!(one_col_hashes.len(), 7);
918+
assert_eq!(two_col_hashes.len(), 7);
919+
920+
// Hashes should be different when including the Run array
921+
assert_ne!(one_col_hashes, two_col_hashes);
922+
923+
// Indices 0,1 should have the same Run value ("foo") so their rehash pattern should be consistent
// NOTE(review): this compares two booleans ("do rows 0 and 1 differ?") rather than
// hash values. The int column already differs between these rows (1 vs 2), so the
// check does not directly pin the run value's contribution — consider a stronger
// assertion.
924+
let diff_0_vs_1_one_col = one_col_hashes[0] != one_col_hashes[1];
925+
let diff_0_vs_1_two_col = two_col_hashes[0] != two_col_hashes[1];
926+
assert_eq!(diff_0_vs_1_one_col, diff_0_vs_1_two_col);
927+
928+
// Similarly for indices with the same Run value ("bar")
929+
let diff_2_vs_3_one_col = one_col_hashes[2] != one_col_hashes[3];
930+
let diff_2_vs_3_two_col = two_col_hashes[2] != two_col_hashes[3];
931+
assert_eq!(diff_2_vs_3_one_col, diff_2_vs_3_two_col);
932+
933+
Ok(())
934+
}
935+
806936
#[test]
807937
// Tests actual values of hashes, which are different if forcing collisions
808938
#[cfg(not(feature = "force_hash_collisions"))]

0 commit comments

Comments
 (0)