Skip to content

Commit d21b364

Browse files
committed
Filter hive splits on $path constraints
Earlier splits were only pruned if the $path predicate was representable as a Domain
1 parent d24393a commit d21b364

File tree

5 files changed

+97
-27
lines changed

5 files changed

+97
-27
lines changed

plugin/trino-hive/src/main/java/io/trino/plugin/hive/BackgroundHiveSplitLoader.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import io.trino.spi.TrinoException;
5353
import io.trino.spi.connector.ColumnHandle;
5454
import io.trino.spi.connector.ConnectorSession;
55+
import io.trino.spi.connector.Constraint;
5556
import io.trino.spi.connector.DynamicFilter;
5657
import io.trino.spi.predicate.TupleDomain;
5758
import io.trino.spi.type.TypeManager;
@@ -148,6 +149,7 @@ public class BackgroundHiveSplitLoader
148149

149150
private final Table table;
150151
private final TupleDomain<? extends ColumnHandle> compactEffectivePredicate;
152+
private final Constraint constraint;
151153
private final DynamicFilter dynamicFilter;
152154
private final long dynamicFilteringWaitTimeoutMillis;
153155
private final TypeManager typeManager;
@@ -192,6 +194,7 @@ public BackgroundHiveSplitLoader(
192194
Table table,
193195
Iterator<HivePartitionMetadata> partitions,
194196
TupleDomain<? extends ColumnHandle> compactEffectivePredicate,
197+
Constraint constraint,
195198
DynamicFilter dynamicFilter,
196199
Duration dynamicFilteringWaitTimeout,
197200
TypeManager typeManager,
@@ -209,6 +212,7 @@ public BackgroundHiveSplitLoader(
209212
{
210213
this.table = table;
211214
this.compactEffectivePredicate = compactEffectivePredicate;
215+
this.constraint = constraint;
212216
this.dynamicFilter = dynamicFilter;
213217
this.dynamicFilteringWaitTimeoutMillis = dynamicFilteringWaitTimeout.toMillis();
214218
this.typeManager = typeManager;
@@ -423,6 +427,7 @@ private ListenableFuture<Void> loadPartition(HivePartitionMetadata partition)
423427
schema,
424428
partitionKeys,
425429
effectivePredicate,
430+
constraint,
426431
partitionMatchSupplier,
427432
partition.getHiveColumnCoercions(),
428433
Optional.empty(),
@@ -475,6 +480,7 @@ private ListenableFuture<Void> loadPartition(HivePartitionMetadata partition)
475480
schema,
476481
partitionKeys,
477482
effectivePredicate,
483+
constraint,
478484
partitionMatchSupplier,
479485
partition.getHiveColumnCoercions(),
480486
bucketConversionRequiresWorkerParticipation ? bucketConversion : Optional.empty(),

plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitManager.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,7 @@ public ConnectorSplitSource getSplits(
263263
table,
264264
hivePartitions,
265265
hiveTable.getCompactEffectivePredicate(),
266+
constraint,
266267
dynamicFilter,
267268
getDynamicFilteringWaitTimeout(session),
268269
typeManager,

plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/InternalHiveSplitFactory.java

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
import com.google.common.collect.ImmutableList;
1717
import com.google.common.collect.ImmutableMap;
18+
import io.airlift.slice.Slice;
1819
import io.airlift.units.DataSize;
1920
import io.trino.metastore.HiveTypeName;
2021
import io.trino.plugin.hive.AcidInfo;
@@ -32,7 +33,10 @@
3233
import io.trino.plugin.hive.parquet.ParquetPageSourceFactory;
3334
import io.trino.plugin.hive.rcfile.RcFilePageSourceFactory;
3435
import io.trino.spi.HostAddress;
36+
import io.trino.spi.connector.ColumnHandle;
37+
import io.trino.spi.connector.Constraint;
3538
import io.trino.spi.predicate.Domain;
39+
import io.trino.spi.predicate.NullableValue;
3640
import io.trino.spi.predicate.TupleDomain;
3741

3842
import java.util.Collection;
@@ -41,11 +45,14 @@
4145
import java.util.Optional;
4246
import java.util.OptionalInt;
4347
import java.util.function.BooleanSupplier;
48+
import java.util.function.Predicate;
4449

4550
import static com.google.common.base.Preconditions.checkArgument;
4651
import static com.google.common.collect.ImmutableList.toImmutableList;
4752
import static io.airlift.slice.Slices.utf8Slice;
53+
import static io.trino.plugin.hive.HiveColumnHandle.PATH_TYPE;
4854
import static io.trino.plugin.hive.HiveColumnHandle.isPathColumnHandle;
55+
import static io.trino.plugin.hive.HiveColumnHandle.pathColumnHandle;
4956
import static io.trino.plugin.hive.util.AcidTables.isFullAcidTable;
5057
import static io.trino.plugin.hive.util.HiveUtil.getSerializationLibraryName;
5158
import static java.util.Objects.requireNonNull;
@@ -58,6 +65,7 @@ public class InternalHiveSplitFactory
5865
private final List<HivePartitionKey> partitionKeys;
5966
private final Optional<Domain> pathDomain;
6067
private final Map<Integer, HiveTypeName> hiveColumnCoercions;
68+
private final Constraint constraint;
6169
private final BooleanSupplier partitionMatchSupplier;
6270
private final Optional<BucketConversion> bucketConversion;
6371
private final Optional<HiveSplit.BucketValidation> bucketValidation;
@@ -71,6 +79,7 @@ public InternalHiveSplitFactory(
7179
Map<String, String> schema,
7280
List<HivePartitionKey> partitionKeys,
7381
TupleDomain<HiveColumnHandle> effectivePredicate,
82+
Constraint constraint,
7483
BooleanSupplier partitionMatchSupplier,
7584
Map<Integer, HiveTypeName> hiveColumnCoercions,
7685
Optional<BucketConversion> bucketConversion,
@@ -84,6 +93,7 @@ public InternalHiveSplitFactory(
8493
this.strippedSchema = stripUnnecessaryProperties(requireNonNull(schema, "schema is null"));
8594
this.partitionKeys = requireNonNull(partitionKeys, "partitionKeys is null");
8695
pathDomain = getPathDomain(requireNonNull(effectivePredicate, "effectivePredicate is null"));
96+
this.constraint = requireNonNull(constraint, "constraint is null");
8797
this.partitionMatchSupplier = requireNonNull(partitionMatchSupplier, "partitionMatchSupplier is null");
8898
this.hiveColumnCoercions = ImmutableMap.copyOf(requireNonNull(hiveColumnCoercions, "hiveColumnCoercions is null"));
8999
this.bucketConversion = requireNonNull(bucketConversion, "bucketConversion is null");
@@ -144,7 +154,7 @@ private Optional<InternalHiveSplit> createInternalHiveSplit(
144154
boolean splittable,
145155
Optional<AcidInfo> acidInfo)
146156
{
147-
if (!pathMatchesPredicate(pathDomain, path)) {
157+
if (!pathMatchesPredicate(pathDomain, constraint, path)) {
148158
return Optional.empty();
149159
}
150160

@@ -259,10 +269,16 @@ private static Optional<Domain> getPathDomain(TupleDomain<HiveColumnHandle> effe
259269
.findFirst());
260270
}
261271

262-
private static boolean pathMatchesPredicate(Optional<Domain> pathDomain, String path)
272+
private static boolean pathMatchesPredicate(Optional<Domain> pathDomain, Constraint constraint, String path)
263273
{
264-
return pathDomain
265-
.map(domain -> domain.includesNullableValue(utf8Slice(path)))
266-
.orElse(true);
274+
Slice pathSlice = utf8Slice(path);
275+
if (pathDomain.isPresent() && !pathDomain.get().includesNullableValue(pathSlice)) {
276+
return false;
277+
}
278+
if (constraint.getPredicateColumns().isEmpty() || !constraint.getPredicateColumns().get().contains(pathColumnHandle())) {
279+
return true;
280+
}
281+
Predicate<Map<ColumnHandle, NullableValue>> predicate = constraint.predicate().orElse(_ -> true);
282+
return predicate.test(ImmutableMap.of(pathColumnHandle(), new NullableValue(PATH_TYPE, pathSlice)));
267283
}
268284
}

plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5599,6 +5599,33 @@ public void testBucketFilteringByInPredicate()
55995599
}
56005600
}
56015601

5602+
@Test
5603+
void testPathConstraintSplitPruning()
5604+
{
5605+
String tableName = "test_path_constraint_split_pruning_" + randomNameSuffix();
5606+
assertUpdate("CREATE TABLE " + tableName + " (id bigint, data varchar)");
5607+
assertUpdate("INSERT INTO " + tableName + " VALUES (1, 'a'), (2, 'b'), (3, 'c')", 3);
5608+
assertUpdate("INSERT INTO " + tableName + " VALUES (1, 'a'), (2, 'b'), (3, 'c')", 3);
5609+
5610+
try {
5611+
Set<String> files = getTableFiles(tableName);
5612+
assertThat(files).hasSizeGreaterThanOrEqualTo(2);
5613+
String fileName = files.stream().findAny().orElseThrow();
5614+
String query = "SELECT * FROM " + tableName + " WHERE \"$path\" LIKE '%" + fileName + "%'";
5615+
assertQueryStats(
5616+
getSession(),
5617+
query,
5618+
queryStats -> {
5619+
assertThat(queryStats.getTotalDrivers()).isEqualTo(1);
5620+
assertThat(queryStats.getPhysicalInputPositions()).isEqualTo(3);
5621+
},
5622+
results -> assertThat(results.getRowCount()).isEqualTo(3));
5623+
}
5624+
finally {
5625+
assertUpdate("DROP TABLE " + tableName);
5626+
}
5627+
}
5628+
56025629
@Test
56035630
public void testSchemaMismatchesWithDereferenceProjections()
56045631
{

plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestBackgroundHiveSplitLoader.java

Lines changed: 42 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import io.trino.spi.connector.ColumnHandle;
4848
import io.trino.spi.connector.ConnectorSession;
4949
import io.trino.spi.connector.ConnectorSplitSource.ConnectorSplitBatch;
50+
import io.trino.spi.connector.Constraint;
5051
import io.trino.spi.connector.DynamicFilter;
5152
import io.trino.spi.connector.SchemaTableName;
5253
import io.trino.spi.predicate.Domain;
@@ -154,7 +155,7 @@ public void tearDown()
154155
public void testNoPathFilter()
155156
throws Exception
156157
{
157-
BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoader(TEST_LOCATIONS, TupleDomain.none());
158+
BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoader(TEST_LOCATIONS, TupleDomain.none(), Constraint.alwaysTrue());
158159

159160
HiveSplitSource hiveSplitSource = hiveSplitSource(backgroundHiveSplitLoader);
160161
backgroundHiveSplitLoader.start(hiveSplitSource);
@@ -205,7 +206,8 @@ public void testPathFilter()
205206
{
206207
BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoader(
207208
TEST_LOCATIONS,
208-
LOCATION_DOMAIN);
209+
LOCATION_DOMAIN,
210+
Constraint.alwaysTrue());
209211

210212
HiveSplitSource hiveSplitSource = hiveSplitSource(backgroundHiveSplitLoader);
211213
backgroundHiveSplitLoader.start(hiveSplitSource);
@@ -214,13 +216,32 @@ public void testPathFilter()
214216
assertThat(paths.get(0)).isEqualTo(LOCATION.toString());
215217
}
216218

219+
@Test
220+
public void testPathConstraint()
221+
throws Exception
222+
{
223+
BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoader(
224+
TEST_LOCATIONS,
225+
LOCATION_DOMAIN,
226+
new Constraint(
227+
TupleDomain.all(),
228+
LOCATION_DOMAIN.transformKeys(ColumnHandle.class::cast).asPredicate(),
229+
Set.of(pathColumnHandle())));
230+
231+
HiveSplitSource hiveSplitSource = hiveSplitSource(backgroundHiveSplitLoader);
232+
backgroundHiveSplitLoader.start(hiveSplitSource);
233+
List<String> paths = drain(hiveSplitSource);
234+
assertThat(paths).containsExactly(LOCATION.toString());
235+
}
236+
217237
@Test
218238
public void testPathFilterOneBucketMatchPartitionedTable()
219239
throws Exception
220240
{
221241
BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoader(
222242
TEST_LOCATIONS,
223243
LOCATION_DOMAIN,
244+
Constraint.alwaysTrue(),
224245
Optional.of(new HiveBucketFilter(Set.of(0, 1))),
225246
PARTITIONED_TABLE,
226247
Optional.of(new HiveTablePartitioning(true, BUCKETING_V1, BUCKET_COUNT, BUCKET_COLUMN_HANDLES, false, List.of(), true)));
@@ -239,6 +260,7 @@ public void testPathFilterBucketedPartitionedTable()
239260
BackgroundHiveSplitLoader backgroundHiveSplitLoader = backgroundHiveSplitLoader(
240261
TEST_LOCATIONS,
241262
LOCATION_DOMAIN,
263+
Constraint.alwaysTrue(),
242264
Optional.empty(),
243265
PARTITIONED_TABLE,
244266
Optional.of(
@@ -474,6 +496,7 @@ public HivePartitionMetadata next()
474496
}
475497
},
476498
TupleDomain.all(),
499+
Constraint.alwaysTrue(),
477500
DynamicFilter.EMPTY,
478501
new Duration(0, SECONDS),
479502
TESTING_TYPE_MANAGER,
@@ -796,6 +819,7 @@ public void testBuildManifestFileIterator()
796819
schema,
797820
List.of(),
798821
TupleDomain.all(),
822+
Constraint.alwaysTrue(),
799823
() -> true,
800824
ImmutableMap.of(),
801825
Optional.empty(),
@@ -837,6 +861,7 @@ public void testBuildManifestFileIteratorNestedDirectory()
837861
schema,
838862
List.of(),
839863
TupleDomain.all(),
864+
Constraint.alwaysTrue(),
840865
() -> true,
841866
ImmutableMap.of(),
842867
Optional.empty(),
@@ -875,6 +900,7 @@ public void testBuildManifestFileIteratorWithCacheInvalidation()
875900
schema,
876901
List.of(),
877902
TupleDomain.all(),
903+
Constraint.alwaysTrue(),
878904
() -> true,
879905
ImmutableMap.of(),
880906
Optional.empty(),
@@ -1056,6 +1082,7 @@ private BackgroundHiveSplitLoader backgroundHiveSplitLoader(
10561082
return backgroundHiveSplitLoader(
10571083
fileSystemFactory,
10581084
TupleDomain.all(),
1085+
Constraint.alwaysTrue(),
10591086
dynamicFilter,
10601087
dynamicFilteringProbeBlockingTimeoutMillis,
10611088
Optional.empty(),
@@ -1079,12 +1106,14 @@ private static TrinoFileSystemFactory createTestingFileSystem(Collection<Locatio
10791106

10801107
private BackgroundHiveSplitLoader backgroundHiveSplitLoader(
10811108
List<Location> locations,
1082-
TupleDomain<HiveColumnHandle> tupleDomain)
1109+
TupleDomain<HiveColumnHandle> tupleDomain,
1110+
Constraint constraint)
10831111
throws IOException
10841112
{
10851113
return backgroundHiveSplitLoader(
10861114
locations,
10871115
tupleDomain,
1116+
constraint,
10881117
Optional.empty(),
10891118
SIMPLE_TABLE,
10901119
Optional.empty());
@@ -1093,37 +1122,23 @@ private BackgroundHiveSplitLoader backgroundHiveSplitLoader(
10931122
private BackgroundHiveSplitLoader backgroundHiveSplitLoader(
10941123
List<Location> locations,
10951124
TupleDomain<HiveColumnHandle> compactEffectivePredicate,
1125+
Constraint constraint,
10961126
Optional<HiveBucketFilter> hiveBucketFilter,
10971127
Table table,
10981128
Optional<HiveTablePartitioning> tablePartitioning)
10991129
throws IOException
1100-
{
1101-
return backgroundHiveSplitLoader(
1102-
locations,
1103-
compactEffectivePredicate,
1104-
hiveBucketFilter,
1105-
table,
1106-
tablePartitioning,
1107-
Optional.empty());
1108-
}
1109-
1110-
private BackgroundHiveSplitLoader backgroundHiveSplitLoader(
1111-
List<Location> locations,
1112-
TupleDomain<HiveColumnHandle> compactEffectivePredicate,
1113-
Optional<HiveBucketFilter> hiveBucketFilter,
1114-
Table table,
1115-
Optional<HiveTablePartitioning> tablePartitioning,
1116-
Optional<ValidWriteIdList> validWriteIds)
1117-
throws IOException
11181130
{
11191131
TrinoFileSystemFactory fileSystemFactory = createTestingFileSystem(locations);
11201132
return backgroundHiveSplitLoader(
11211133
fileSystemFactory,
11221134
compactEffectivePredicate,
1135+
constraint,
1136+
DynamicFilter.EMPTY,
1137+
new Duration(0, SECONDS),
11231138
hiveBucketFilter,
11241139
table,
11251140
tablePartitioning,
1126-
validWriteIds);
1141+
Optional.empty());
11271142
}
11281143

11291144
private BackgroundHiveSplitLoader backgroundHiveSplitLoader(
@@ -1137,6 +1152,7 @@ private BackgroundHiveSplitLoader backgroundHiveSplitLoader(
11371152
return backgroundHiveSplitLoader(
11381153
fileSystemFactory,
11391154
compactEffectivePredicate,
1155+
Constraint.alwaysTrue(),
11401156
DynamicFilter.EMPTY,
11411157
new Duration(0, SECONDS),
11421158
hiveBucketFilter,
@@ -1148,6 +1164,7 @@ private BackgroundHiveSplitLoader backgroundHiveSplitLoader(
11481164
private BackgroundHiveSplitLoader backgroundHiveSplitLoader(
11491165
TrinoFileSystemFactory fileSystemFactory,
11501166
TupleDomain<HiveColumnHandle> compactEffectivePredicate,
1167+
Constraint constraint,
11511168
DynamicFilter dynamicFilter,
11521169
Duration dynamicFilteringProbeBlockingTimeout,
11531170
Optional<HiveBucketFilter> hiveBucketFilter,
@@ -1166,6 +1183,7 @@ private BackgroundHiveSplitLoader backgroundHiveSplitLoader(
11661183
table,
11671184
hivePartitionMetadatas.iterator(),
11681185
compactEffectivePredicate,
1186+
constraint,
11691187
dynamicFilter,
11701188
dynamicFilteringProbeBlockingTimeout,
11711189
TESTING_TYPE_MANAGER,
@@ -1210,6 +1228,7 @@ private BackgroundHiveSplitLoader backgroundHiveSplitLoader(
12101228
SIMPLE_TABLE,
12111229
partitions.iterator(),
12121230
TupleDomain.none(),
1231+
Constraint.alwaysTrue(),
12131232
DynamicFilter.EMPTY,
12141233
new Duration(0, SECONDS),
12151234
TESTING_TYPE_MANAGER,
@@ -1237,6 +1256,7 @@ private BackgroundHiveSplitLoader backgroundHiveSplitLoaderOfflinePartitions()
12371256
SIMPLE_TABLE,
12381257
createPartitionMetadataWithOfflinePartitions(),
12391258
TupleDomain.all(),
1259+
Constraint.alwaysTrue(),
12401260
DynamicFilter.EMPTY,
12411261
new Duration(0, SECONDS),
12421262
TESTING_TYPE_MANAGER,

0 commit comments

Comments
 (0)