Skip to content

Commit add6fa6

Browse files
authored
feat: add variable packed struct support (#5003)
Closes #2862. This PR adds variable packed struct support in file version 2.2. --- **This PR was primarily authored with Codex using GPT-5-Codex and then hand-reviewed by me. I AM responsible for every change made in this PR. I aimed to keep it aligned with our goals, though I may have missed minor issues. Please flag anything that feels off; I'll fix it quickly.** --------- Signed-off-by: Xuanwo <github@xuanwo.io>
1 parent d16a8b9 commit add6fa6

File tree

7 files changed

+1101
-66
lines changed

7 files changed

+1101
-66
lines changed

python/python/tests/test_arrow.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
PandasBFloat16Array,
2121
bfloat16_array,
2222
)
23+
from lance.file import LanceFileReader
2324
from ml_dtypes import bfloat16
2425

2526

@@ -192,6 +193,46 @@ def test_roundtrip_take_ext_types(tmp_path: Path):
192193
]
193194

194195

196+
def test_struct_variable_children_roundtrip(tmp_path: Path):
197+
struct_type = pa.struct(
198+
[
199+
pa.field("id", pa.int32()),
200+
pa.field("text", pa.utf8()),
201+
pa.field("payload", pa.binary()),
202+
]
203+
)
204+
struct_field = pa.field(
205+
"record",
206+
struct_type,
207+
metadata={"lance-encoding:packed": "true"},
208+
)
209+
struct_array = pa.StructArray.from_arrays(
210+
[
211+
pa.array([1, 2, 3, 4], pa.int32()),
212+
pa.array(["alpha", "beta", "gamma", "delta"], pa.utf8()),
213+
pa.array(
214+
[b"\x01\x02", b"\xff", b"\x03\x04\x05", b"\x10"],
215+
type=pa.binary(),
216+
),
217+
],
218+
type=struct_type,
219+
)
220+
table = pa.Table.from_arrays([struct_array], schema=pa.schema([struct_field]))
221+
222+
dataset_uri = tmp_path / "struct.lance"
223+
ds = lance.write_dataset(table, dataset_uri, data_storage_version="2.2")
224+
225+
round_trip = ds.to_table()
226+
assert round_trip.schema == table.schema
227+
assert round_trip.equals(table)
228+
assert round_trip.to_pylist() == table.to_pylist()
229+
230+
data_file = next((dataset_uri / "data").glob("*.lance"))
231+
metadata = LanceFileReader(str(data_file)).metadata()
232+
encodings = [page.encoding for page in metadata.columns[0].pages]
233+
assert any("VariablePackedStruct" in encoding for encoding in encodings)
234+
235+
195236
@pytest.fixture
196237
def png_uris():
197238
local_path = "file://" + os.path.join(os.path.dirname(__file__), "images/1.png")

python/python/tests/test_dataset.py

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1692,18 +1692,6 @@ def test_load_scanner_from_fragments(tmp_path: Path):
16921692
assert scanner.to_table().num_rows == 2 * 100
16931693

16941694

1695-
def test_write_unstable_data_version(tmp_path: Path, capfd):
1696-
# Note: this test will only work if no earlier test attempts
1697-
# to use an unstable version. If we need that later we can find a way to
1698-
# run this test in a separate process (pytest-xdist?)
1699-
tab = pa.table({"a": range(100), "b": range(100)})
1700-
ds = lance.write_dataset(
1701-
tab, tmp_path / "dataset", mode="append", data_storage_version="next"
1702-
)
1703-
assert ds.to_table() == tab
1704-
assert "You have requested an unstable format version" in capfd.readouterr().err
1705-
1706-
17071695
def test_merge_data(tmp_path: Path):
17081696
tab = pa.table({"a": range(100), "b": range(100)})
17091697
lance.write_dataset(tab, tmp_path / "dataset", mode="append")

rust/lance-encoding/src/compression.rs

Lines changed: 89 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,10 @@ use crate::{
4646
},
4747
general::{GeneralMiniBlockCompressor, GeneralMiniBlockDecompressor},
4848
packed::{
49-
PackedStructFixedWidthMiniBlockDecompressor, PackedStructFixedWidthMiniBlockEncoder,
49+
PackedStructFixedWidthMiniBlockDecompressor,
50+
PackedStructFixedWidthMiniBlockEncoder, PackedStructVariablePerValueDecompressor,
51+
PackedStructVariablePerValueEncoder, VariablePackedStructFieldDecoder,
52+
VariablePackedStructFieldKind,
5053
},
5154
rle::{RleMiniBlockDecompressor, RleMiniBlockEncoder},
5255
value::{ValueDecompressor, ValueEncoder},
@@ -64,7 +67,7 @@ use arrow_array::{cast::AsArray, types::UInt64Type};
6467
use fsst::fsst::{FSST_LEAST_INPUT_MAX_LENGTH, FSST_LEAST_INPUT_SIZE};
6568
use lance_core::{datatypes::Field, error::LanceOptionExt, Error, Result};
6669
use snafu::location;
67-
use std::str::FromStr;
70+
use std::{str::FromStr, sync::Arc};
6871

6972
/// Default threshold for RLE compression selection.
7073
/// RLE is chosen when the run count is less than this fraction of total values.
@@ -128,7 +131,7 @@ pub trait CompressionStrategy: Send + Sync + std::fmt::Debug {
128131
) -> Result<Box<dyn MiniBlockCompressor>>;
129132
}
130133

131-
#[derive(Debug, Default)]
134+
#[derive(Debug, Default, Clone)]
132135
pub struct DefaultCompressionStrategy {
133136
/// User-configured compression parameters
134137
params: CompressionParams,
@@ -297,6 +300,12 @@ impl DefaultCompressionStrategy {
297300
}
298301
}
299302

303+
/// Override the file version used to make compression decisions
304+
pub fn with_version(mut self, version: LanceFileVersion) -> Self {
305+
self.version = version;
306+
self
307+
}
308+
300309
/// Parse compression parameters from field metadata
301310
fn parse_field_metadata(field: &Field) -> CompressionFieldParams {
302311
let mut params = CompressionFieldParams::default();
@@ -431,12 +440,11 @@ impl CompressionStrategy for DefaultCompressionStrategy {
431440
DataBlock::Struct(struct_data_block) => {
432441
// this condition is actually checked at `PrimitiveStructuralEncoder::do_flush`,
433442
// just being cautious here.
434-
if struct_data_block
435-
.children
436-
.iter()
437-
.any(|child| !matches!(child, DataBlock::FixedWidth(_)))
438-
{
439-
panic!("packed struct encoding currently only supports fixed-width fields.")
443+
if struct_data_block.has_variable_width_child() {
444+
return Err(Error::invalid_input(
445+
"Packed struct mini-block encoding supports only fixed-width children",
446+
location!(),
447+
));
440448
}
441449
Ok(Box::new(PackedStructFixedWidthMiniBlockEncoder::default()))
442450
}
@@ -471,6 +479,32 @@ impl CompressionStrategy for DefaultCompressionStrategy {
471479
match data {
472480
DataBlock::FixedWidth(_) => Ok(Box::new(ValueEncoder::default())),
473481
DataBlock::FixedSizeList(_) => Ok(Box::new(ValueEncoder::default())),
482+
DataBlock::Struct(struct_block) => {
483+
if field.children.len() != struct_block.children.len() {
484+
return Err(Error::invalid_input(
485+
"Struct field metadata does not match data block children",
486+
location!(),
487+
));
488+
}
489+
let has_variable_child = struct_block.has_variable_width_child();
490+
if has_variable_child {
491+
if self.version < LanceFileVersion::V2_2 {
492+
return Err(Error::NotSupported {
493+
source: "Variable packed struct encoding requires Lance file version 2.2 or later".into(),
494+
location: location!(),
495+
});
496+
}
497+
Ok(Box::new(PackedStructVariablePerValueEncoder::new(
498+
self.clone(),
499+
field.children.clone(),
500+
)))
501+
} else {
502+
Err(Error::invalid_input(
503+
"Packed struct per-value compression should not be used for fixed-width-only structs",
504+
location!(),
505+
))
506+
}
507+
}
474508
DataBlock::VariableWidth(variable_width) => {
475509
let max_len = variable_width.expect_single_stat::<UInt64Type>(Stat::MaxLength);
476510
let data_size = variable_width.expect_single_stat::<UInt64Type>(Stat::DataSize);
@@ -784,6 +818,52 @@ impl DecompressionStrategy for DefaultDecompressionStrategy {
784818
general.compression.as_ref().expect_ok()?.scheme(),
785819
)?))
786820
}
821+
Compression::VariablePackedStruct(description) => {
822+
let mut fields = Vec::with_capacity(description.fields.len());
823+
for field in &description.fields {
824+
let value_encoding = field.value.as_ref().ok_or_else(|| {
825+
Error::invalid_input(
826+
"VariablePackedStruct field is missing value encoding",
827+
location!(),
828+
)
829+
})?;
830+
let decoder = match field.layout.as_ref().ok_or_else(|| {
831+
Error::invalid_input(
832+
"VariablePackedStruct field is missing layout details",
833+
location!(),
834+
)
835+
})? {
836+
crate::format::pb21::variable_packed_struct::field_encoding::Layout::BitsPerValue(
837+
bits_per_value,
838+
) => {
839+
let decompressor =
840+
self.create_fixed_per_value_decompressor(value_encoding)?;
841+
VariablePackedStructFieldDecoder {
842+
kind: VariablePackedStructFieldKind::Fixed {
843+
bits_per_value: *bits_per_value,
844+
decompressor: Arc::from(decompressor),
845+
},
846+
}
847+
}
848+
crate::format::pb21::variable_packed_struct::field_encoding::Layout::BitsPerLength(
849+
bits_per_length,
850+
) => {
851+
let decompressor =
852+
self.create_variable_per_value_decompressor(value_encoding)?;
853+
VariablePackedStructFieldDecoder {
854+
kind: VariablePackedStructFieldKind::Variable {
855+
bits_per_length: *bits_per_length,
856+
decompressor: Arc::from(decompressor),
857+
},
858+
}
859+
}
860+
};
861+
fields.push(decoder);
862+
}
863+
Ok(Box::new(PackedStructVariablePerValueDecompressor::new(
864+
fields,
865+
)))
866+
}
787867
_ => todo!("variable-per-value decompressor for {:?}", description),
788868
}
789869
}

rust/lance-encoding/src/data.rs

Lines changed: 19 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -345,23 +345,7 @@ struct StructDataBlockBuilder {
345345
}
346346

347347
impl StructDataBlockBuilder {
348-
// Currently only Struct with fixed-width fields are supported.
349-
// And the assumption that all fields have `bits_per_value % 8 == 0` is made here.
350-
fn new(bits_per_values: Vec<u32>, estimated_size_bytes: u64) -> Self {
351-
let mut children = vec![];
352-
353-
debug_assert!(bits_per_values.iter().all(|bpv| bpv % 8 == 0));
354-
355-
let bytes_per_row: u32 = bits_per_values.iter().sum::<u32>() / 8;
356-
let bytes_per_row = bytes_per_row as u64;
357-
358-
for bits_per_value in bits_per_values.iter() {
359-
let this_estimated_size_bytes =
360-
estimated_size_bytes / bytes_per_row * (*bits_per_value as u64) / 8;
361-
let child =
362-
FixedWidthDataBlockBuilder::new(*bits_per_value as u64, this_estimated_size_bytes);
363-
children.push(Box::new(child) as Box<dyn DataBlockBuilderImpl>);
364-
}
348+
fn new(children: Vec<Box<dyn DataBlockBuilderImpl>>) -> Self {
365349
Self { children }
366350
}
367351
}
@@ -697,6 +681,12 @@ impl StructDataBlock {
697681
.collect()
698682
}
699683

684+
pub fn has_variable_width_child(&self) -> bool {
685+
self.children
686+
.iter()
687+
.any(|child| !matches!(child, DataBlock::FixedWidth(_)))
688+
}
689+
700690
pub fn data_size(&self) -> u64 {
701691
self.children
702692
.iter()
@@ -1059,16 +1049,18 @@ impl DataBlock {
10591049
))
10601050
}
10611051
Self::Struct(struct_data_block) => {
1062-
let mut bits_per_values = vec![];
1063-
for child in struct_data_block.children.iter() {
1064-
let child = child.as_fixed_width_ref().
1065-
expect("Currently StructDataBlockBuilder is only used in packed-struct encoding, and currently in packed-struct encoding, only fixed-width fields are supported.");
1066-
bits_per_values.push(child.bits_per_value as u32);
1067-
}
1068-
Box::new(StructDataBlockBuilder::new(
1069-
bits_per_values,
1070-
estimated_size_bytes,
1071-
))
1052+
let num_children = struct_data_block.children.len();
1053+
let per_child_estimate = if num_children == 0 {
1054+
0
1055+
} else {
1056+
estimated_size_bytes / num_children as u64
1057+
};
1058+
let child_builders = struct_data_block
1059+
.children
1060+
.iter()
1061+
.map(|child| child.make_builder(per_child_estimate))
1062+
.collect();
1063+
Box::new(StructDataBlockBuilder::new(child_builders))
10721064
}
10731065
Self::AllNull(_) => Box::new(AllNullDataBlockBuilder::default()),
10741066
_ => todo!("make_builder for {:?}", self),

rust/lance-encoding/src/encoder.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,7 @@ pub fn default_encoding_strategy(version: LanceFileVersion) -> Box<dyn FieldEnco
278278
LanceFileVersion::V2_0 => Box::new(
279279
crate::previous::encoder::CoreFieldEncodingStrategy::new(version),
280280
),
281-
_ => Box::new(StructuralEncodingStrategy::default()),
281+
_ => Box::new(StructuralEncodingStrategy::with_version(version)),
282282
}
283283
}
284284

@@ -293,7 +293,8 @@ pub fn default_encoding_strategy_with_params(
293293
location!(),
294294
)),
295295
_ => {
296-
let compression_strategy = Arc::new(DefaultCompressionStrategy::with_params(params));
296+
let compression_strategy =
297+
Arc::new(DefaultCompressionStrategy::with_params(params).with_version(version));
297298
Ok(Box::new(StructuralEncodingStrategy {
298299
compression_strategy,
299300
version,
@@ -322,6 +323,13 @@ impl Default for StructuralEncodingStrategy {
322323
}
323324

324325
impl StructuralEncodingStrategy {
326+
pub fn with_version(version: LanceFileVersion) -> Self {
327+
Self {
328+
compression_strategy: Arc::new(DefaultCompressionStrategy::new().with_version(version)),
329+
version,
330+
}
331+
}
332+
325333
fn is_primitive_type(data_type: &DataType) -> bool {
326334
matches!(
327335
data_type,

rust/lance-encoding/src/encodings/logical/primitive.rs

Lines changed: 33 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1238,9 +1238,17 @@ impl MiniBlockScheduler {
12381238
dictionary_data_alignment: 16,
12391239
num_dictionary_items,
12401240
}),
1241-
_ => {
1242-
unreachable!("Currently only encodings `BinaryBlock` and `Flat` used for encoding MiniBlock dictionary.")
1243-
}
1241+
Compression::General(_) => Some(MiniBlockSchedulerDictionary {
1242+
dictionary_decompressor: decompressors
1243+
.create_block_decompressor(dictionary_encoding)?
1244+
.into(),
1245+
dictionary_buf_position_and_size: buffer_offsets_and_sizes[2],
1246+
dictionary_data_alignment: 1,
1247+
num_dictionary_items,
1248+
}),
1249+
_ => unreachable!(
1250+
"Mini-block dictionary encoding must use Variable, Flat, or General compression"
1251+
),
12441252
}
12451253
} else {
12461254
None
@@ -4291,15 +4299,28 @@ impl PrimitiveStructuralEncoder {
42914299

42924300
let data_block = DataBlock::from_arrays(&arrays, num_values);
42934301

4294-
// if the `data_block` is a `StructDataBlock`, then this is a struct with packed struct encoding.
4295-
if let DataBlock::Struct(ref struct_data_block) = data_block {
4296-
if struct_data_block
4297-
.children
4298-
.iter()
4299-
.any(|child| !matches!(child, DataBlock::FixedWidth(_)))
4300-
{
4301-
panic!("packed struct encoding currently only supports fixed-width fields.")
4302-
}
4302+
let requires_full_zip_packed_struct =
4303+
if let DataBlock::Struct(ref struct_data_block) = data_block {
4304+
struct_data_block.has_variable_width_child()
4305+
} else {
4306+
false
4307+
};
4308+
4309+
if requires_full_zip_packed_struct {
4310+
log::debug!(
4311+
"Encoding column {} with {} items using full-zip packed struct layout",
4312+
column_idx,
4313+
num_values
4314+
);
4315+
return Self::encode_full_zip(
4316+
column_idx,
4317+
&field,
4318+
compression_strategy.as_ref(),
4319+
data_block,
4320+
repdefs,
4321+
row_number,
4322+
num_rows,
4323+
);
43034324
}
43044325

43054326
if let DataBlock::Dictionary(dict) = data_block {

0 commit comments

Comments
 (0)