Skip to content

Commit e79a2d3

Browse files
kroushan-nitthe-other-tim-brown
authored andcommitted
[590] Add Delta HMS Catalog Sync implementation
1 parent f2045be commit e79a2d3

File tree

9 files changed

+361
-44
lines changed

9 files changed

+361
-44
lines changed

pom.xml

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@
8888
<apache.rat.version>0.16.1</apache.rat.version>
8989
<google.java.format.version>1.8</google.java.format.version>
9090
<delta.standalone.version>0.5.0</delta.standalone.version>
91+
<delta.hive.version>3.0.0</delta.hive.version>
9192
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
9293
<target.dir.pattern>**/target/**</target.dir.pattern>
9394
<delombok.output.dir>${project.build.directory}/delombok</delombok.output.dir>
@@ -280,6 +281,11 @@
280281
<version>${iceberg.version}</version>
281282
<scope>test</scope>
282283
</dependency>
284+
<dependency>
285+
<groupId>org.apache.iceberg</groupId>
286+
<artifactId>iceberg-hive-runtime</artifactId>
287+
<version>${iceberg.version}</version>
288+
</dependency>
283289

284290
<!-- Delta -->
285291
<dependency>
@@ -291,7 +297,11 @@
291297
<groupId>io.delta</groupId>
292298
<artifactId>delta-standalone_${scala.binary.version}</artifactId>
293299
<version>${delta.standalone.version}</version>
294-
<scope>test</scope>
300+
</dependency>
301+
<dependency>
302+
<groupId>io.delta</groupId>
303+
<artifactId>delta-hive_${scala.binary.version}</artifactId>
304+
<version>${delta.hive.version}</version>
295305
</dependency>
296306

297307
<!-- Spark -->

xtable-hive-metastore/pom.xml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,12 @@
4646
<dependency>
4747
<groupId>org.apache.iceberg</groupId>
4848
<artifactId>iceberg-hive-runtime</artifactId>
49-
<version>${iceberg.version}</version>
49+
</dependency>
50+
51+
<!-- Delta dependencies -->
52+
<dependency>
53+
<groupId>io.delta</groupId>
54+
<artifactId>delta-hive_${scala.binary.version}</artifactId>
5055
</dependency>
5156

5257
<!-- HMS dependencies -->

xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogSyncClient.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -194,8 +194,7 @@ private void _init(
194194
} catch (MetaException | HiveException e) {
195195
throw new CatalogSyncException("HiveMetastoreClient could not be created", e);
196196
}
197-
this.tableBuilder =
198-
HMSCatalogTableBuilderFactory.getTableBuilder(tableFormat, this.configuration);
197+
this.tableBuilder = HMSCatalogTableBuilderFactory.getInstance(tableFormat, this.configuration);
199198
}
200199

201200
/**

xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogTableBuilderFactory.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,18 +32,21 @@
3232

3333
import org.apache.xtable.catalog.CatalogTableBuilder;
3434
import org.apache.xtable.exception.NotSupportedException;
35+
import org.apache.xtable.hms.table.DeltaHMSCatalogTableBuilder;
3536
import org.apache.xtable.hms.table.IcebergHMSCatalogTableBuilder;
3637
import org.apache.xtable.model.catalog.CatalogTableIdentifier;
3738
import org.apache.xtable.model.catalog.HierarchicalTableIdentifier;
3839
import org.apache.xtable.model.storage.TableFormat;
3940

4041
public class HMSCatalogTableBuilderFactory {
4142

42-
public static CatalogTableBuilder<Table, Table> getTableBuilder(
43+
static CatalogTableBuilder<Table, Table> getInstance(
4344
String tableFormat, Configuration configuration) {
4445
switch (tableFormat) {
4546
case TableFormat.ICEBERG:
4647
return new IcebergHMSCatalogTableBuilder(configuration);
48+
case TableFormat.DELTA:
49+
return new DeltaHMSCatalogTableBuilder();
4750
default:
4851
throw new NotSupportedException("Unsupported table format: " + tableFormat);
4952
}
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.xtable.hms.table;
20+
21+
import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP;
22+
import static org.apache.xtable.catalog.Constants.PROP_EXTERNAL;
23+
import static org.apache.xtable.catalog.Constants.PROP_PATH;
24+
import static org.apache.xtable.catalog.Constants.PROP_SERIALIZATION_FORMAT;
25+
import static org.apache.xtable.hms.HMSCatalogTableBuilderFactory.newHmsTable;
26+
27+
import java.util.HashMap;
28+
import java.util.Locale;
29+
import java.util.Map;
30+
31+
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
32+
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
33+
import org.apache.hadoop.hive.metastore.api.Table;
34+
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
35+
36+
import com.google.common.annotations.VisibleForTesting;
37+
38+
import io.delta.hive.DeltaStorageHandler;
39+
40+
import org.apache.xtable.catalog.CatalogTableBuilder;
41+
import org.apache.xtable.hms.HMSSchemaExtractor;
42+
import org.apache.xtable.model.InternalTable;
43+
import org.apache.xtable.model.catalog.CatalogTableIdentifier;
44+
import org.apache.xtable.model.storage.TableFormat;
45+
46+
public class DeltaHMSCatalogTableBuilder implements CatalogTableBuilder<Table, Table> {
47+
48+
private final HMSSchemaExtractor schemaExtractor;
49+
private static final String tableFormat = TableFormat.DELTA;
50+
51+
public DeltaHMSCatalogTableBuilder() {
52+
this.schemaExtractor = HMSSchemaExtractor.getInstance();
53+
}
54+
55+
@Override
56+
public Table getCreateTableRequest(InternalTable table, CatalogTableIdentifier tableIdentifier) {
57+
return newHmsTable(tableIdentifier, getStorageDescriptor(table), getTableParameters());
58+
}
59+
60+
@Override
61+
public Table getUpdateTableRequest(
62+
InternalTable table, Table catalogTable, CatalogTableIdentifier tableIdentifier) {
63+
Table copyTb = new Table(catalogTable);
64+
copyTb.getSd().setCols(schemaExtractor.toColumns(tableFormat, table.getReadSchema()));
65+
return copyTb;
66+
}
67+
68+
@VisibleForTesting
69+
StorageDescriptor getStorageDescriptor(InternalTable table) {
70+
final StorageDescriptor storageDescriptor = new StorageDescriptor();
71+
storageDescriptor.setCols(schemaExtractor.toColumns(tableFormat, table.getReadSchema()));
72+
storageDescriptor.setLocation(table.getBasePath());
73+
SerDeInfo serDeInfo = new SerDeInfo();
74+
serDeInfo.setParameters(getSerDeParameters(table));
75+
storageDescriptor.setSerdeInfo(serDeInfo);
76+
return storageDescriptor;
77+
}
78+
79+
@VisibleForTesting
80+
Map<String, String> getTableParameters() {
81+
Map<String, String> parameters = new HashMap<>();
82+
parameters.put(PROP_EXTERNAL, "TRUE");
83+
parameters.put(TABLE_TYPE_PROP, tableFormat.toUpperCase(Locale.ENGLISH));
84+
parameters.put(
85+
hive_metastoreConstants.META_TABLE_STORAGE, DeltaStorageHandler.class.getCanonicalName());
86+
return parameters;
87+
}
88+
89+
private Map<String, String> getSerDeParameters(InternalTable table) {
90+
Map<String, String> parameters = new HashMap<>();
91+
parameters.put(PROP_SERIALIZATION_FORMAT, "1");
92+
parameters.put(PROP_PATH, table.getBasePath());
93+
return parameters;
94+
}
95+
}

xtable-hive-metastore/src/test/java/org/apache/xtable/hms/HMSCatalogSyncClientTestBase.java

Lines changed: 84 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,21 +18,28 @@
1818

1919
package org.apache.xtable.hms;
2020

21+
import java.util.Arrays;
2122
import java.util.Collections;
2223
import java.util.HashMap;
24+
import java.util.List;
2325
import java.util.Map;
2426

2527
import org.apache.hadoop.conf.Configuration;
2628
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
2729
import org.apache.hadoop.hive.metastore.api.Database;
30+
import org.apache.hadoop.hive.metastore.api.FieldSchema;
2831
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
2932
import org.apache.hadoop.hive.metastore.api.Table;
3033
import org.mockito.Mock;
3134

3235
import org.apache.xtable.conversion.ExternalCatalogConfig;
3336
import org.apache.xtable.model.InternalTable;
3437
import org.apache.xtable.model.catalog.ThreePartHierarchicalTableIdentifier;
38+
import org.apache.xtable.model.schema.InternalField;
39+
import org.apache.xtable.model.schema.InternalPartitionField;
3540
import org.apache.xtable.model.schema.InternalSchema;
41+
import org.apache.xtable.model.schema.InternalType;
42+
import org.apache.xtable.model.schema.PartitionTransformType;
3643
import org.apache.xtable.model.storage.CatalogType;
3744
import org.apache.xtable.model.storage.TableFormat;
3845

@@ -57,17 +64,80 @@ public class HMSCatalogSyncClientTestBase {
5764

5865
protected static final String ICEBERG_METADATA_FILE_LOCATION = "base-path/metadata";
5966
protected static final String ICEBERG_METADATA_FILE_LOCATION_V2 = "base-path/v2-metadata";
67+
protected static final InternalPartitionField PARTITION_FIELD =
68+
InternalPartitionField.builder()
69+
.sourceField(
70+
InternalField.builder()
71+
.name("partitionField")
72+
.schema(
73+
InternalSchema.builder().name("string").dataType(InternalType.STRING).build())
74+
.build())
75+
.transformType(PartitionTransformType.VALUE)
76+
.build();
77+
protected static final InternalSchema INTERNAL_SCHEMA =
78+
InternalSchema.builder()
79+
.dataType(InternalType.RECORD)
80+
.fields(
81+
Arrays.asList(
82+
getInternalField("intField", "int", InternalType.INT),
83+
getInternalField("stringField", "string", InternalType.STRING),
84+
getInternalField("partitionField", "string", InternalType.STRING)))
85+
.build();
86+
protected static final InternalSchema UPDATED_INTERNAL_SCHEMA =
87+
InternalSchema.builder()
88+
.dataType(InternalType.RECORD)
89+
.fields(
90+
Arrays.asList(
91+
getInternalField("intField", "int", InternalType.INT),
92+
getInternalField("stringField", "string", InternalType.STRING),
93+
getInternalField("partitionField", "string", InternalType.STRING),
94+
getInternalField("booleanField", "boolean", InternalType.BOOLEAN)))
95+
.build();
96+
protected static final List<FieldSchema> FIELD_SCHEMA =
97+
Arrays.asList(
98+
getFieldSchema("intField", "int"),
99+
getFieldSchema("stringField", "string"),
100+
getFieldSchema("partitionField", "string"));
101+
protected static final List<FieldSchema> UPDATED_FIELD_SCHEMA =
102+
Arrays.asList(
103+
getFieldSchema("intField", "int"),
104+
getFieldSchema("stringField", "string"),
105+
getFieldSchema("partitionField", "string"),
106+
getFieldSchema("booleanField", "boolean"));
60107
protected static final InternalTable TEST_ICEBERG_INTERNAL_TABLE =
61108
InternalTable.builder()
62109
.basePath(TEST_BASE_PATH)
63110
.tableFormat(TableFormat.ICEBERG)
64-
.readSchema(InternalSchema.builder().fields(Collections.emptyList()).build())
111+
.readSchema(INTERNAL_SCHEMA)
112+
.partitioningFields(Collections.singletonList(PARTITION_FIELD))
113+
.build();
114+
protected static final InternalTable TEST_UPDATED_ICEBERG_INTERNAL_TABLE =
115+
InternalTable.builder()
116+
.basePath(TEST_BASE_PATH)
117+
.tableFormat(TableFormat.ICEBERG)
118+
.readSchema(UPDATED_INTERNAL_SCHEMA)
119+
.partitioningFields(Collections.singletonList(PARTITION_FIELD))
120+
.build();
121+
protected static final InternalTable TEST_DELTA_INTERNAL_TABLE =
122+
InternalTable.builder()
123+
.basePath(TEST_BASE_PATH)
124+
.tableFormat(TableFormat.DELTA)
125+
.readSchema(INTERNAL_SCHEMA)
126+
.partitioningFields(Collections.singletonList(PARTITION_FIELD))
127+
.build();
128+
protected static final InternalTable TEST_UPDATED_DELTA_INTERNAL_TABLE =
129+
InternalTable.builder()
130+
.basePath(TEST_BASE_PATH)
131+
.tableFormat(TableFormat.DELTA)
132+
.readSchema(UPDATED_INTERNAL_SCHEMA)
133+
.partitioningFields(Collections.singletonList(PARTITION_FIELD))
65134
.build();
66135
protected static final InternalTable TEST_HUDI_INTERNAL_TABLE =
67136
InternalTable.builder()
68137
.basePath(TEST_BASE_PATH)
69138
.tableFormat(TableFormat.HUDI)
70-
.readSchema(InternalSchema.builder().fields(Collections.emptyList()).build())
139+
.readSchema(INTERNAL_SCHEMA)
140+
.partitioningFields(Collections.singletonList(PARTITION_FIELD))
71141
.build();
72142
protected static final ThreePartHierarchicalTableIdentifier TEST_CATALOG_TABLE_IDENTIFIER =
73143
new ThreePartHierarchicalTableIdentifier(TEST_HMS_DATABASE, TEST_HMS_TABLE);
@@ -95,4 +165,16 @@ protected Database newDatabase(String dbName) {
95165
return new Database(
96166
dbName, "Created by " + HMSCatalogSyncClient.class.getName(), null, Collections.emptyMap());
97167
}
168+
169+
protected static FieldSchema getFieldSchema(String name, String type) {
170+
return new FieldSchema(name, type, null);
171+
}
172+
173+
protected static InternalField getInternalField(
174+
String fieldName, String schemaName, InternalType dataType) {
175+
return InternalField.builder()
176+
.name(fieldName)
177+
.schema(InternalSchema.builder().name(schemaName).dataType(dataType).build())
178+
.build();
179+
}
98180
}

xtable-hive-metastore/src/test/java/org/apache/xtable/hms/TestHMSCatalogTableBuilderFactory.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.xtable.hms;
2020

21+
import static org.apache.xtable.hms.HMSCatalogSyncClientTestBase.FIELD_SCHEMA;
2122
import static org.apache.xtable.hms.HMSCatalogSyncClientTestBase.TEST_CATALOG_TABLE_IDENTIFIER;
2223
import static org.apache.xtable.hms.HMSCatalogSyncClientTestBase.TEST_HMS_DATABASE;
2324
import static org.apache.xtable.hms.HMSCatalogSyncClientTestBase.TEST_HMS_TABLE;
@@ -48,15 +49,15 @@ void testNewHmsTable() {
4849
expected.setTableName(TEST_HMS_TABLE);
4950
expected.setOwner(UserGroupInformation.getCurrentUser().getShortUserName());
5051
expected.setCreateTime((int) createdTime.getEpochSecond());
51-
expected.setSd(getTestHmsTableStorageDescriptor());
52+
expected.setSd(getTestHmsTableStorageDescriptor(FIELD_SCHEMA));
5253
expected.setTableType("EXTERNAL_TABLE");
5354
expected.setParameters(getTestHmsTableParameters());
5455

5556
assertEquals(
5657
expected,
5758
HMSCatalogTableBuilderFactory.newHmsTable(
5859
TEST_CATALOG_TABLE_IDENTIFIER,
59-
getTestHmsTableStorageDescriptor(),
60+
getTestHmsTableStorageDescriptor(FIELD_SCHEMA),
6061
getTestHmsTableParameters()));
6162
}
6263
}

0 commit comments

Comments
 (0)