Skip to content

Commit 8cceec7

Browse files
authored
Paimon Source Support (#742)
* Paimon initial support # Conflicts: # xtable-api/src/main/java/org/apache/xtable/model/storage/TableFormat.java # xtable-core/src/test/java/org/apache/xtable/GenericTable.java # xtable-core/src/test/java/org/apache/xtable/ITConversionController.java * Expanding imports and removing java 11 target * fix compilation issue * Add Paimon Unit Tests * Addressing review comments * Fix test * Parameterizing timestamp precision tests * Fix timestamp precision metadata * Fixing tests by removing the paimon catalog config in spark
1 parent 93d4d27 commit 8cceec7

File tree

18 files changed

+2244
-39
lines changed

18 files changed

+2244
-39
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ hs_err_pid*
2626
# Ignore java-version and idea files.
2727
.java-version
2828
.idea
29+
.vscode
2930

3031
# Ignore Gradle project-specific cache directory
3132
.gradle

pom.xml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@
8888
<spark.version.prefix>3.4</spark.version.prefix>
8989
<iceberg.version>1.4.2</iceberg.version>
9090
<delta.version>2.4.0</delta.version>
91+
<paimon.version>1.2.0</paimon.version>
9192
<jackson.version>2.18.2</jackson.version>
9293
<spotless.version>2.43.0</spotless.version>
9394
<apache.rat.version>0.16.1</apache.rat.version>
@@ -333,6 +334,18 @@
333334
<version>${delta.hive.version}</version>
334335
</dependency>
335336

337+
<!-- Paimon -->
338+
<dependency>
339+
<groupId>org.apache.paimon</groupId>
340+
<artifactId>paimon-bundle</artifactId>
341+
<version>${paimon.version}</version>
342+
</dependency>
343+
<dependency>
344+
<groupId>org.apache.paimon</groupId>
345+
<artifactId>paimon-spark-${spark.version.prefix}</artifactId>
346+
<version>${paimon.version}</version>
347+
</dependency>
348+
336349
<!-- Spark -->
337350
<dependency>
338351
<groupId>org.apache.spark</groupId>

xtable-api/src/main/java/org/apache/xtable/model/storage/TableFormat.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,10 @@ public class TableFormat {
2727
public static final String HUDI = "HUDI";
2828
public static final String ICEBERG = "ICEBERG";
2929
public static final String DELTA = "DELTA";
30+
public static final String PAIMON = "PAIMON";
3031
public static final String PARQUET = "PARQUET";
3132

3233
public static String[] values() {
33-
return new String[] {"HUDI", "ICEBERG", "DELTA"};
34+
return new String[] {"HUDI", "ICEBERG", "DELTA", "PAIMON"};
3435
}
3536
}

xtable-core/pom.xml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,17 @@
110110
<scope>test</scope>
111111
</dependency>
112112

113+
<!-- Paimon dependencies -->
114+
<dependency>
115+
<groupId>org.apache.paimon</groupId>
116+
<artifactId>paimon-bundle</artifactId>
117+
</dependency>
118+
<dependency>
119+
<groupId>org.apache.paimon</groupId>
120+
<artifactId>paimon-spark-${spark.version.prefix}</artifactId>
121+
<scope>test</scope>
122+
</dependency>
123+
113124
<!-- Hadoop dependencies -->
114125
<dependency>
115126
<groupId>org.apache.hadoop</groupId>
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.xtable.paimon;
20+
21+
import java.io.IOException;
22+
import java.time.Instant;
23+
import java.util.List;
24+
25+
import lombok.extern.log4j.Log4j2;
26+
27+
import org.apache.paimon.Snapshot;
28+
import org.apache.paimon.schema.SchemaManager;
29+
import org.apache.paimon.schema.TableSchema;
30+
import org.apache.paimon.table.FileStoreTable;
31+
import org.apache.paimon.utils.SnapshotManager;
32+
33+
import org.apache.xtable.exception.ReadException;
34+
import org.apache.xtable.model.*;
35+
import org.apache.xtable.model.schema.InternalPartitionField;
36+
import org.apache.xtable.model.schema.InternalSchema;
37+
import org.apache.xtable.model.storage.DataLayoutStrategy;
38+
import org.apache.xtable.model.storage.InternalDataFile;
39+
import org.apache.xtable.model.storage.PartitionFileGroup;
40+
import org.apache.xtable.model.storage.TableFormat;
41+
import org.apache.xtable.spi.extractor.ConversionSource;
42+
43+
@Log4j2
44+
public class PaimonConversionSource implements ConversionSource<Snapshot> {
45+
46+
private final FileStoreTable paimonTable;
47+
private final SchemaManager schemaManager;
48+
private final SnapshotManager snapshotManager;
49+
50+
private final PaimonDataFileExtractor dataFileExtractor = PaimonDataFileExtractor.getInstance();
51+
private final PaimonSchemaExtractor schemaExtractor = PaimonSchemaExtractor.getInstance();
52+
private final PaimonPartitionExtractor partitionSpecExtractor =
53+
PaimonPartitionExtractor.getInstance();
54+
55+
public PaimonConversionSource(FileStoreTable paimonTable) {
56+
this.paimonTable = paimonTable;
57+
this.schemaManager = paimonTable.schemaManager();
58+
this.snapshotManager = paimonTable.snapshotManager();
59+
}
60+
61+
@Override
62+
public InternalTable getTable(Snapshot snapshot) {
63+
TableSchema paimonSchema = schemaManager.schema(snapshot.schemaId());
64+
InternalSchema internalSchema = schemaExtractor.toInternalSchema(paimonSchema);
65+
66+
List<String> partitionKeys = paimonTable.partitionKeys();
67+
List<InternalPartitionField> partitioningFields =
68+
partitionSpecExtractor.toInternalPartitionFields(partitionKeys, internalSchema);
69+
70+
return InternalTable.builder()
71+
.name(paimonTable.name())
72+
.tableFormat(TableFormat.PAIMON)
73+
.readSchema(internalSchema)
74+
.layoutStrategy(DataLayoutStrategy.HIVE_STYLE_PARTITION)
75+
.basePath(paimonTable.location().toString())
76+
.partitioningFields(partitioningFields)
77+
.latestCommitTime(Instant.ofEpochMilli(snapshot.timeMillis()))
78+
.latestMetadataPath(snapshotManager.snapshotPath(snapshot.id()).toString())
79+
.build();
80+
}
81+
82+
@Override
83+
public InternalTable getCurrentTable() {
84+
Snapshot snapshot = getLastSnapshot();
85+
return getTable(snapshot);
86+
}
87+
88+
@Override
89+
public InternalSnapshot getCurrentSnapshot() {
90+
Snapshot snapshot = getLastSnapshot();
91+
InternalTable internalTable = getTable(snapshot);
92+
InternalSchema internalSchema = internalTable.getReadSchema();
93+
List<InternalDataFile> dataFiles =
94+
dataFileExtractor.toInternalDataFiles(paimonTable, snapshot, internalSchema);
95+
96+
return InternalSnapshot.builder()
97+
.table(internalTable)
98+
.version(Long.toString(snapshot.timeMillis()))
99+
.partitionedDataFiles(PartitionFileGroup.fromFiles(dataFiles))
100+
// TODO : Implement pending commits extraction, required for incremental sync
101+
// https://github.com/apache/incubator-xtable/issues/754
102+
.sourceIdentifier(getCommitIdentifier(snapshot))
103+
.build();
104+
}
105+
106+
private Snapshot getLastSnapshot() {
107+
SnapshotManager snapshotManager = paimonTable.snapshotManager();
108+
Snapshot snapshot = snapshotManager.latestSnapshot();
109+
if (snapshot == null) {
110+
throw new ReadException("No snapshots found for table " + paimonTable.name());
111+
}
112+
return snapshot;
113+
}
114+
115+
@Override
116+
public TableChange getTableChangeForCommit(Snapshot snapshot) {
117+
throw new UnsupportedOperationException("Incremental Sync is not supported yet.");
118+
}
119+
120+
@Override
121+
public CommitsBacklog<Snapshot> getCommitsBacklog(
122+
InstantsForIncrementalSync instantsForIncrementalSync) {
123+
throw new UnsupportedOperationException("Incremental Sync is not supported yet.");
124+
}
125+
126+
@Override
127+
public boolean isIncrementalSyncSafeFrom(Instant instant) {
128+
return false; // Incremental sync is not supported yet
129+
}
130+
131+
@Override
132+
public String getCommitIdentifier(Snapshot snapshot) {
133+
return Long.toString(snapshot.commitIdentifier());
134+
}
135+
136+
@Override
137+
public void close() throws IOException {}
138+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.xtable.paimon;
20+
21+
import java.io.IOException;
22+
23+
import org.apache.paimon.Snapshot;
24+
import org.apache.paimon.catalog.CatalogContext;
25+
import org.apache.paimon.fs.FileIO;
26+
import org.apache.paimon.fs.Path;
27+
import org.apache.paimon.options.Options;
28+
import org.apache.paimon.table.FileStoreTable;
29+
import org.apache.paimon.table.FileStoreTableFactory;
30+
31+
import org.apache.xtable.conversion.ConversionSourceProvider;
32+
import org.apache.xtable.conversion.SourceTable;
33+
import org.apache.xtable.exception.ReadException;
34+
import org.apache.xtable.spi.extractor.ConversionSource;
35+
36+
public class PaimonConversionSourceProvider extends ConversionSourceProvider<Snapshot> {
37+
@Override
38+
public ConversionSource<Snapshot> getConversionSourceInstance(SourceTable sourceTableConfig) {
39+
try {
40+
Options catalogOptions = new Options();
41+
CatalogContext context = CatalogContext.create(catalogOptions, hadoopConf);
42+
43+
Path path = new Path(sourceTableConfig.getDataPath());
44+
FileIO fileIO = FileIO.get(path, context);
45+
FileStoreTable paimonTable = FileStoreTableFactory.create(fileIO, path);
46+
47+
return new PaimonConversionSource(paimonTable);
48+
} catch (IOException e) {
49+
throw new ReadException("Failed to read Paimon table from file system", e);
50+
}
51+
}
52+
}
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.xtable.paimon;
20+
21+
import java.util.*;
22+
23+
import org.apache.paimon.Snapshot;
24+
import org.apache.paimon.io.DataFileMeta;
25+
import org.apache.paimon.manifest.ManifestEntry;
26+
import org.apache.paimon.table.FileStoreTable;
27+
import org.apache.paimon.table.source.snapshot.SnapshotReader;
28+
29+
import org.apache.xtable.model.schema.InternalSchema;
30+
import org.apache.xtable.model.stat.ColumnStat;
31+
import org.apache.xtable.model.storage.InternalDataFile;
32+
33+
public class PaimonDataFileExtractor {
34+
35+
private final PaimonPartitionExtractor partitionExtractor =
36+
PaimonPartitionExtractor.getInstance();
37+
38+
private static final PaimonDataFileExtractor INSTANCE = new PaimonDataFileExtractor();
39+
40+
public static PaimonDataFileExtractor getInstance() {
41+
return INSTANCE;
42+
}
43+
44+
public List<InternalDataFile> toInternalDataFiles(
45+
FileStoreTable table, Snapshot snapshot, InternalSchema internalSchema) {
46+
List<InternalDataFile> result = new ArrayList<>();
47+
Iterator<ManifestEntry> manifestEntryIterator =
48+
newSnapshotReader(table, snapshot).readFileIterator();
49+
while (manifestEntryIterator.hasNext()) {
50+
result.add(toInternalDataFile(table, manifestEntryIterator.next(), internalSchema));
51+
}
52+
return result;
53+
}
54+
55+
private InternalDataFile toInternalDataFile(
56+
FileStoreTable table, ManifestEntry entry, InternalSchema internalSchema) {
57+
return InternalDataFile.builder()
58+
.physicalPath(toFullPhysicalPath(table, entry))
59+
.fileSizeBytes(entry.file().fileSize())
60+
.lastModified(entry.file().creationTimeEpochMillis())
61+
.recordCount(entry.file().rowCount())
62+
.partitionValues(
63+
partitionExtractor.toPartitionValues(table, entry.partition(), internalSchema))
64+
.columnStats(toColumnStats(entry.file()))
65+
.build();
66+
}
67+
68+
private String toFullPhysicalPath(FileStoreTable table, ManifestEntry entry) {
69+
String basePath = table.location().toString();
70+
String bucketPath = "bucket-" + entry.bucket();
71+
String filePath = entry.file().fileName();
72+
73+
Optional<String> partitionPath = partitionExtractor.toPartitionPath(table, entry.partition());
74+
if (partitionPath.isPresent()) {
75+
return String.join("/", basePath, partitionPath.get(), bucketPath, filePath);
76+
} else {
77+
return String.join("/", basePath, bucketPath, filePath);
78+
}
79+
}
80+
81+
private List<ColumnStat> toColumnStats(DataFileMeta file) {
82+
// TODO: Implement logic to extract column stats from the file meta
83+
// https://github.com/apache/incubator-xtable/issues/755
84+
return Collections.emptyList();
85+
}
86+
87+
private SnapshotReader newSnapshotReader(FileStoreTable table, Snapshot snapshot) {
88+
// If the table has primary keys, we read only the top level files
89+
// which means we can only consider fully compacted files.
90+
if (!table.schema().primaryKeys().isEmpty()) {
91+
return table
92+
.newSnapshotReader()
93+
.withLevel(table.coreOptions().numLevels() - 1)
94+
.withSnapshot(snapshot);
95+
} else {
96+
return table.newSnapshotReader().withSnapshot(snapshot);
97+
}
98+
}
99+
}

0 commit comments

Comments
 (0)