
Commit 571fd67

anton-kutuzov authored and raunaqmorarka committed

Fix updating column statistics when inserting with overwrite mode

1 parent 8c2a693 commit 571fd67
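Why the change is needed (summary drawn from the diff and the new tests): before this commit, finishInsert always merged freshly collected partition statistics into the existing ones (MERGE_INCREMENTAL), even when insert_existing_partitions_behavior = OVERWRITE had just replaced the partition's data, so stale min/max values and counts survived the overwrite. The commit threads a StatisticsUpdateMode through PartitionUpdateInfo so that overwritten partitions use OVERWRITE_ALL instead. A minimal, self-contained sketch of the difference; Mode and ColumnStats below are simplified stand-ins, not Trino's StatisticsUpdateMode and PartitionStatistics:

// Toy illustration only; Mode and ColumnStats are simplified stand-ins for
// io.trino.metastore.StatisticsUpdateMode and PartitionStatistics.
enum Mode { MERGE_INCREMENTAL, OVERWRITE_ALL }

record ColumnStats(long min, long max, long rowCount)
{
    static ColumnStats update(Mode mode, ColumnStats current, ColumnStats incoming)
    {
        return switch (mode) {
            // APPEND semantics: new rows land next to the old ones, so stats accumulate.
            case MERGE_INCREMENTAL -> new ColumnStats(
                    Math.min(current.min(), incoming.min()),
                    Math.max(current.max(), incoming.max()),
                    current.rowCount() + incoming.rowCount());
            // OVERWRITE semantics: the partition's files were replaced, so its stats are too.
            case OVERWRITE_ALL -> incoming;
        };
    }

    public static void main(String[] args)
    {
        ColumnStats existing = new ColumnStats(0, 16, 5);  // nationkey stats for regionkey=0 of tpch nation
        ColumnStats incoming = new ColumnStats(20, 20, 1); // stats collected for the single overwriting row

        System.out.println(update(Mode.MERGE_INCREMENTAL, existing, incoming)); // stale:   min=0,  max=20, rowCount=6
        System.out.println(update(Mode.OVERWRITE_ALL, existing, incoming));     // correct: min=20, max=20, rowCount=1
    }
}

The tests added below pin the same expectation against the real metastore: after an overwriting insert of one row with nationkey 20 into regionkey=0, min and max must both be 20.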

File tree: 3 files changed, +103 −9 lines changed

plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java

Lines changed: 12 additions & 2 deletions
@@ -181,6 +181,7 @@
 import static io.trino.metastore.PrincipalPrivileges.NO_PRIVILEGES;
 import static io.trino.metastore.PrincipalPrivileges.fromHivePrivilegeInfos;
 import static io.trino.metastore.StatisticsUpdateMode.MERGE_INCREMENTAL;
+import static io.trino.metastore.StatisticsUpdateMode.OVERWRITE_ALL;
 import static io.trino.metastore.StorageFormat.VIEW_STORAGE_FORMAT;
 import static io.trino.metastore.type.Category.PRIMITIVE;
 import static io.trino.parquet.writer.ParquetWriter.SUPPORTED_BLOOM_FILTER_TYPES;
@@ -2360,7 +2361,8 @@ else if (partitionUpdate.getUpdateMode() == APPEND) {
                         partitionValues,
                         partitionUpdate.getWritePath(),
                         partitionUpdate.getFileNames(),
-                        partitionStatistics));
+                        partitionStatistics,
+                        MERGE_INCREMENTAL));
             }
             else if (partitionUpdate.getUpdateMode() == NEW || partitionUpdate.getUpdateMode() == OVERWRITE) {
                 // insert into new partition or overwrite existing partition
@@ -2385,6 +2387,13 @@ else if (partitionUpdate.getUpdateMode() == NEW || partitionUpdate.getUpdateMode
                     TrinoFileSystem fileSystem = fileSystemFactory.create(session);
                     cleanExtraOutputFiles(fileSystem, session.getQueryId(), partitionUpdate.getTargetPath(), ImmutableSet.copyOf(partitionUpdate.getFileNames()));
                 }
+                partitionUpdateInfosBuilder.add(
+                        new PartitionUpdateInfo(
+                                partitionValues,
+                                partitionUpdate.getWritePath(),
+                                partitionUpdate.getFileNames(),
+                                partitionStatistics,
+                                OVERWRITE_ALL));
             }
             else {
                 metastore.dropPartition(session, handle.getSchemaName(), handle.getTableName(), partition.getValues(), true);
@@ -2690,7 +2699,8 @@ private void finishOptimize(ConnectorSession session, ConnectorTableExecuteHandl
                             partitionValues,
                             partitionUpdate.getWritePath(),
                             partitionUpdate.getFileNames(),
-                            PartitionStatistics.empty()));
+                            PartitionStatistics.empty(),
+                            MERGE_INCREMENTAL));
             }
         }
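Reading aid (not part of the commit): the finishInsert and finishOptimize hunks above reduce to one rule when a PartitionUpdateInfo is built. A hypothetical helper, sketched under the assumption that PartitionUpdate.UpdateMode exposes the constants referenced in the hunks:

// Hypothetical condensation of the mode selection above; finishOptimize always passes
// MERGE_INCREMENTAL together with PartitionStatistics.empty().
private static StatisticsUpdateMode statisticsModeFor(PartitionUpdate.UpdateMode updateMode)
{
    if (updateMode == APPEND) {
        return MERGE_INCREMENTAL; // rows were appended, so the new stats merge into the existing ones
    }
    if (updateMode == NEW || updateMode == OVERWRITE) {
        return OVERWRITE_ALL; // the partition was written from scratch, so its stats are replaced outright
    }
    // any other mode drops the partition in finishInsert and never reaches PartitionUpdateInfo
    throw new IllegalArgumentException("unexpected update mode: " + updateMode);
}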

plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/SemiTransactionalHiveMetastore.java

Lines changed: 10 additions & 5 deletions
@@ -171,6 +171,8 @@ public class SemiTransactionalHiveMetastore
             AcidOperation.INSERT, ActionType.INSERT_EXISTING,
             AcidOperation.MERGE, ActionType.MERGE);
 
+    private static final boolean SHOULD_MERGE_STATISTICS = true;
+
     private final HiveMetastore delegate;
     private final TypeManager typeManager;
     private final boolean partitionProjectionEnabled;
@@ -911,7 +913,7 @@ public synchronized void addPartition(
         if (oldPartitionAction == null) {
             partitionActionsOfTable.put(
                     partition.getValues(),
-                    new Action<>(ActionType.ADD, new PartitionAndMore(partition, currentLocation, files, statistics, statistics, cleanExtraOutputFilesOnCommit), session.getIdentity(), session.getQueryId()));
+                    new Action<>(ActionType.ADD, new PartitionAndMore(partition, currentLocation, files, statistics, statistics, SHOULD_MERGE_STATISTICS, cleanExtraOutputFilesOnCommit), session.getIdentity(), session.getQueryId()));
             return;
         }
         switch (oldPartitionAction.type()) {
@@ -921,7 +923,7 @@ public synchronized void addPartition(
                 }
                 partitionActionsOfTable.put(
                         partition.getValues(),
-                        new Action<>(ActionType.ALTER, new PartitionAndMore(partition, currentLocation, files, statistics, statistics, cleanExtraOutputFilesOnCommit), session.getIdentity(), session.getQueryId()));
+                        new Action<>(ActionType.ALTER, new PartitionAndMore(partition, currentLocation, files, statistics, statistics, SHOULD_MERGE_STATISTICS, cleanExtraOutputFilesOnCommit), session.getIdentity(), session.getQueryId()));
             }
             case ADD, ALTER, INSERT_EXISTING, MERGE ->
                     throw new TrinoException(ALREADY_EXISTS, format("Partition already exists for table '%s.%s': %s", databaseName, tableName, partition.getValues()));
@@ -1003,8 +1005,9 @@ public synchronized void finishInsertIntoExistingPartitions(
                             partition,
                             partitionInfo.currentLocation(),
                             Optional.of(partitionInfo.fileNames()),
-                            MERGE_INCREMENTAL.updatePartitionStatistics(currentStatistics, partitionInfo.statisticsUpdate()),
+                            partitionInfo.statisticsUpdateMode().updatePartitionStatistics(currentStatistics, partitionInfo.statisticsUpdate()),
                             partitionInfo.statisticsUpdate(),
+                            partitionInfo.statisticsUpdateMode() != OVERWRITE_ALL,
                             cleanExtraOutputFilesOnCommit),
                     session.getIdentity(),
                     session.getQueryId()));
@@ -1989,7 +1992,7 @@ private void prepareInsertExistingPartition(ConnectorIdentity identity, String q
                     partition.getSchemaTableName(),
                     Optional.of(getPartitionName(partition.getDatabaseName(), partition.getTableName(), partition.getValues())),
                     partitionAndMore.statisticsUpdate(),
-                    true));
+                    partitionAndMore.mergeStatistic()));
         }
 
     private void executeCleanupTasksForAbort(Collection<DeclaredIntentionToWrite> declaredIntentionsToWrite)
@@ -2840,6 +2843,7 @@ private record PartitionAndMore(
             Optional<List<String>> fileNames,
             PartitionStatistics statistics,
             PartitionStatistics statisticsUpdate,
+            boolean mergeStatistic,
             boolean cleanExtraOutputFilesOnCommit)
     {
         private PartitionAndMore
@@ -3362,14 +3366,15 @@ public static void cleanExtraOutputFiles(TrinoFileSystem fileSystem, String quer
         }
     }
 
-    public record PartitionUpdateInfo(List<String> partitionValues, Location currentLocation, List<String> fileNames, PartitionStatistics statisticsUpdate)
+    public record PartitionUpdateInfo(List<String> partitionValues, Location currentLocation, List<String> fileNames, PartitionStatistics statisticsUpdate, StatisticsUpdateMode statisticsUpdateMode)
     {
         public PartitionUpdateInfo
         {
             requireNonNull(partitionValues, "partitionValues is null");
             requireNonNull(currentLocation, "currentLocation is null");
             requireNonNull(fileNames, "fileNames is null");
             requireNonNull(statisticsUpdate, "statisticsUpdate is null");
+            requireNonNull(statisticsUpdateMode, "statisticsUpdateMode is null");
         }
     }
 }
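The second half of the change is plumbing: PartitionUpdateInfo now carries the mode, finishInsertIntoExistingPartitions applies it instead of the hard-coded MERGE_INCREMENTAL, and PartitionAndMore records whether the statistics it holds should still be merged when the metastore update is prepared. Roughly, paraphrasing the hunks above rather than adding new behavior:

// How the mode is consumed inside finishInsertIntoExistingPartitions (paraphrased from the diff):
PartitionStatistics updated = partitionInfo.statisticsUpdateMode()
        .updatePartitionStatistics(currentStatistics, partitionInfo.statisticsUpdate());
boolean mergeStatistic = partitionInfo.statisticsUpdateMode() != OVERWRITE_ALL;
// Later, prepareInsertExistingPartition forwards mergeStatistic to the metastore update
// instead of the previous hard-coded `true`.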

plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseTestHiveOnDataLake.java

Lines changed: 81 additions & 2 deletions
@@ -50,6 +50,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.OptionalLong;
 import java.util.TimeZone;
 import java.util.stream.Collectors;
 
@@ -281,6 +282,80 @@ public void testSyncPartitionOnBucketRoot()
         assertUpdate("DROP TABLE " + fullyQualifiedTestTableName);
     }
 
+    @Test
+    public void testUpdateStatisticInsertOverwritePartitionedTable()
+    {
+        String partitionValue = "0";
+        Session session = Session.builder(getQueryRunner().getDefaultSession())
+                .setCatalogSessionProperty("hive", "insert_existing_partitions_behavior", "OVERWRITE")
+                .setCatalogSessionProperty("hive", "collect_column_statistics_on_write", "true")
+                .build();
+        String tableName = "test_statistic" + randomNameSuffix();
+        String testTable = getFullyQualifiedTestTableName(tableName);
+        computeActual(getCreateTableStatement(
+                testTable,
+                "partitioned_by=ARRAY['regionkey']"));
+        copyTpchNationToTable(session, testTable);
+        Table hiveTable = metastoreClient.getTable(HIVE_TEST_SCHEMA, tableName).orElseThrow();
+        Partition partition = metastoreClient.getPartition(hiveTable, List.of(partitionValue)).orElseThrow();
+        Map<String, Map<String, HiveColumnStatistics>> partitionStatistics = metastoreClient.getPartitionColumnStatistics(
+                HIVE_TEST_SCHEMA,
+                tableName,
+                ImmutableSet.of("regionkey=0"),
+                partition.getColumns().stream().map(Column::getName).collect(toSet()));
+        assertThat(partitionStatistics.get("regionkey=0").get("nationkey").getIntegerStatistics().isPresent()).isTrue();
+        assertThat(partitionStatistics.get("regionkey=0").get("nationkey").getIntegerStatistics().get().getMin()).isEqualTo(OptionalLong.of(0));
+        assertThat(partitionStatistics.get("regionkey=0").get("nationkey").getIntegerStatistics().get().getMax()).isEqualTo(OptionalLong.of(16));
+
+        assertUpdate(session, "INSERT INTO " + testTable + "(name, comment, nationkey, regionkey) values ('name1', 'comment1', 20, 0)", 1);
+
+        partitionStatistics = metastoreClient.getPartitionColumnStatistics(
+                HIVE_TEST_SCHEMA,
+                tableName,
+                ImmutableSet.of("regionkey=0"),
+                partition.getColumns().stream().map(Column::getName).collect(toSet()));
+        assertThat(partitionStatistics.get("regionkey=0").get("nationkey").getIntegerStatistics().isPresent()).isTrue();
+        assertThat(partitionStatistics.get("regionkey=0").get("nationkey").getIntegerStatistics().get().getMin()).isEqualTo(OptionalLong.of(20));
+        assertThat(partitionStatistics.get("regionkey=0").get("nationkey").getIntegerStatistics().get().getMax()).isEqualTo(OptionalLong.of(20));
+    }
+
+    @Test
+    public void testUpdateStatisticInsertAppendPartitionedTable()
+    {
+        String partitionValue = "0";
+        Session session = Session.builder(getQueryRunner().getDefaultSession())
+                .setCatalogSessionProperty("hive", "insert_existing_partitions_behavior", "APPEND")
+                .setCatalogSessionProperty("hive", "collect_column_statistics_on_write", "true")
+                .build();
+        String tableName = "test_statistic" + randomNameSuffix();
+        String testTable = getFullyQualifiedTestTableName(tableName);
+        computeActual(session, getCreateTableStatement(
+                testTable,
+                "partitioned_by=ARRAY['regionkey']"));
+        copyTpchNationToTable(session, testTable);
+        Table hiveTable = metastoreClient.getTable(HIVE_TEST_SCHEMA, tableName).orElseThrow();
+        Partition partition = metastoreClient.getPartition(hiveTable, List.of(partitionValue)).orElseThrow();
+        Map<String, Map<String, HiveColumnStatistics>> partitionStatistics = metastoreClient.getPartitionColumnStatistics(
+                HIVE_TEST_SCHEMA,
+                tableName,
+                ImmutableSet.of("regionkey=0"),
+                partition.getColumns().stream().map(Column::getName).collect(toSet()));
+        assertThat(partitionStatistics.get("regionkey=0").get("nationkey").getIntegerStatistics().isPresent()).isTrue();
+        assertThat(partitionStatistics.get("regionkey=0").get("nationkey").getIntegerStatistics().get().getMin()).isEqualTo(OptionalLong.of(0));
+        assertThat(partitionStatistics.get("regionkey=0").get("nationkey").getIntegerStatistics().get().getMax()).isEqualTo(OptionalLong.of(16));
+
+        computeActual(session, "INSERT INTO " + testTable + "(name, comment, nationkey, regionkey) values ('name1', 'comment1', 20, 0)");
+
+        partitionStatistics = metastoreClient.getPartitionColumnStatistics(
+                HIVE_TEST_SCHEMA,
+                tableName,
+                ImmutableSet.of("regionkey=0"),
+                partition.getColumns().stream().map(Column::getName).collect(toSet()));
+        assertThat(partitionStatistics.get("regionkey=0").get("nationkey").getIntegerStatistics().isPresent()).isTrue();
+        assertThat(partitionStatistics.get("regionkey=0").get("nationkey").getIntegerStatistics().get().getMin()).isEqualTo(OptionalLong.of(0));
+        assertThat(partitionStatistics.get("regionkey=0").get("nationkey").getIntegerStatistics().get().getMax()).isEqualTo(OptionalLong.of(20));
+    }
+
     @Test
     public void testSyncPartitionCaseSensitivePathVariation()
     {
@@ -2470,9 +2545,13 @@ protected String getCreateTableStatement(String tableName, List<String> properti
                 tableName);
     }
 
-    protected void copyTpchNationToTable(String testTable)
+    protected void copyTpchNationToTable(String testTable) {
+        copyTpchNationToTable(getSession(), testTable);
+    }
+
+    protected void copyTpchNationToTable(Session session, String testTable)
     {
-        computeActual(format("INSERT INTO " + testTable + " SELECT name, comment, nationkey, regionkey FROM tpch.tiny.nation"));
+        computeActual(session, format("INSERT INTO " + testTable + " SELECT name, comment, nationkey, regionkey FROM tpch.tiny.nation"));
     }
 
     private void testWriteWithFileSize(String testTable, int scaleFactorInThousands, long fileSizeRangeStart, long fileSizeRangeEnd)
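The two new tests cover both sides of the rule: with insert_existing_partitions_behavior = OVERWRITE, the nationkey statistics of partition regionkey=0 end up as min = max = 20 after inserting a single row, while with APPEND they widen to min = 0, max = 20. A sketch of how one might observe the same effect at query level, following the test's session setup; the SHOW STATS query is only one possible way to inspect the statistics, and testTable is a placeholder:

// Illustrative only, reusing the session-property pattern from the diff above.
Session overwrite = Session.builder(getQueryRunner().getDefaultSession())
        .setCatalogSessionProperty("hive", "insert_existing_partitions_behavior", "OVERWRITE")
        .setCatalogSessionProperty("hive", "collect_column_statistics_on_write", "true")
        .build();
assertUpdate(overwrite, "INSERT INTO " + testTable + " (name, comment, nationkey, regionkey) VALUES ('x', 'y', 20, 0)", 1);
// After the fix, the low/high values reported for nationkey in this partition reflect only the new data.
computeActual(overwrite, "SHOW STATS FOR (SELECT * FROM " + testTable + " WHERE regionkey = 0)");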
