Skip to content

Commit 2300fb6

Browse files
committed
Add hashing of REE arrays and add REE aggregation example
1 parent b990987 commit 2300fb6

File tree

3 files changed

+229
-3
lines changed

3 files changed

+229
-3
lines changed
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
use arrow::array::{Array, Int32Array, RunArray, StringViewArray};
2+
use arrow::datatypes::{DataType, Field, Int32Type, Schema};
3+
use arrow::record_batch::RecordBatch;
4+
use datafusion::datasource::MemTable;
5+
use datafusion::prelude::*;
6+
use std::sync::Arc;
7+
8+
#[tokio::main]
9+
async fn main() -> Result<(), datafusion::error::DataFusionError> {
10+
// Create a new DataFusion context
11+
let ctx = SessionContext::new();
12+
13+
// First, let's create our data
14+
// We'll have temperature readings where multiple consecutive readings come from the same sensor
15+
16+
// Temperature values (not run-length encoded)
17+
// This represents all temperature readings in sequence
18+
let temperatures = Int32Array::from(vec![
19+
22, 23, 24, 25, 22, 21, 20, 21, 22, 23, 24, 25, 26, 27, 28,
20+
]);
21+
22+
// Create the string values for sensor IDs
23+
let sensor_id_values =
24+
StringViewArray::from(vec!["sensor_A", "sensor_B", "sensor_C", "sensor_D"]);
25+
26+
// Create the run ends array (positions where each run ends)
27+
let sensor_id_run_ends = Int32Array::from(vec![4, 7, 12, 15]);
28+
29+
// Create RunArray for sensor IDs with Int32Type as run end type
30+
let sensor_id_ree =
31+
RunArray::<Int32Type>::try_new(&sensor_id_run_ends, &sensor_id_values)
32+
.expect("Failed to create sensor ID RunArray");
33+
34+
// Get the exact data type of the RunArray for the schema
35+
let sensor_id_type = sensor_id_ree.data_type().clone();
36+
37+
// Create schema
38+
let schema = Arc::new(Schema::new(vec![
39+
Field::new("sensor_id", sensor_id_type, false),
40+
Field::new("temperature", DataType::Int32, false),
41+
]));
42+
43+
// Create record batch
44+
let batch = RecordBatch::try_new(
45+
schema.clone(),
46+
vec![Arc::new(sensor_id_ree), Arc::new(temperatures)],
47+
)?;
48+
49+
// Register as a table
50+
let provider = MemTable::try_new(schema, vec![vec![batch]])?;
51+
ctx.register_table("sensor_readings", Arc::new(provider))?;
52+
53+
// Run aggregation query
54+
// Group by sensor ID and calculate statistics
55+
let sql = "
56+
SELECT
57+
sensor_id,
58+
AVG(temperature) AS avg_temp,
59+
MIN(temperature) AS min_temp,
60+
MAX(temperature) AS max_temp,
61+
COUNT(temperature) AS reading_count
62+
FROM sensor_readings
63+
GROUP BY sensor_id
64+
ORDER BY sensor_id
65+
";
66+
67+
let results = ctx.sql(sql).await?.collect().await?;
68+
for batch in results {
69+
println!("{:?}", batch);
70+
}
71+
72+
Ok(())
73+
}

datafusion/common/src/cast.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,16 @@ use arrow::array::{
2727
Int16Array, Int8Array, LargeBinaryArray, LargeListViewArray, LargeStringArray,
2828
ListViewArray, StringViewArray, UInt16Array,
2929
};
30+
use arrow::datatypes::RunEndIndexType;
3031
use arrow::{
3132
array::{
3233
Array, BinaryArray, BooleanArray, Date32Array, Date64Array, Decimal128Array,
3334
Decimal256Array, DictionaryArray, FixedSizeBinaryArray, FixedSizeListArray,
3435
Float32Array, Float64Array, GenericBinaryArray, GenericListArray,
3536
GenericStringArray, Int32Array, Int64Array, IntervalDayTimeArray,
3637
IntervalMonthDayNanoArray, IntervalYearMonthArray, LargeListArray, ListArray,
37-
MapArray, NullArray, OffsetSizeTrait, PrimitiveArray, StringArray, StructArray,
38-
Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray,
38+
MapArray, NullArray, OffsetSizeTrait, PrimitiveArray, RunArray, StringArray,
39+
StructArray, Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray,
3940
Time64NanosecondArray, TimestampMicrosecondArray, TimestampMillisecondArray,
4041
TimestampNanosecondArray, TimestampSecondArray, UInt32Array, UInt64Array,
4142
UInt8Array, UnionArray,
@@ -334,3 +335,8 @@ pub fn as_list_view_array(array: &dyn Array) -> Result<&ListViewArray> {
334335
pub fn as_large_list_view_array(array: &dyn Array) -> Result<&LargeListViewArray> {
335336
Ok(downcast_value!(array, LargeListViewArray))
336337
}
338+
339+
// Downcast a `&dyn Array` to a `&RunArray<R>`, where `R` is the run-end index
// type of the expected array. The cast itself is delegated to the
// `downcast_value!` macro.
// NOTE(review): presumably a type mismatch is reported as an `Err` rather than
// a panic -- confirm against the definition of `downcast_value!`.
340+
pub fn as_run_array<R: RunEndIndexType>(array: &dyn Array) -> Result<&RunArray<R>> {
341+
Ok(downcast_value!(array, RunArray, R))
342+
}

datafusion/common/src/hash_utils.rs

Lines changed: 148 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,12 @@ use arrow::{downcast_dictionary_array, downcast_primitive_array};
2828
use crate::cast::{
2929
as_binary_view_array, as_boolean_array, as_fixed_size_list_array,
3030
as_generic_binary_array, as_large_list_array, as_list_array, as_map_array,
31-
as_string_array, as_string_view_array, as_struct_array, as_union_array,
31+
as_run_array, as_string_array, as_string_view_array, as_struct_array, as_union_array,
3232
};
3333
use crate::error::Result;
3434
use crate::error::{_internal_datafusion_err, _internal_err};
3535
use std::cell::RefCell;
36+
use std::sync::Arc;
3637

3738
// Combines two hashes into one hash
3839
#[inline]
@@ -484,6 +485,50 @@ fn hash_fixed_list_array(
484485
Ok(())
485486
}
486487

488+
#[cfg(not(feature = "force_hash_collisions"))]
489+
fn hash_run_array<R: RunEndIndexType>(
490+
array: &RunArray<R>,
491+
random_state: &RandomState,
492+
hashes_buffer: &mut [u64],
493+
rehash: bool,
494+
) -> Result<()> {
495+
// Get the physical values and create a clone for hashing
496+
let values = array.values();
497+
let values_len = values.len();
498+
let mut values_hashes = vec![0u64; values_len];
499+
create_hashes(&[Arc::clone(values)], random_state, &mut values_hashes)?;
500+
501+
// Get run ends buffer
502+
let run_ends = array.run_ends();
503+
504+
// Previous run end (starting at 0 for the first run)
505+
let mut prev_run_end = 0;
506+
507+
// Process each run - the number of runs equals the number of physical values
508+
for i in 0..values_len {
509+
// Get the run end for this run by accessing the run_ends buffer at the corresponding index
510+
let run_end = run_ends.values()[i].as_usize();
511+
// Apply the hash of the value at index i to all positions in this run
512+
let value_hash = values_hashes[i];
513+
514+
// Apply this hash to all positions in the current run
515+
if rehash {
516+
for pos in prev_run_end..run_end {
517+
hashes_buffer[pos] = combine_hashes(value_hash, hashes_buffer[pos]);
518+
}
519+
} else {
520+
for pos in prev_run_end..run_end {
521+
hashes_buffer[pos] = value_hash;
522+
}
523+
}
524+
525+
// Update the previous run end
526+
prev_run_end = run_end;
527+
}
528+
529+
Ok(())
530+
}
531+
487532
/// Internal helper function that hashes a single array and either initializes or combines
488533
/// the hash values in the buffer.
489534
#[cfg(not(feature = "force_hash_collisions"))]
@@ -535,6 +580,23 @@ fn hash_single_array(
535580
let array = as_union_array(array)?;
536581
hash_union_array(array, random_state, hashes_buffer)?;
537582
}
583+
DataType::RunEndEncoded(run_ends_type, _) => {
584+
match run_ends_type.data_type() {
585+
DataType::Int16 => {
586+
let array = as_run_array::<Int16Type>(array)?;
587+
hash_run_array(array, random_state, hashes_buffer, rehash)?;
588+
}
589+
DataType::Int32 => {
590+
let array = as_run_array::<Int32Type>(array)?;
591+
hash_run_array(array, random_state, hashes_buffer, rehash)?;
592+
}
593+
DataType::Int64 => {
594+
let array = as_run_array::<Int64Type>(array)?;
595+
hash_run_array(array, random_state, hashes_buffer, rehash)?;
596+
}
597+
_ => unreachable!("RunEndEncoded must have Int16, Int32, or Int64 run ends")
598+
}
599+
}
538600
_ => {
539601
// This is internal because we should have caught this before.
540602
return _internal_err!(
@@ -803,6 +865,91 @@ mod tests {
803865
create_hash_string!(string_view_array, StringArray);
804866
create_hash_string!(dict_string_array, DictionaryArray<Int8Type>);
805867

868+
#[test]
#[cfg(not(feature = "force_hash_collisions"))]
fn create_hashes_for_run_array() -> Result<()> {
    // Three runs: value 10 covers rows 0..2, 20 covers rows 2..5 and 30 covers
    // rows 5..7, for a logical length of 7.
    let values = Arc::new(Int32Array::from(vec![10, 20, 30]));
    let run_ends = Arc::new(Int32Array::from(vec![2, 5, 7]));
    let ree = Arc::new(RunArray::try_new(&run_ends, values.as_ref()).unwrap());

    let random_state = RandomState::with_seeds(0, 0, 0, 0);
    let mut buffer = vec![0; ree.len()];
    let hashes = create_hashes(&[ree.clone() as ArrayRef], &random_state, &mut buffer)?;

    // One hash per logical row (the last run end is 7).
    assert_eq!(hashes.len(), 7);

    // Every row inside a run carries the hash of that run's value.
    for run in [0..2, 2..5, 5..7] {
        for pair in hashes[run].windows(2) {
            assert_eq!(pair[0], pair[1]);
        }
    }

    // Rows taken from different runs hold different values, hence different
    // hashes.
    for (left, right) in [(0, 2), (2, 5), (0, 5)] {
        assert_ne!(hashes[left], hashes[right]);
    }

    Ok(())
}
904+
905+
#[test]
#[cfg(not(feature = "force_hash_collisions"))]
fn create_multi_column_hash_with_run_array() -> Result<()> {
    // Plain column with seven distinct values.
    let int_array = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7]));

    // Run-end encoded column with the same logical length:
    // "foo" x2, "bar" x3, "baz" x2.
    let values = Arc::new(StringArray::from(vec!["foo", "bar", "baz"]));
    let run_ends = Arc::new(Int32Array::from(vec![2, 5, 7]));
    let run_array = Arc::new(RunArray::try_new(&run_ends, values.as_ref()).unwrap());

    let random_state = RandomState::with_seeds(0, 0, 0, 0);

    // Hash the given columns into a fresh, zero-initialised buffer.
    let hash_columns = |columns: &[ArrayRef]| -> Result<Vec<u64>> {
        let mut buffer = vec![0; int_array.len()];
        create_hashes(columns, &random_state, &mut buffer)?;
        Ok(buffer)
    };

    let one_col_hashes = hash_columns(&[int_array.clone() as ArrayRef])?;
    let two_col_hashes =
        hash_columns(&[int_array.clone() as ArrayRef, run_array.clone() as ArrayRef])?;

    // Verify lengths
    assert_eq!(one_col_hashes.len(), 7);
    assert_eq!(two_col_hashes.len(), 7);

    // Adding the run array column must change the combined hashes.
    assert_ne!(one_col_hashes, two_col_hashes);

    // Rows 0 and 1 share the run value "foo", and rows 2 and 3 share "bar".
    // Combining the same run hash into both rows must not change whether
    // the two rows hash equal to each other.
    for (left, right) in [(0, 1), (2, 3)] {
        let differs_one_col = one_col_hashes[left] != one_col_hashes[right];
        let differs_two_col = two_col_hashes[left] != two_col_hashes[right];
        assert_eq!(differs_one_col, differs_two_col);
    }

    Ok(())
}
952+
806953
#[test]
807954
// Tests actual values of hashes, which are different if forcing collisions
808955
#[cfg(not(feature = "force_hash_collisions"))]

0 commit comments

Comments
 (0)