Skip to content

Commit 85220ec

Browse files
authored
Handle empty Iceberg table source in conversion (#756)
* Handle empty Iceberg table source in conversion
* Applied spotless
1 parent 8cceec7 commit 85220ec

File tree

2 files changed

+92
-2
lines changed

2 files changed

+92
-2
lines changed

xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java

Lines changed: 22 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -108,7 +108,8 @@ private FileIO initTableOps() {
108108
@Override
109109
public InternalTable getTable(Snapshot snapshot) {
110110
Table iceTable = getSourceTable();
111-
Schema iceSchema = iceTable.schemas().get(snapshot.schemaId());
111+
Schema iceSchema =
112+
(snapshot != null) ? iceTable.schemas().get(snapshot.schemaId()) : iceTable.schema();
112113
TableOperations iceOps = ((BaseTable) iceTable).operations();
113114
IcebergSchemaExtractor schemaExtractor = IcebergSchemaExtractor.getInstance();
114115
InternalSchema irSchema = schemaExtractor.fromIceberg(iceSchema);
@@ -123,12 +124,19 @@ public InternalTable getTable(Snapshot snapshot) {
123124
irPartitionFields.size() > 0
124125
? DataLayoutStrategy.HIVE_STYLE_PARTITION
125126
: DataLayoutStrategy.FLAT;
127+
128+
Instant latestCommitTime =
129+
(snapshot != null)
130+
? Instant.ofEpochMilli(snapshot.timestampMillis())
131+
: Instant.ofEpochMilli(
132+
((BaseTable) iceTable).operations().current().lastUpdatedMillis());
133+
126134
return InternalTable.builder()
127135
.tableFormat(TableFormat.ICEBERG)
128136
.basePath(iceTable.location())
129137
.name(iceTable.name())
130138
.partitioningFields(irPartitionFields)
131-
.latestCommitTime(Instant.ofEpochMilli(snapshot.timestampMillis()))
139+
.latestCommitTime(latestCommitTime)
132140
.readSchema(irSchema)
133141
.layoutStrategy(dataLayoutStrategy)
134142
.latestMetadataPath(iceOps.current().metadataFileLocation())
@@ -147,6 +155,18 @@ public InternalSnapshot getCurrentSnapshot() {
147155
Table iceTable = getSourceTable();
148156

149157
Snapshot currentSnapshot = iceTable.currentSnapshot();
158+
159+
if (currentSnapshot == null) {
160+
// Handle empty table case - return snapshot with schema but no data files
161+
InternalTable irTable = getTable(null);
162+
return InternalSnapshot.builder()
163+
.version("0")
164+
.table(irTable)
165+
.partitionedDataFiles(Collections.emptyList())
166+
.sourceIdentifier("0")
167+
.build();
168+
}
169+
150170
InternalTable irTable = getTable(currentSnapshot);
151171

152172
TableScan scan =

xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionSource.java

Lines changed: 70 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -116,6 +116,37 @@ void getTableTest(@TempDir Path workingDir) throws IOException {
116116
internalTable.getPartitioningFields().get(0).getTransformType());
117117
}
118118

119+
@Test
120+
void testGetTableWithoutSnapshot(@TempDir Path workingDir) throws IOException {
121+
Table emptyTable = createTestCatalogTable(workingDir.toString());
122+
assertNull(emptyTable.currentSnapshot());
123+
124+
SourceTable sourceTableConfig = getPerTableConfig(emptyTable);
125+
126+
IcebergConversionSource conversionSource =
127+
sourceProvider.getConversionSourceInstance(sourceTableConfig);
128+
129+
InternalTable internalTable = conversionSource.getTable(null);
130+
assertNotNull(internalTable);
131+
assertEquals(TableFormat.ICEBERG, internalTable.getTableFormat());
132+
assertEquals(emptyTable.location(), internalTable.getBasePath());
133+
assertEquals(
134+
((BaseTable) emptyTable).operations().current().lastUpdatedMillis(),
135+
internalTable.getLatestCommitTime().toEpochMilli());
136+
137+
assertEquals(
138+
emptyTable.schema().columns().size(), internalTable.getReadSchema().getFields().size());
139+
validateSchema(internalTable.getReadSchema(), emptyTable.schema());
140+
141+
assertEquals(1, internalTable.getPartitioningFields().size());
142+
InternalField partitionField = internalTable.getPartitioningFields().get(0).getSourceField();
143+
assertEquals("cs_sold_date_sk", partitionField.getName());
144+
assertEquals(7, partitionField.getFieldId());
145+
assertEquals(
146+
PartitionTransformType.VALUE,
147+
internalTable.getPartitioningFields().get(0).getTransformType());
148+
}
149+
119150
@Test
120151
public void testGetCurrentSnapshot(@TempDir Path workingDir) throws IOException {
121152
Table catalogSales = createTestTableWithData(workingDir.toString());
@@ -164,6 +195,45 @@ public void testGetCurrentSnapshot(@TempDir Path workingDir) throws IOException
164195
}
165196
}
166197

198+
@Test
199+
void testGetCurrentSnapshotForEmptyTable(@TempDir Path workingDir) throws IOException {
200+
Table emptyTable = createTestCatalogTable(workingDir.toString());
201+
assertNull(emptyTable.currentSnapshot());
202+
203+
SourceTable sourceTableConfig = getPerTableConfig(emptyTable);
204+
205+
IcebergDataFileExtractor spyDataFileExtractor = spy(IcebergDataFileExtractor.builder().build());
206+
IcebergPartitionValueConverter spyPartitionConverter =
207+
spy(IcebergPartitionValueConverter.getInstance());
208+
209+
IcebergConversionSource conversionSource =
210+
IcebergConversionSource.builder()
211+
.hadoopConf(hadoopConf)
212+
.sourceTableConfig(sourceTableConfig)
213+
.dataFileExtractor(spyDataFileExtractor)
214+
.partitionConverter(spyPartitionConverter)
215+
.build();
216+
217+
InternalSnapshot internalSnapshot = conversionSource.getCurrentSnapshot();
218+
assertNotNull(internalSnapshot);
219+
assertEquals("0", internalSnapshot.getVersion());
220+
assertEquals("0", internalSnapshot.getSourceIdentifier());
221+
assertTrue(internalSnapshot.getPartitionedDataFiles().isEmpty());
222+
223+
InternalTable internalTable = internalSnapshot.getTable();
224+
assertNotNull(internalTable);
225+
assertEquals(emptyTable.location(), internalTable.getBasePath());
226+
assertEquals(
227+
((BaseTable) emptyTable).operations().current().lastUpdatedMillis(),
228+
internalTable.getLatestCommitTime().toEpochMilli());
229+
230+
assertEquals(
231+
emptyTable.schema().columns().size(), internalTable.getReadSchema().getFields().size());
232+
233+
verify(spyPartitionConverter, never()).toXTable(any(), any(), any());
234+
verify(spyDataFileExtractor, never()).fromIceberg(any(), any(), any());
235+
}
236+
167237
@Test
168238
public void testGetTableChangeForCommit(@TempDir Path workingDir) throws IOException {
169239
Table catalogSales = createTestTableWithData(workingDir.toString());

0 commit comments

Comments
 (0)