Skip to content

Commit b220067

Browse files
committed
common: Add hashing support for REE arrays
1 parent b990987 commit b220067

File tree

2 files changed

+156
-3
lines changed

2 files changed

+156
-3
lines changed

datafusion/common/src/cast.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,16 @@ use arrow::array::{
2727
Int16Array, Int8Array, LargeBinaryArray, LargeListViewArray, LargeStringArray,
2828
ListViewArray, StringViewArray, UInt16Array,
2929
};
30+
use arrow::datatypes::RunEndIndexType;
3031
use arrow::{
3132
array::{
3233
Array, BinaryArray, BooleanArray, Date32Array, Date64Array, Decimal128Array,
3334
Decimal256Array, DictionaryArray, FixedSizeBinaryArray, FixedSizeListArray,
3435
Float32Array, Float64Array, GenericBinaryArray, GenericListArray,
3536
GenericStringArray, Int32Array, Int64Array, IntervalDayTimeArray,
3637
IntervalMonthDayNanoArray, IntervalYearMonthArray, LargeListArray, ListArray,
37-
MapArray, NullArray, OffsetSizeTrait, PrimitiveArray, StringArray, StructArray,
38-
Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray,
38+
MapArray, NullArray, OffsetSizeTrait, PrimitiveArray, RunArray, StringArray,
39+
StructArray, Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray,
3940
Time64NanosecondArray, TimestampMicrosecondArray, TimestampMillisecondArray,
4041
TimestampNanosecondArray, TimestampSecondArray, UInt32Array, UInt64Array,
4142
UInt8Array, UnionArray,
@@ -334,3 +335,8 @@ pub fn as_list_view_array(array: &dyn Array) -> Result<&ListViewArray> {
334335
pub fn as_large_list_view_array(array: &dyn Array) -> Result<&LargeListViewArray> {
335336
Ok(downcast_value!(array, LargeListViewArray))
336337
}
338+
339+
// Downcast ArrayRef to RunArray
340+
pub fn as_run_array<R: RunEndIndexType>(array: &dyn Array) -> Result<&RunArray<R>> {
341+
Ok(downcast_value!(array, RunArray, R))
342+
}

datafusion/common/src/hash_utils.rs

Lines changed: 148 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,12 @@ use arrow::{downcast_dictionary_array, downcast_primitive_array};
2828
use crate::cast::{
2929
as_binary_view_array, as_boolean_array, as_fixed_size_list_array,
3030
as_generic_binary_array, as_large_list_array, as_list_array, as_map_array,
31-
as_string_array, as_string_view_array, as_struct_array, as_union_array,
31+
as_run_array, as_string_array, as_string_view_array, as_struct_array, as_union_array,
3232
};
3333
use crate::error::Result;
3434
use crate::error::{_internal_datafusion_err, _internal_err};
3535
use std::cell::RefCell;
36+
use std::sync::Arc;
3637

3738
// Combines two hashes into one hash
3839
#[inline]
@@ -484,6 +485,50 @@ fn hash_fixed_list_array(
484485
Ok(())
485486
}
486487

488+
#[cfg(not(feature = "force_hash_collisions"))]
489+
fn hash_run_array<R: RunEndIndexType>(
490+
array: &RunArray<R>,
491+
random_state: &RandomState,
492+
hashes_buffer: &mut [u64],
493+
rehash: bool,
494+
) -> Result<()> {
495+
// Get the physical values and create a clone for hashing
496+
let values = array.values();
497+
let values_len = values.len();
498+
let mut values_hashes = vec![0u64; values_len];
499+
create_hashes(&[Arc::clone(values)], random_state, &mut values_hashes)?;
500+
501+
// Get run ends buffer
502+
let run_ends = array.run_ends();
503+
504+
// Previous run end (starting at 0 for the first run)
505+
let mut prev_run_end = 0;
506+
507+
// Process each run - the number of runs equals the number of physical values
508+
for i in 0..values_len {
509+
// Get the run end for this run by accessing the run_ends buffer at the corresponding index
510+
let run_end = run_ends.values()[i].as_usize();
511+
// Apply the hash of the value at index i to all positions in this run
512+
let value_hash = values_hashes[i];
513+
514+
// Apply this hash to all positions in the current run
515+
if rehash {
516+
for pos in prev_run_end..run_end {
517+
hashes_buffer[pos] = combine_hashes(value_hash, hashes_buffer[pos]);
518+
}
519+
} else {
520+
for pos in prev_run_end..run_end {
521+
hashes_buffer[pos] = value_hash;
522+
}
523+
}
524+
525+
// Update the previous run end
526+
prev_run_end = run_end;
527+
}
528+
529+
Ok(())
530+
}
531+
487532
/// Internal helper function that hashes a single array and either initializes or combines
488533
/// the hash values in the buffer.
489534
#[cfg(not(feature = "force_hash_collisions"))]
@@ -535,6 +580,23 @@ fn hash_single_array(
535580
let array = as_union_array(array)?;
536581
hash_union_array(array, random_state, hashes_buffer)?;
537582
}
583+
DataType::RunEndEncoded(run_ends_type, _) => {
584+
match run_ends_type.data_type() {
585+
DataType::Int16 => {
586+
let array = as_run_array::<Int16Type>(array)?;
587+
hash_run_array(array, random_state, hashes_buffer, rehash)?;
588+
}
589+
DataType::Int32 => {
590+
let array = as_run_array::<Int32Type>(array)?;
591+
hash_run_array(array, random_state, hashes_buffer, rehash)?;
592+
}
593+
DataType::Int64 => {
594+
let array = as_run_array::<Int64Type>(array)?;
595+
hash_run_array(array, random_state, hashes_buffer, rehash)?;
596+
}
597+
_ => unreachable!("RunEndEncoded must have Int16, Int32, or Int64 run ends")
598+
}
599+
}
538600
_ => {
539601
// This is internal because we should have caught this before.
540602
return _internal_err!(
@@ -803,6 +865,91 @@ mod tests {
803865
create_hash_string!(string_view_array, StringArray);
804866
create_hash_string!(dict_string_array, DictionaryArray<Int8Type>);
805867

868+
#[test]
#[cfg(not(feature = "force_hash_collisions"))]
fn create_hashes_for_run_array() -> Result<()> {
    // Three runs over seven logical rows: [10, 10, 20, 20, 20, 30, 30]
    let run_ends = Arc::new(Int32Array::from(vec![2, 5, 7]));
    let values = Arc::new(Int32Array::from(vec![10, 20, 30]));
    let array = Arc::new(RunArray::try_new(&run_ends, values.as_ref()).unwrap());

    let random_state = RandomState::with_seeds(0, 0, 0, 0);
    let hashes_buff = &mut vec![0; array.len()];
    let hashes =
        create_hashes(&[array.clone() as ArrayRef], &random_state, hashes_buff)?;

    // One hash per logical row, i.e. the last run end
    assert_eq!(hashes.len(), 7);

    // Every row inside a run must carry the same hash as the first row of that run
    for run in [0..2, 2..5, 5..7] {
        let expected = hashes[run.start];
        for row in run {
            assert_eq!(hashes[row], expected);
        }
    }

    // Runs holding different values must hash differently
    let (first, second, third) = (hashes[0], hashes[2], hashes[5]);
    assert_ne!(first, second);
    assert_ne!(second, third);
    assert_ne!(first, third);

    Ok(())
}
904+
905+
#[test]
#[cfg(not(feature = "force_hash_collisions"))]
fn create_multi_column_hash_with_run_array() -> Result<()> {
    // Plain column with seven distinct integers
    let int_col: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7]));

    // Run-end encoded column of the same logical length:
    // ["foo", "foo", "bar", "bar", "bar", "baz", "baz"]
    let run_ends = Int32Array::from(vec![2, 5, 7]);
    let run_values = StringArray::from(vec!["foo", "bar", "baz"]);
    let run_col: ArrayRef =
        Arc::new(RunArray::try_new(&run_ends, &run_values).unwrap());

    let random_state = RandomState::with_seeds(0, 0, 0, 0);
    let row_count = int_col.len();

    // Hash the integer column on its own
    let mut one_col_hashes = vec![0; row_count];
    create_hashes(&[Arc::clone(&int_col)], &random_state, &mut one_col_hashes)?;

    // Hash the integer column followed by the run-end encoded column
    let mut two_col_hashes = vec![0; row_count];
    create_hashes(
        &[Arc::clone(&int_col), Arc::clone(&run_col)],
        &random_state,
        &mut two_col_hashes,
    )?;

    assert_eq!(one_col_hashes.len(), 7);
    assert_eq!(two_col_hashes.len(), 7);

    // Adding the run-end encoded column must change the combined hashes
    assert_ne!(one_col_hashes, two_col_hashes);

    // Rows 0 and 1 share the run value "foo": whether their hashes differ must be the
    // same with and without the extra column
    let differs_one_col = one_col_hashes[0] != one_col_hashes[1];
    let differs_two_col = two_col_hashes[0] != two_col_hashes[1];
    assert_eq!(differs_one_col, differs_two_col);

    // Same check for rows 2 and 3, which share the run value "bar"
    let differs_one_col = one_col_hashes[2] != one_col_hashes[3];
    let differs_two_col = two_col_hashes[2] != two_col_hashes[3];
    assert_eq!(differs_one_col, differs_two_col);

    Ok(())
}
952+
806953
#[test]
807954
// Tests actual values of hashes, which are different if forcing collisions
808955
#[cfg(not(feature = "force_hash_collisions"))]

0 commit comments

Comments
 (0)