|
45 | 45 | import java.util.concurrent.TimeUnit; |
46 | 46 | import java.util.stream.Collectors; |
47 | 47 |
|
48 | | -import com.google.common.cache.CacheBuilder; |
49 | | -import com.google.common.cache.CacheLoader; |
50 | | -import com.google.common.cache.LoadingCache; |
| 48 | +import com.google.common.cache.Cache; |
51 | 49 | import com.google.common.collect.ImmutableList; |
52 | 50 | import com.google.common.collect.ImmutableSet; |
53 | 51 | import com.google.common.util.concurrent.UncheckedExecutionException; |
|
73 | 71 | import io.lettuce.core.RedisURI; |
74 | 72 | import io.lettuce.core.SslVerifyMode; |
75 | 73 | import io.lettuce.core.protocol.ProtocolVersion; |
| 74 | +import io.trino.collect.cache.EvictableCacheBuilder; |
76 | 75 | import io.trino.spi.HostAddress; |
77 | 76 | import io.trino.spi.TrinoException; |
78 | 77 | import io.trino.spi.connector.ColumnMetadata; |
@@ -106,18 +105,16 @@ public class RediSearchSession { |
106 | 105 | private final RediSearchTranslator translator; |
107 | 106 | private final AbstractRedisClient client; |
108 | 107 | private final StatefulRedisModulesConnection<String, String> connection; |
109 | | - private final LoadingCache<SchemaTableName, RediSearchTable> tableCache; |
| 108 | + private final Cache<SchemaTableName, RediSearchTable> tableCache; |
110 | 109 |
|
111 | 110 | public RediSearchSession(TypeManager typeManager, RediSearchConfig config) { |
112 | 111 | this.typeManager = requireNonNull(typeManager, "typeManager is null"); |
113 | 112 | this.config = requireNonNull(config, "config is null"); |
114 | 113 | this.translator = new RediSearchTranslator(config); |
115 | 114 | this.client = client(config); |
116 | 115 | this.connection = RedisModulesUtils.connection(client); |
117 | | - this.tableCache = CacheBuilder.newBuilder().expireAfterWrite(config.getTableCacheExpiration(), TimeUnit.SECONDS) |
118 | | - .refreshAfterWrite(config.getTableCacheRefresh(), TimeUnit.SECONDS) |
119 | | - .build(CacheLoader.from(this::loadTableSchema)); |
120 | | - |
| 116 | + this.tableCache = EvictableCacheBuilder.newBuilder() |
| 117 | + .expireAfterWrite(config.getTableCacheRefresh(), TimeUnit.SECONDS).build(); |
121 | 118 | } |
122 | 119 |
|
123 | 120 | private AbstractRedisClient client(RediSearchConfig config) { |
@@ -194,7 +191,7 @@ public Set<String> getAllTables() { |
194 | 191 |
|
195 | 192 | @SuppressWarnings("unchecked") |
196 | 193 | public void createTable(SchemaTableName schemaTableName, List<RediSearchColumnHandle> columns) { |
197 | | - String index = index(schemaTableName); |
| 194 | + String index = schemaTableName.getTableName(); |
198 | 195 | if (!connection.sync().ftList().contains(index)) { |
199 | 196 | List<Field<String>> fields = columns.stream().filter(c -> !RediSearchBuiltinField.isKeyColumn(c.getName())) |
200 | 197 | .map(c -> buildField(c.getName(), c.getType())).collect(Collectors.toList()); |
@@ -266,7 +263,8 @@ private RediSearchTable loadTableSchema(SchemaTableName schemaTableName) throws |
266 | 263 | fields.add(docField); |
267 | 264 | } |
268 | 265 | } |
269 | | - return new RediSearchTable(new RediSearchTableHandle(schemaTableName), columns.build(), indexInfo); |
| 266 | + RediSearchTableHandle tableHandle = new RediSearchTableHandle(schemaTableName, index); |
| 267 | + return new RediSearchTable(tableHandle, columns.build(), indexInfo); |
270 | 268 | } |
271 | 269 |
|
272 | 270 | private Optional<IndexInfo> indexInfo(String index) { |
@@ -339,17 +337,13 @@ private boolean isGroupOperation(AggregateOperation<String, String> operation) { |
339 | 337 | } |
340 | 338 |
|
341 | 339 | public AggregateWithCursorResults<String> cursorRead(RediSearchTableHandle tableHandle, long cursor) { |
342 | | - String index = index(tableHandle.getSchemaTableName()); |
| 340 | + String index = tableHandle.getIndex(); |
343 | 341 | if (config.getCursorCount() > 0) { |
344 | 342 | return connection.sync().ftCursorRead(index, cursor, config.getCursorCount()); |
345 | 343 | } |
346 | 344 | return connection.sync().ftCursorRead(index, cursor); |
347 | 345 | } |
348 | 346 |
|
349 | | - private String index(SchemaTableName schemaTableName) { |
350 | | - return schemaTableName.getTableName(); |
351 | | - } |
352 | | - |
353 | 347 | private Field<String> buildField(String columnName, Type columnType) { |
354 | 348 | Field.Type fieldType = toFieldType(columnType); |
355 | 349 | switch (fieldType) { |
@@ -427,7 +421,7 @@ private TypeSignature varcharType() { |
427 | 421 | } |
428 | 422 |
|
429 | 423 | public void cursorDelete(RediSearchTableHandle tableHandle, long cursor) { |
430 | | - connection.sync().ftCursorDelete(index(tableHandle.getSchemaTableName()), cursor); |
| 424 | + connection.sync().ftCursorDelete(tableHandle.getIndex(), cursor); |
431 | 425 | } |
432 | 426 |
|
433 | 427 | public Long deleteDocs(List<String> docIds) { |
|
0 commit comments