From 34f1fd03189e1c2885d5911b1e8462bd406ad83e Mon Sep 17 00:00:00 2001 From: Robert Stupp Date: Thu, 20 Nov 2025 17:39:03 +0100 Subject: [PATCH] NoSQL: Quarkus distributed cache invalidation Adds support for distributed NoSQL cache invalidation leveraging Quarkus. --- LICENSE | 10 + bom/build.gradle.kts | 1 + gradle/projects.main.properties | 1 + persistence/nosql/persistence/cdi/README.md | 24 + .../cdi/quarkus-distcache/build.gradle.kts | 60 ++ .../quarkus/distcache/AddressResolver.java | 163 +++++ .../distcache/CacheInvalidationInfra.java | 34 ++ .../distcache/CacheInvalidationReceiver.java | 172 ++++++ .../distcache/CacheInvalidationSender.java | 317 ++++++++++ ...usDistributedCacheInvalidationsConfig.java | 99 +++ .../nosql/quarkus/distcache/ResolvConf.java | 127 ++++ .../quarkus/distcache/ServerInstanceId.java | 30 + .../nosql/quarkus/distcache/package-info.java | 35 ++ .../quarkus/distcache/HttpTestServer.java | 70 +++ .../distcache/TestAddressResolver.java | 170 ++++++ .../TestCacheInvalidationReceiver.java | 186 ++++++ .../TestCacheInvalidationSender.java | 566 ++++++++++++++++++ .../quarkus/distcache/TestResolvConf.java | 108 ++++ .../src/test/resources/logback-test.xml | 34 ++ 19 files changed, 2207 insertions(+) create mode 100644 persistence/nosql/persistence/cdi/quarkus-distcache/build.gradle.kts create mode 100644 persistence/nosql/persistence/cdi/quarkus-distcache/src/main/java/org/apache/polaris/persistence/nosql/quarkus/distcache/AddressResolver.java create mode 100644 persistence/nosql/persistence/cdi/quarkus-distcache/src/main/java/org/apache/polaris/persistence/nosql/quarkus/distcache/CacheInvalidationInfra.java create mode 100644 persistence/nosql/persistence/cdi/quarkus-distcache/src/main/java/org/apache/polaris/persistence/nosql/quarkus/distcache/CacheInvalidationReceiver.java create mode 100644 persistence/nosql/persistence/cdi/quarkus-distcache/src/main/java/org/apache/polaris/persistence/nosql/quarkus/distcache/CacheInvalidationSender.java create mode 100644 persistence/nosql/persistence/cdi/quarkus-distcache/src/main/java/org/apache/polaris/persistence/nosql/quarkus/distcache/QuarkusDistributedCacheInvalidationsConfig.java create mode 100644 persistence/nosql/persistence/cdi/quarkus-distcache/src/main/java/org/apache/polaris/persistence/nosql/quarkus/distcache/ResolvConf.java create mode 100644 persistence/nosql/persistence/cdi/quarkus-distcache/src/main/java/org/apache/polaris/persistence/nosql/quarkus/distcache/ServerInstanceId.java create mode 100644 persistence/nosql/persistence/cdi/quarkus-distcache/src/main/java/org/apache/polaris/persistence/nosql/quarkus/distcache/package-info.java create mode 100644 persistence/nosql/persistence/cdi/quarkus-distcache/src/test/java/org/apache/polaris/persistence/nosql/quarkus/distcache/HttpTestServer.java create mode 100644 persistence/nosql/persistence/cdi/quarkus-distcache/src/test/java/org/apache/polaris/persistence/nosql/quarkus/distcache/TestAddressResolver.java create mode 100644 persistence/nosql/persistence/cdi/quarkus-distcache/src/test/java/org/apache/polaris/persistence/nosql/quarkus/distcache/TestCacheInvalidationReceiver.java create mode 100644 persistence/nosql/persistence/cdi/quarkus-distcache/src/test/java/org/apache/polaris/persistence/nosql/quarkus/distcache/TestCacheInvalidationSender.java create mode 100644 persistence/nosql/persistence/cdi/quarkus-distcache/src/test/java/org/apache/polaris/persistence/nosql/quarkus/distcache/TestResolvConf.java create mode 100644 persistence/nosql/persistence/cdi/quarkus-distcache/src/test/resources/logback-test.xml diff --git a/LICENSE b/LICENSE index 8f50538f8d..437a6849e0 100644 --- a/LICENSE +++ b/LICENSE @@ -230,6 +230,16 @@ License: https://www.apache.org/licenses/LICENSE-2.0 -------------------------------------------------------------------------------- +This product includes code from Netty. + +* persistence/nosql/persistence/cdi/quarkus-distcache/src/main/java/org/apache/polaris/persistence/nosql/quarkus/distcache/ResolvConf.java + +Copyright: Copyright © 2025 The Netty project +Home page: https://netty.io/ +License: https://www.apache.org/licenses/LICENSE-2.0 + +-------------------------------------------------------------------------------- + This product includes code from OpenAPITool openapi-generator * server-templates/formParams.mustache diff --git a/bom/build.gradle.kts b/bom/build.gradle.kts index 83faf336b7..d58ea88b48 100644 --- a/bom/build.gradle.kts +++ b/bom/build.gradle.kts @@ -59,6 +59,7 @@ dependencies { api(project(":polaris-persistence-nosql-benchmark")) api(project(":polaris-persistence-nosql-correctness")) api(project(":polaris-persistence-nosql-cdi-common")) + api(project(":polaris-persistence-nosql-cdi-quarkus-distcache")) api(project(":polaris-persistence-nosql-cdi-weld")) api(project(":polaris-persistence-nosql-standalone")) api(project(":polaris-persistence-nosql-testextension")) diff --git a/gradle/projects.main.properties b/gradle/projects.main.properties index d6ecdb8309..fb34daf057 100644 --- a/gradle/projects.main.properties +++ b/gradle/projects.main.properties @@ -74,6 +74,7 @@ polaris-persistence-nosql-impl=persistence/nosql/persistence/impl polaris-persistence-nosql-benchmark=persistence/nosql/persistence/benchmark polaris-persistence-nosql-correctness=persistence/nosql/persistence/correctness polaris-persistence-nosql-cdi-common=persistence/nosql/persistence/cdi/common +polaris-persistence-nosql-cdi-quarkus-distcache=persistence/nosql/persistence/cdi/quarkus-distcache polaris-persistence-nosql-cdi-weld=persistence/nosql/persistence/cdi/weld polaris-persistence-nosql-standalone=persistence/nosql/persistence/standalone polaris-persistence-nosql-testextension=persistence/nosql/persistence/testextension diff --git a/persistence/nosql/persistence/cdi/README.md b/persistence/nosql/persistence/cdi/README.md index 8f9dc55452..2f353b7fff 100644 --- a/persistence/nosql/persistence/cdi/README.md +++ b/persistence/nosql/persistence/cdi/README.md @@ -39,3 +39,27 @@ The biggest difference between the Quarkus and Weld variants is the way how data There are also backend specific builders that leverage Quarkus extensions for the respective database backends. The Quarkus variant also adds OpenTelemetry instrumentation to the `Backend` instances. + +# Distributed cache invalidation (multiple Polaris nodes) + +Most persisted objects are immutable, which eliminates the need to explicitly invalidate objects. + +Some specific object types are intentionally mutable. +Consistency during write operations is guaranteed by using CAS operations on those objects. +Read operations, however, fetch through the cache. + +Reference pointers are mutable by design. +For writing operations, the current value of the reference pointer is always read from the +backend database. +Read operations, however, fetch the recent pointer via the cache. + +To keep the state for read operations up to date, the writing node sends the information about +the mutation to all other nodes via the distributed cache invalidation mechanism. +Short cache expiration times are used to mitigate the risk of missing cache invalidation messages. + +In k8s this works out of the box, leveraging k8s name service mechanisms being able to resolve +the set of IP addresses of all nodes in the cluster. +Non-k8s deployments need to configure the DNS resolvable names or IP addresses of all nodes in +the configuration. + +Configuration options allow enabling and disabling the cache expiration duration. diff --git a/persistence/nosql/persistence/cdi/quarkus-distcache/build.gradle.kts b/persistence/nosql/persistence/cdi/quarkus-distcache/build.gradle.kts new file mode 100644 index 0000000000..bcb65de5ca --- /dev/null +++ b/persistence/nosql/persistence/cdi/quarkus-distcache/build.gradle.kts @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +plugins { + id("org.kordamp.gradle.jandex") + id("polaris-server") +} + +description = "Polaris NoSQL persistence, distributed cache invalidation for Quarkus." + +dependencies { + implementation(project(":polaris-persistence-nosql-cdi-common")) + implementation(project(":polaris-persistence-nosql-api")) + + implementation(platform(libs.jackson.bom)) + implementation("com.fasterxml.jackson.core:jackson-annotations") + implementation("com.fasterxml.jackson.core:jackson-databind") + + compileOnly(libs.smallrye.config.core) + + compileOnly(project(":polaris-immutables")) + annotationProcessor(project(":polaris-immutables", configuration = "processor")) + + implementation(platform(libs.quarkus.bom)) + implementation("io.quarkus:quarkus-core") + implementation("io.quarkus:quarkus-vertx-http") + + implementation(libs.jakarta.ws.rs.api) + + implementation(libs.guava) + implementation(libs.slf4j.api) + + compileOnly(libs.jakarta.annotation.api) + compileOnly(libs.jakarta.validation.api) + compileOnly(libs.jakarta.inject.api) + compileOnly(libs.jakarta.enterprise.cdi.api) + + compileOnly(project(":polaris-immutables")) + annotationProcessor(project(":polaris-immutables", configuration = "processor")) + + // Must stick with the Quarkus platform versions of Vert.X + // (signature of io.vertx.core.Vertx.createHttpClient() changed from 4.5 to 5.0) + testImplementation(enforcedPlatform(libs.quarkus.bom)) +} diff --git a/persistence/nosql/persistence/cdi/quarkus-distcache/src/main/java/org/apache/polaris/persistence/nosql/quarkus/distcache/AddressResolver.java b/persistence/nosql/persistence/cdi/quarkus-distcache/src/main/java/org/apache/polaris/persistence/nosql/quarkus/distcache/AddressResolver.java new file mode 100644 index 0000000000..1e3806eac7 --- /dev/null +++ b/persistence/nosql/persistence/cdi/quarkus-distcache/src/main/java/org/apache/polaris/persistence/nosql/quarkus/distcache/AddressResolver.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.persistence.nosql.quarkus.distcache; + +import static com.google.common.base.Preconditions.checkState; +import static java.net.NetworkInterface.networkInterfaces; +import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toUnmodifiableSet; + +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import io.vertx.core.dns.DnsClient; +import io.vertx.core.dns.DnsClientOptions; +import java.net.InetAddress; +import java.net.InterfaceAddress; +import java.net.SocketException; +import java.net.UnknownHostException; +import java.util.List; +import java.util.Set; +import java.util.stream.IntStream; +import java.util.stream.Stream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Vert.x based address resolver. + * + *

Resolves names to both IPv4 and IPv6 addresses using a given search-list. These + * functionalities are not supported vie{@code InetAddress}. + */ +record AddressResolver(DnsClient dnsClient, List searchList) { + private static final Logger LOGGER = LoggerFactory.getLogger(AddressResolver.class); + + /** Set of all locally bound IP addresses. */ + static final Set LOCAL_ADDRESSES; + + private static final boolean IP_V4_ONLY; + + static { + try { + LOCAL_ADDRESSES = + networkInterfaces() + .flatMap( + ni -> + ni.getInterfaceAddresses().stream() + // Need to do this InetAddress->byte[]->InetAddress dance to get rid of + // host-address suffixes as in `0:0:0:0:0:0:0:1%lo` + .map(InterfaceAddress::getAddress) + .map(InetAddress::getAddress) + .map( + a -> { + try { + return InetAddress.getByAddress(a); + } catch (UnknownHostException e) { + // Should never happen when calling getByAddress() with an IPv4 or + // IPv6 address + throw new RuntimeException(e); + } + }) + .map(InetAddress::getHostAddress)) + .collect(toUnmodifiableSet()); + + IP_V4_ONLY = Boolean.parseBoolean(System.getProperty("java.net.preferIPv4Stack", "false")); + } catch (SocketException e) { + throw new RuntimeException(e); + } + } + + /** + * Uses a "default" {@link DnsClient} using the first {@code nameserver} and the {@code search} + * list configured in {@code /etc/resolv.conf}. + */ + AddressResolver(Vertx vertx, long queryTimeoutMillis) { + this(createDnsClient(vertx, queryTimeoutMillis), ResolvConf.system().getSearchList()); + } + + /** + * Creates a "default" {@link DnsClient} using the first nameserver configured in {@code + * /etc/resolv.conf}. + */ + private static DnsClient createDnsClient(Vertx vertx, long queryTimeoutMillis) { + var nameservers = ResolvConf.system().getNameservers(); + checkState(!nameservers.isEmpty(), "No nameserver configured in /etc/resolv.conf"); + var nameserver = nameservers.getFirst(); + LOGGER.info( + "Using nameserver {}/{} with search list {}", + nameserver.getHostName(), + nameserver.getAddress().getHostAddress(), + ResolvConf.system().getSearchList()); + return vertx.createDnsClient( + new DnsClientOptions() + // 5 seconds should be enough to resolve + .setQueryTimeout(queryTimeoutMillis) + .setHost(nameserver.getAddress().getHostAddress()) + .setPort(nameserver.getPort())); + } + + private Future> resolveSingle(String name) { + var resultA = dnsClient.resolveA(name); + if (IP_V4_ONLY) { + return resultA; + } + return resultA.compose( + a -> + dnsClient + .resolveAAAA(name) + .map(aaaa -> Stream.concat(aaaa.stream(), a.stream()).collect(toList()))); + } + + /** Resolve a single name, used by {@link #resolveAll(List)}. */ + Future> resolve(String name) { + if (name.startsWith("=")) { + return Future.succeededFuture(List.of(name.substring(1))); + } + + // By convention, do not consult the 'search' list, when the name to query ends with a dot. + var exact = name.endsWith("."); + var query = exact ? name.substring(0, name.length() - 1) : name; + var future = resolveSingle(query); + if (!exact) { + // Consult the 'search' list if the above 'resolveName' fails. + for (var search : searchList) { + future = future.recover(t -> resolveSingle(query + '.' + search)); + } + } + + return future; + } + + /** Resolve all names in parallel. */ + Future> resolveAll(List names) { + var composite = Future.all(names.stream().map(this::resolve).collect(toList())); + return composite.map( + c -> + IntStream.range(0, c.size()) + .mapToObj(c::resultAt) + .map( + e -> { + @SuppressWarnings("unchecked") + var casted = (List) e; + return casted.stream(); + }) + .reduce(Stream::concat) + .map(s -> s.collect(toList())) + .orElse(List.of())); + } +} diff --git a/persistence/nosql/persistence/cdi/quarkus-distcache/src/main/java/org/apache/polaris/persistence/nosql/quarkus/distcache/CacheInvalidationInfra.java b/persistence/nosql/persistence/cdi/quarkus-distcache/src/main/java/org/apache/polaris/persistence/nosql/quarkus/distcache/CacheInvalidationInfra.java new file mode 100644 index 0000000000..e2d0f1b90c --- /dev/null +++ b/persistence/nosql/persistence/cdi/quarkus-distcache/src/main/java/org/apache/polaris/persistence/nosql/quarkus/distcache/CacheInvalidationInfra.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.persistence.nosql.quarkus.distcache; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.inject.Produces; +import java.util.UUID; + +@ApplicationScoped +class CacheInvalidationInfra { + + /** Produces a random, ephemeral server instance ID. */ + @Produces + @ApplicationScoped + ServerInstanceId ephemeralServerInstanceId() { + return ServerInstanceId.of(UUID.randomUUID().toString()); + } +} diff --git a/persistence/nosql/persistence/cdi/quarkus-distcache/src/main/java/org/apache/polaris/persistence/nosql/quarkus/distcache/CacheInvalidationReceiver.java b/persistence/nosql/persistence/cdi/quarkus-distcache/src/main/java/org/apache/polaris/persistence/nosql/quarkus/distcache/CacheInvalidationReceiver.java new file mode 100644 index 0000000000..d106043fc2 --- /dev/null +++ b/persistence/nosql/persistence/cdi/quarkus-distcache/src/main/java/org/apache/polaris/persistence/nosql/quarkus/distcache/CacheInvalidationReceiver.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.persistence.nosql.quarkus.distcache; + +import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES; +import static java.util.Collections.emptyList; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.quarkus.vertx.http.ManagementInterface; +import io.vertx.ext.web.RoutingContext; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.event.Observes; +import jakarta.inject.Inject; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.function.Supplier; +import org.apache.polaris.persistence.nosql.api.cache.CacheInvalidations; +import org.apache.polaris.persistence.nosql.api.cache.CacheInvalidations.CacheInvalidationEvictObj; +import org.apache.polaris.persistence.nosql.api.cache.CacheInvalidations.CacheInvalidationEvictReference; +import org.apache.polaris.persistence.nosql.api.cache.DistributedCacheInvalidation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Receiver for distributed cache invalidation messages. + * + *

This bean is automatically initialized via the {@code @Observes} annotation of {@link + * #registerManagementRoutes(ManagementInterface)}. See Quarkus + * docs on management endpoint applications. + */ +@ApplicationScoped +class CacheInvalidationReceiver { + static final String CACHE_INVALIDATION_TOKEN_HEADER = "Polaris-Cache-Invalidation-Token"; + + private static final Logger LOGGER = LoggerFactory.getLogger(CacheInvalidationReceiver.class); + + private final DistributedCacheInvalidation distributedCacheInvalidation; + private final String serverInstanceId; + private final Set validTokens; + private final String invalidationPath; + private final ObjectMapper objectMapper; + + @SuppressWarnings("CdiInjectionPointsInspection") + @Inject + CacheInvalidationReceiver( + QuarkusDistributedCacheInvalidationsConfig storeConfig, + ServerInstanceId serverInstanceId, + DistributedCacheInvalidation.Receiver distributedCacheInvalidation) { + this.distributedCacheInvalidation = distributedCacheInvalidation; + this.serverInstanceId = serverInstanceId.instanceId(); + this.invalidationPath = storeConfig.cacheInvalidationUri(); + this.validTokens = + new HashSet<>(storeConfig.cacheInvalidationValidTokens().orElse(emptyList())); + this.objectMapper = + new ObjectMapper() + // forward compatibility + .disable(FAIL_ON_UNKNOWN_PROPERTIES); + } + + void registerManagementRoutes(@Observes ManagementInterface mi) { + mi.router().post(invalidationPath).handler(this::cacheInvalidations); + LOGGER.info("Registered cache invalidation management endpoint {}", invalidationPath); + } + + void cacheInvalidations(RoutingContext rc) { + var request = rc.request(); + var senderId = request.getParam("sender"); + var token = request.getHeader(CACHE_INVALIDATION_TOKEN_HEADER); + + cacheInvalidations( + rc, + () -> { + try { + var json = rc.body().asString(); + if (json == null || json.isEmpty()) { + return CacheInvalidations.cacheInvalidations(emptyList()); + } + return objectMapper.readValue(json, CacheInvalidations.class); + } catch (Exception e) { + LOGGER.error("Failed to deserialize cache invalidation", e); + return CacheInvalidations.cacheInvalidations(emptyList()); + } + }, + senderId, + token); + } + + void cacheInvalidations( + RoutingContext rc, + Supplier invalidations, + String senderId, + String token) { + if (token == null || !validTokens.contains(token)) { + LOGGER.warn("Received cache invalidation with invalid token {}", token); + responseInvalidToken(rc); + return; + } + if (serverInstanceId.equals(senderId)) { + LOGGER.trace("Ignoring invalidations from local instance"); + responseNoContent(rc); + return; + } + if (!"application/json".equals(rc.request().getHeader("Content-Type"))) { + LOGGER.warn("Received cache invalidation with invalid HTTP content type"); + responseInvalidContentType(rc); + return; + } + + List invalidationList; + try { + invalidationList = invalidations.get().invalidations(); + } catch (RuntimeException e) { + responseServerError(rc); + return; + } + + var cacheInvalidation = distributedCacheInvalidation; + if (cacheInvalidation != null) { + for (CacheInvalidations.CacheInvalidation invalidation : invalidationList) { + switch (invalidation.type()) { + case CacheInvalidationEvictObj.TYPE -> { + var putObj = (CacheInvalidationEvictObj) invalidation; + cacheInvalidation.evictObj(putObj.realmId(), putObj.id()); + } + case CacheInvalidationEvictReference.TYPE -> { + var putReference = (CacheInvalidationEvictReference) invalidation; + cacheInvalidation.evictReference(putReference.realmId(), putReference.ref()); + } + default -> { + // nothing we can do about a new invalidation type here + } + } + } + } + + responseNoContent(rc); + } + + private void responseServerError(RoutingContext rc) { + rc.response().setStatusCode(500).setStatusMessage("Server error parsing request body").end(); + } + + private void responseInvalidToken(RoutingContext rc) { + rc.response().setStatusCode(400).setStatusMessage("Invalid token").end(); + } + + private void responseInvalidContentType(RoutingContext rc) { + rc.response().setStatusCode(415).setStatusMessage("Unsupported media type").end(); + } + + private void responseNoContent(RoutingContext rc) { + rc.response().setStatusCode(204).setStatusMessage("No content").end(); + } +} diff --git a/persistence/nosql/persistence/cdi/quarkus-distcache/src/main/java/org/apache/polaris/persistence/nosql/quarkus/distcache/CacheInvalidationSender.java b/persistence/nosql/persistence/cdi/quarkus-distcache/src/main/java/org/apache/polaris/persistence/nosql/quarkus/distcache/CacheInvalidationSender.java new file mode 100644 index 0000000000..b23731712c --- /dev/null +++ b/persistence/nosql/persistence/cdi/quarkus-distcache/src/main/java/org/apache/polaris/persistence/nosql/quarkus/distcache/CacheInvalidationSender.java @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.persistence.nosql.quarkus.distcache; + +import static jakarta.ws.rs.core.MediaType.APPLICATION_JSON; +import static java.util.Collections.emptyList; +import static org.apache.polaris.persistence.nosql.api.cache.CacheInvalidations.CacheInvalidationEvictObj.cacheInvalidationEvictObj; +import static org.apache.polaris.persistence.nosql.api.cache.CacheInvalidations.CacheInvalidationEvictReference.cacheInvalidationEvictReference; +import static org.apache.polaris.persistence.nosql.api.cache.CacheInvalidations.cacheInvalidations; +import static org.apache.polaris.persistence.nosql.quarkus.distcache.AddressResolver.LOCAL_ADDRESSES; +import static org.apache.polaris.persistence.nosql.quarkus.distcache.CacheInvalidationReceiver.CACHE_INVALIDATION_TOKEN_HEADER; +import static org.apache.polaris.persistence.nosql.quarkus.distcache.QuarkusDistributedCacheInvalidationsConfig.CACHE_INVALIDATIONS_CONFIG_PREFIX; +import static org.apache.polaris.persistence.nosql.quarkus.distcache.QuarkusDistributedCacheInvalidationsConfig.CONFIG_SERVICE_NAMES; +import static org.apache.polaris.persistence.nosql.quarkus.distcache.QuarkusDistributedCacheInvalidationsConfig.CONFIG_VALID_TOKENS; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; +import io.quarkus.runtime.Startup; +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.HttpClient; +import io.vertx.core.http.HttpClientResponse; +import io.vertx.core.http.HttpMethod; +import jakarta.annotation.Nonnull; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import java.net.SocketException; +import java.net.UnknownHostException; +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import org.apache.polaris.persistence.nosql.api.cache.CacheInvalidations.CacheInvalidation; +import org.apache.polaris.persistence.nosql.api.cache.DistributedCacheInvalidation; +import org.apache.polaris.persistence.nosql.api.obj.ObjRef; +import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Sender for distributed cache invalidation messages. + * + *

This bean is automatically initialized via the {@code @Observes} annotation, and automatically + * injected via the implemented {@link DistributedCacheInvalidation.Sender} interface. + */ +@ApplicationScoped +@Startup +class CacheInvalidationSender implements DistributedCacheInvalidation.Sender { + private static final Logger LOGGER = LoggerFactory.getLogger(CacheInvalidationSender.class); + + private final Vertx vertx; + private final long serviceNameLookupIntervalMillis; + + private final HttpClient httpClient; + private final AddressResolver addressResolver; + + private final List serviceNames; + private final int httpPort; + private final String invalidationUri; + private final long requestTimeout; + + private final ObjectMapper objectMapper = new ObjectMapper(); + private final Lock lock = new ReentrantLock(); + private final int batchSize; + private final BlockingQueue invalidations = new LinkedBlockingQueue<>(); + private boolean triggered; + private final String token; + + /** Contains the IPv4/6 addresses resolved from {@link #serviceNames}. */ + private volatile List resolvedAddresses = emptyList(); + + @Inject + CacheInvalidationSender( + @SuppressWarnings("CdiInjectionPointsInspection") Vertx vertx, + QuarkusDistributedCacheInvalidationsConfig config, + @ConfigProperty(name = "quarkus.management.port") int httpPort, + ServerInstanceId serverInstanceId) { + this.vertx = vertx; + + this.addressResolver = new AddressResolver(vertx, config.dnsQueryTimeout().toMillis()); + this.requestTimeout = + config + .cacheInvalidationRequestTimeout() + .orElse(Duration.of(30, ChronoUnit.SECONDS)) + .toMillis(); + this.httpClient = vertx.createHttpClient(); + this.serviceNames = config.cacheInvalidationServiceNames().orElse(emptyList()); + this.httpPort = httpPort; + this.invalidationUri = + config.cacheInvalidationUri() + "?sender=" + serverInstanceId.instanceId(); + this.serviceNameLookupIntervalMillis = + config.cacheInvalidationServiceNameLookupInterval().toMillis(); + this.batchSize = config.cacheInvalidationBatchSize(); + this.token = config.cacheInvalidationValidTokens().map(List::getFirst).orElse(null); + if (!serviceNames.isEmpty()) { + try { + LOGGER.info("Sending remote cache invalidations to service name(s) {}", serviceNames); + // Wait for the initial name service resolution to complete. + updateServiceNames().toCompletionStage().toCompletableFuture().get(); + if (config.cacheInvalidationValidTokens().isEmpty()) { + LOGGER.warn( + "No token configured for cache invalidation messages - will not send any invalidation message. You need to configure the token(s) via {}.{}", + CACHE_INVALIDATIONS_CONFIG_PREFIX, + CONFIG_VALID_TOKENS); + } + } catch (Exception e) { + throw new RuntimeException( + "Failed to resolve service names " + serviceNames + " for remote cache invalidations", + (e instanceof ExecutionException) ? e.getCause() : e); + } + } else if (token != null) { + LOGGER.warn( + "No service names are configured to send cache invalidation messages to - will not send any invalidation message. You need to configure the service name(s) via {}.{}", + CACHE_INVALIDATIONS_CONFIG_PREFIX, + CONFIG_SERVICE_NAMES); + } + } + + private Future> updateServiceNames() { + var previous = new HashSet<>(resolvedAddresses); + return resolveServiceNames(serviceNames) + .map(all -> all.stream().filter(adr -> !LOCAL_ADDRESSES.contains(adr)).toList()) + .onSuccess( + all -> { + // refresh addresses regularly + scheduleServiceNameResolution(); + + var resolved = new HashSet<>(all); + if (!resolved.equals(previous)) { + LOGGER.info( + "Service names for remote cache invalidations {} now resolve to {}", + serviceNames, + all); + } + + updateResolvedAddresses(all); + }) + .onFailure( + t -> { + // refresh addresses regularly + scheduleServiceNameResolution(); + + LOGGER.warn("Failed to resolve service names: {}", t.toString()); + }); + } + + @VisibleForTesting + void updateResolvedAddresses(List all) { + resolvedAddresses = all; + } + + private void scheduleServiceNameResolution() { + vertx.setTimer(serviceNameLookupIntervalMillis, x -> updateServiceNames()); + } + + @VisibleForTesting + Future> resolveServiceNames(List serviceNames) { + return addressResolver.resolveAll(serviceNames); + } + + void enqueue(CacheInvalidation invalidation) { + if (serviceNames.isEmpty() || token == null) { + // Don't do anything if there are no targets to send invalidations to or whether no token has + // been configured. + return; + } + + lock.lock(); + try { + invalidations.add(invalidation); + + if (!triggered) { + LOGGER.trace("Triggered invalidation submission"); + vertx.executeBlocking(this::sendInvalidations); + triggered = true; + } + } finally { + lock.unlock(); + } + } + + private Void sendInvalidations() { + var batch = new ArrayList(batchSize); + try { + while (true) { + lock.lock(); + try { + invalidations.drainTo(batch, 100); + if (batch.isEmpty()) { + LOGGER.trace("Done sending invalidations"); + triggered = false; + break; + } + } finally { + lock.unlock(); + } + submit(batch, resolvedAddresses); + batch = new ArrayList<>(batchSize); + } + } finally { + // Handle the very unlikely case that the call to submit() failed and we cannot be sure that + // the current batch was submitted. + if (!batch.isEmpty()) { + lock.lock(); + try { + invalidations.addAll(batch); + triggered = false; + } finally { + lock.unlock(); + } + } + } + return null; + } + + @VisibleForTesting + List>> submit( + List batch, List resolvedAddresses) { + LOGGER.trace("Submitting {} invalidations", batch.size()); + + String json; + try { + json = objectMapper.writeValueAsString(cacheInvalidations(batch)); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + + var futures = + new ArrayList>>(resolvedAddresses.size()); + for (var address : resolvedAddresses) { + futures.add( + httpClient + .request(HttpMethod.POST, httpPort, address, invalidationUri) + .compose( + req -> + req.putHeader("Content-Type", APPLICATION_JSON) + .putHeader(CACHE_INVALIDATION_TOKEN_HEADER, token) + .send(json)) + .compose(resp -> resp.body().map(b -> Map.entry(resp, b))) + .timeout(requestTimeout, TimeUnit.MILLISECONDS) + .onComplete( + success -> { + var resp = success.getKey(); + var statusCode = resp.statusCode(); + if (statusCode != 200 && statusCode != 204) { + LOGGER.warn( + "{} cache invalidations could not be sent to {}:{}{} - HTTP {}/{} - body: {}", + batch.size(), + address, + httpPort, + invalidationUri, + statusCode, + resp.statusMessage(), + success.getValue()); + } else { + LOGGER.trace( + "{} cache invalidations sent to {}:{}", batch.size(), address, httpPort); + } + }, + failure -> { + if (failure instanceof SocketException + || failure instanceof UnknownHostException) { + LOGGER.warn( + "Technical network issue sending cache invalidations to {}:{}{} : {}", + address, + httpPort, + invalidationUri, + failure.getMessage()); + } else { + LOGGER.error( + "Technical failure sending cache invalidations to {}:{}{}", + address, + httpPort, + invalidationUri, + failure); + } + })); + } + return futures; + } + + @Override + public void evictReference(@Nonnull String repositoryId, @Nonnull String refName) { + enqueue(cacheInvalidationEvictReference(repositoryId, refName)); + } + + @Override + public void evictObj(@Nonnull String repositoryId, @Nonnull ObjRef objId) { + enqueue(cacheInvalidationEvictObj(repositoryId, objId)); + } +} diff --git a/persistence/nosql/persistence/cdi/quarkus-distcache/src/main/java/org/apache/polaris/persistence/nosql/quarkus/distcache/QuarkusDistributedCacheInvalidationsConfig.java b/persistence/nosql/persistence/cdi/quarkus-distcache/src/main/java/org/apache/polaris/persistence/nosql/quarkus/distcache/QuarkusDistributedCacheInvalidationsConfig.java new file mode 100644 index 0000000000..643e44cb6f --- /dev/null +++ b/persistence/nosql/persistence/cdi/quarkus-distcache/src/main/java/org/apache/polaris/persistence/nosql/quarkus/distcache/QuarkusDistributedCacheInvalidationsConfig.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.persistence.nosql.quarkus.distcache; + +import static org.apache.polaris.persistence.nosql.quarkus.distcache.QuarkusDistributedCacheInvalidationsConfig.CACHE_INVALIDATIONS_CONFIG_PREFIX; + +import io.smallrye.config.ConfigMapping; +import io.smallrye.config.WithDefault; +import io.smallrye.config.WithName; +import java.time.Duration; +import java.util.List; +import java.util.Optional; +import org.apache.polaris.immutables.PolarisImmutable; + +@PolarisImmutable +@ConfigMapping(prefix = CACHE_INVALIDATIONS_CONFIG_PREFIX) +public interface QuarkusDistributedCacheInvalidationsConfig { + + String CACHE_INVALIDATIONS_CONFIG_PREFIX = "polaris.persistence.distributed-cache-invalidations"; + String CONFIG_VALID_TOKENS = "valid-tokens"; + String CONFIG_SERVICE_NAMES = "service-names"; + String CONFIG_URI = "uri"; + String CONFIG_BATCH_SIZE = "batch-size"; + String CONFIG_SERVICE_NAME_LOOKUP_INTERVAL = "service-name-lookup-interval"; + String CONFIG_REQUEST_TIMEOUT = "request-timeout"; + String CONFIG_DNS_QUERY_TIMEOUT = "dns.query-timeout"; + + /** + * Host names or IP addresses or kubernetes headless-service name of all Polaris server instances + * accessing the same repository. + * + *

This value is automatically configured via the Polaris Helm chart, additional configuration + * is not required. + * + *

If you have your own Helm chart or custom deployment, make sure to configure the IPs of all + * Polaris instances here. + * + *

Names that start with an equal sign are not resolved but used "as is". + */ + @WithName(CONFIG_SERVICE_NAMES) + Optional> cacheInvalidationServiceNames(); + + /** + * List of cache-invalidation tokens to authenticate incoming cache-invalidation messages. + * + *

The first token is used in outgoing cache-invalidation messages. + */ + @WithName(CONFIG_VALID_TOKENS) + Optional> cacheInvalidationValidTokens(); + + /** + * URI of the cache-invalidation endpoint, only available on the Quarkus management port, defaults + * to 9000. + */ + @WithName(CONFIG_URI) + @WithDefault("/polaris-management/cache-coherency") + String cacheInvalidationUri(); + + /** + * Interval of service-name lookups to resolve the {@linkplain #cacheInvalidationServiceNames() + * service names} into IP addresses. + */ + @WithName(CONFIG_SERVICE_NAME_LOOKUP_INTERVAL) + @WithDefault("PT10S") + Duration cacheInvalidationServiceNameLookupInterval(); + + /** Maximum number of cache-invalidation messages to send in a single request to peer nodes. */ + @WithName(CONFIG_BATCH_SIZE) + @WithDefault("20") + int cacheInvalidationBatchSize(); + + /** + * Request timeout for sent cache-invalidation messages. Timeouts trigger a warning or error + * message. + */ + @WithName(CONFIG_REQUEST_TIMEOUT) + Optional cacheInvalidationRequestTimeout(); + + /** Timeout for DNS queries to resolve peer nodes. */ + @WithName(CONFIG_DNS_QUERY_TIMEOUT) + @WithDefault("PT5S") + Duration dnsQueryTimeout(); +} diff --git a/persistence/nosql/persistence/cdi/quarkus-distcache/src/main/java/org/apache/polaris/persistence/nosql/quarkus/distcache/ResolvConf.java b/persistence/nosql/persistence/cdi/quarkus-distcache/src/main/java/org/apache/polaris/persistence/nosql/quarkus/distcache/ResolvConf.java new file mode 100644 index 0000000000..6900097199 --- /dev/null +++ b/persistence/nosql/persistence/cdi/quarkus-distcache/src/main/java/org/apache/polaris/persistence/nosql/quarkus/distcache/ResolvConf.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.persistence.nosql.quarkus.distcache; + +// Code mostly copied from io.netty.resolver.dns.ResolvConf, but with the addition to extract +// the 'search' option values. +// +// Marker for Polaris LICENSE file - keep it +// CODE_COPIED_TO_POLARIS + +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.Collections.unmodifiableList; + +import java.io.BufferedReader; +import java.io.FileReader; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +/** + * Looks up the {@code nameserver}s and {@code search} domains from the {@code /etc/resolv.conf} + * file, intended for Linux and macOS only. + * + *

Extracting the {@code nameserver}s and {@code search} domains is necessary to correctly + * resolve names in k8s environments. + */ +final class ResolvConf { + private final List nameservers; + private final List searchList; + + /** + * Reads from the given reader and extracts the {@code nameserver}s and {@code search} domains + * using the syntax of the {@code /etc/resolv.conf} file, see {@code man resolv.conf}. + * + * @param reader contents of {@code resolv.conf} are read from this {@link BufferedReader}, up to + * the caller to close it + */ + static ResolvConf fromReader(BufferedReader reader) throws IOException { + return new ResolvConf(reader); + } + + /** + * Reads the given file and extracts the {@code nameserver}s and {@code search} domains using the + * syntax of the {@code /etc/resolv.conf} file, see {@code man resolv.conf}. + */ + static ResolvConf fromFile(String file) throws IOException { + try (var fileReader = new FileReader(file, UTF_8); + BufferedReader reader = new BufferedReader(fileReader)) { + return fromReader(reader); + } + } + + /** + * Returns the {@code nameserver}s and {@code search} domains from the {@code /etc/resolv.conf} + * file. The file is only read once during the lifetime of this class. + */ + static ResolvConf system() { + var resolvConv = ResolvConfLazy.machineResolvConf; + if (resolvConv != null) { + return resolvConv; + } + throw new IllegalStateException("/etc/resolv.conf could not be read"); + } + + private ResolvConf(BufferedReader reader) throws IOException { + var nameservers = new ArrayList(); + var searchList = new ArrayList(); + String ln; + while ((ln = reader.readLine()) != null) { + ln = ln.trim(); + if (ln.isEmpty()) { + continue; + } + + if (ln.startsWith("nameserver")) { + ln = ln.substring("nameserver".length()).trim(); + nameservers.add(new InetSocketAddress(ln, 53)); + } + if (ln.startsWith("search")) { + ln = ln.substring("search".length()).trim(); + searchList.addAll(Arrays.asList(ln.split(" "))); + } + } + this.nameservers = unmodifiableList(nameservers); + this.searchList = unmodifiableList(searchList); + } + + List getNameservers() { + return nameservers; + } + + List getSearchList() { + return searchList; + } + + private static final class ResolvConfLazy { + static final ResolvConf machineResolvConf; + + static { + ResolvConf resolvConf; + try { + resolvConf = ResolvConf.fromFile("/etc/resolv.conf"); + } catch (IOException e) { + resolvConf = null; + } + machineResolvConf = resolvConf; + } + } +} diff --git a/persistence/nosql/persistence/cdi/quarkus-distcache/src/main/java/org/apache/polaris/persistence/nosql/quarkus/distcache/ServerInstanceId.java b/persistence/nosql/persistence/cdi/quarkus-distcache/src/main/java/org/apache/polaris/persistence/nosql/quarkus/distcache/ServerInstanceId.java new file mode 100644 index 0000000000..f39c0540f4 --- /dev/null +++ b/persistence/nosql/persistence/cdi/quarkus-distcache/src/main/java/org/apache/polaris/persistence/nosql/quarkus/distcache/ServerInstanceId.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.persistence.nosql.quarkus.distcache; + +import org.apache.polaris.immutables.PolarisImmutable; + +@PolarisImmutable +interface ServerInstanceId { + String instanceId(); + + static ServerInstanceId of(String instanceId) { + return ImmutableServerInstanceId.builder().instanceId(instanceId).build(); + } +} diff --git a/persistence/nosql/persistence/cdi/quarkus-distcache/src/main/java/org/apache/polaris/persistence/nosql/quarkus/distcache/package-info.java b/persistence/nosql/persistence/cdi/quarkus-distcache/src/main/java/org/apache/polaris/persistence/nosql/quarkus/distcache/package-info.java new file mode 100644 index 0000000000..f10e51718c --- /dev/null +++ b/persistence/nosql/persistence/cdi/quarkus-distcache/src/main/java/org/apache/polaris/persistence/nosql/quarkus/distcache/package-info.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * Distributed cache invalidation for the NoSQL cache. + * + *

Provides both a receiver and a sender for {@link + * org.apache.polaris.persistence.nosql.api.cache.CacheInvalidations cache invalidation messages}. + * + *

The receiver registers a route on the Quarkus management interface. Senders emit asynchronous + * requests to the cache-invalidation management endpoint. + * + *

Each Polaris instance submits its ephemeral instance-ID in the invalidation messages to + * prevent the risk of processing loopback messages as a safety net. + * + *

All polaris instances share a common token (dynamically generated via helm chart mechanisms) + * to protect against externally injected, malicious invalidation messages. + */ +package org.apache.polaris.persistence.nosql.quarkus.distcache; diff --git a/persistence/nosql/persistence/cdi/quarkus-distcache/src/test/java/org/apache/polaris/persistence/nosql/quarkus/distcache/HttpTestServer.java b/persistence/nosql/persistence/cdi/quarkus-distcache/src/test/java/org/apache/polaris/persistence/nosql/quarkus/distcache/HttpTestServer.java new file mode 100644 index 0000000000..c89a44545e --- /dev/null +++ b/persistence/nosql/persistence/cdi/quarkus-distcache/src/test/java/org/apache/polaris/persistence/nosql/quarkus/distcache/HttpTestServer.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.persistence.nosql.quarkus.distcache; + +import com.sun.net.httpserver.HttpHandler; +import com.sun.net.httpserver.HttpServer; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.URI; + +/** HTTP test server. */ +public class HttpTestServer implements AutoCloseable { + private final HttpServer server; + + public HttpTestServer(String context, HttpHandler handler) throws IOException { + this(new InetSocketAddress("localhost", 0), context, handler); + } + + public HttpTestServer(InetSocketAddress bind, String context, HttpHandler handler) + throws IOException { + HttpHandler safeHandler = + exchange -> { + try { + handler.handle(exchange); + } catch (RuntimeException | Error e) { + exchange.sendResponseHeaders(503, 0); + throw e; + } + }; + server = HttpServer.create(bind, 0); + server.createContext(context, safeHandler); + server.setExecutor(null); + + server.start(); + } + + public InetSocketAddress getAddress() { + return server.getAddress(); + } + + public URI getUri() { + return URI.create( + "http://" + + getAddress().getAddress().getHostAddress() + + ":" + + getAddress().getPort() + + "/"); + } + + @Override + public void close() { + server.stop(0); + } +} diff --git a/persistence/nosql/persistence/cdi/quarkus-distcache/src/test/java/org/apache/polaris/persistence/nosql/quarkus/distcache/TestAddressResolver.java b/persistence/nosql/persistence/cdi/quarkus-distcache/src/test/java/org/apache/polaris/persistence/nosql/quarkus/distcache/TestAddressResolver.java new file mode 100644 index 0000000000..3e08793f05 --- /dev/null +++ b/persistence/nosql/persistence/cdi/quarkus-distcache/src/test/java/org/apache/polaris/persistence/nosql/quarkus/distcache/TestAddressResolver.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.persistence.nosql.quarkus.distcache; + +import static java.util.Collections.singletonList; +import static java.util.stream.Collectors.toList; +import static org.apache.polaris.persistence.nosql.quarkus.distcache.AddressResolver.LOCAL_ADDRESSES; + +import io.vertx.core.Vertx; +import io.vertx.core.dns.DnsException; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.assertj.core.api.SoftAssertions; +import org.assertj.core.api.junit.jupiter.InjectSoftAssertions; +import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.DisabledOnOs; +import org.junit.jupiter.api.condition.OS; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(SoftAssertionsExtension.class) +public class TestAddressResolver { + @InjectSoftAssertions protected SoftAssertions soft; + + protected Vertx vertx; + AddressResolver addressResolver; + + @BeforeEach + void setUp() { + vertx = Vertx.builder().build(); + } + + @AfterEach + void tearDown() throws Exception { + try { + if (addressResolver != null) { + addressResolver + .dnsClient() + .close() + .toCompletionStage() + .toCompletableFuture() + .get(1, TimeUnit.MINUTES); + } + } finally { + try { + vertx.close().toCompletionStage().toCompletableFuture().get(1, TimeUnit.MINUTES); + } finally { + vertx = null; + } + } + } + + @Test + public void resolveNoName() throws Exception { + addressResolver = new AddressResolver(vertx, 5000); + soft.assertThat( + addressResolver + .resolveAll(Collections.emptyList()) + .toCompletionStage() + .toCompletableFuture() + .get(1, TimeUnit.MINUTES)) + .isEmpty(); + } + + @Test + public void resolveGoodName() throws Exception { + addressResolver = new AddressResolver(vertx, 5000); + + AddressResolver addressResolverWithSearch = + new AddressResolver(addressResolver.dnsClient(), List.of("org")); + + List withoutSearchList = + addressResolver + .resolveAll(singletonList("projectnessie.org")) + .toCompletionStage() + .toCompletableFuture() + .get(1, TimeUnit.MINUTES); + soft.assertThat(withoutSearchList).isNotEmpty(); + + List withSearchList1 = + addressResolverWithSearch + .resolveAll(singletonList("projectnessie.org")) + .toCompletionStage() + .toCompletableFuture() + .get(1, TimeUnit.MINUTES); + soft.assertThat(withoutSearchList).isNotEmpty(); + soft.assertThat(withSearchList1).isNotEmpty().isNotEmpty(); + soft.assertThat(withSearchList1).containsExactlyInAnyOrderElementsOf(withoutSearchList); + + List withSearchList2 = + addressResolverWithSearch + .resolveAll(singletonList("projectnessie")) + .toCompletionStage() + .toCompletableFuture() + .get(1, TimeUnit.MINUTES); + soft.assertThat(withSearchList2).isNotEmpty(); + soft.assertThat(withSearchList2).containsExactlyInAnyOrderElementsOf(withoutSearchList); + + List withSearchListQualified = + addressResolverWithSearch + .resolveAll(singletonList("projectnessie.org.")) + .toCompletionStage() + .toCompletableFuture() + .get(1, TimeUnit.MINUTES); + soft.assertThat(withSearchListQualified).isNotEmpty(); + + soft.assertThat(withSearchListQualified).containsExactlyInAnyOrderElementsOf(withoutSearchList); + } + + @Test + @DisabledOnOs(value = OS.MAC, disabledReason = "Resolving 'localhost' doesn't work on macOS") + public void resolveSingleName() throws Exception { + addressResolver = new AddressResolver(vertx, 5000); + soft.assertThat( + addressResolver + .resolveAll(singletonList("localhost")) + .toCompletionStage() + .toCompletableFuture() + .get(1, TimeUnit.MINUTES)) + .isNotEmpty() + .containsAnyOf("0:0:0:0:0:0:0:1", "127.0.0.1"); + } + + @Test + public void resolveBadName() { + addressResolver = new AddressResolver(vertx, 5000); + soft.assertThat( + addressResolver + .resolveAll(singletonList("wepofkjeopiwkf.wepofkeowpkfpoew.weopfkewopfk.local")) + .toCompletionStage() + .toCompletableFuture()) + .failsWithin(1, TimeUnit.MINUTES) + .withThrowableThat() + .withCauseInstanceOf(DnsException.class); + } + + @Test + @DisabledOnOs(value = OS.MAC, disabledReason = "Resolving 'localhost' doesn't work on macOS") + public void resolveFilterLocalAddresses() throws Exception { + addressResolver = new AddressResolver(vertx, 5000); + soft.assertThat( + addressResolver + .resolveAll(singletonList("localhost")) + .map( + s -> s.stream().filter(adr -> !LOCAL_ADDRESSES.contains(adr)).collect(toList())) + .toCompletionStage() + .toCompletableFuture() + .get(1, TimeUnit.MINUTES)) + .isEmpty(); + } +} diff --git a/persistence/nosql/persistence/cdi/quarkus-distcache/src/test/java/org/apache/polaris/persistence/nosql/quarkus/distcache/TestCacheInvalidationReceiver.java b/persistence/nosql/persistence/cdi/quarkus-distcache/src/test/java/org/apache/polaris/persistence/nosql/quarkus/distcache/TestCacheInvalidationReceiver.java new file mode 100644 index 0000000000..e6e7d4073f --- /dev/null +++ b/persistence/nosql/persistence/cdi/quarkus-distcache/src/test/java/org/apache/polaris/persistence/nosql/quarkus/distcache/TestCacheInvalidationReceiver.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.persistence.nosql.quarkus.distcache; + +import static java.util.Collections.singletonList; +import static org.apache.polaris.persistence.nosql.api.cache.CacheInvalidations.CacheInvalidationEvictObj.cacheInvalidationEvictObj; +import static org.apache.polaris.persistence.nosql.api.cache.CacheInvalidations.CacheInvalidationEvictReference.cacheInvalidationEvictReference; +import static org.apache.polaris.persistence.nosql.api.cache.CacheInvalidations.cacheInvalidations; +import static org.apache.polaris.persistence.nosql.quarkus.distcache.CacheInvalidationReceiver.CACHE_INVALIDATION_TOKEN_HEADER; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.vertx.core.Future; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.http.HttpServerResponse; +import io.vertx.ext.web.RequestBody; +import io.vertx.ext.web.RoutingContext; +import java.util.List; +import java.util.Optional; +import java.util.function.Consumer; +import org.apache.polaris.persistence.nosql.api.cache.CacheInvalidations; +import org.apache.polaris.persistence.nosql.api.cache.DistributedCacheInvalidation; +import org.apache.polaris.persistence.nosql.api.obj.ObjRef; +import org.junit.jupiter.api.Test; + +public class TestCacheInvalidationReceiver { + private static final ObjRef SOME_OBJ_REF = ObjRef.objRef("foo", 1234); + + @Test + public void senderReceiver() throws Exception { + var distributedCacheInvalidation = mock(DistributedCacheInvalidation.Receiver.class); + + var token = "cafe"; + var tokens = singletonList(token); + var receiverId = ServerInstanceId.of("receiverId"); + var senderId = ServerInstanceId.of("senderId"); + + var receiver = buildReceiver(tokens, receiverId, distributedCacheInvalidation); + + var invalidations = cacheInvalidations(allInvalidationTypes()); + + var rc = + expectResponse( + r -> { + when(r.getParam("sender")).thenReturn(senderId.instanceId()); + when(r.getHeader(CACHE_INVALIDATION_TOKEN_HEADER)).thenReturn(token); + }); + var reqBody = mock(RequestBody.class); + when(reqBody.asString()).thenReturn(new ObjectMapper().writeValueAsString(invalidations)); + when(rc.body()).thenReturn(reqBody); + + receiver.cacheInvalidations(rc); + + verify(rc.response()).setStatusCode(204); + verify(rc.response()).setStatusMessage("No content"); + + verify(distributedCacheInvalidation).evictObj("repo", SOME_OBJ_REF); + verify(distributedCacheInvalidation).evictReference("repo", "refs/foo/bar"); + verifyNoMoreInteractions(distributedCacheInvalidation); + } + + @Test + public void doesNotAcceptInvalidationsWithoutTokens() { + var distributedCacheInvalidation = mock(DistributedCacheInvalidation.Receiver.class); + + var token = "cafe"; + var tokens = List.of(); + var receiverId = ServerInstanceId.of("receiverId"); + var senderId = ServerInstanceId.of("senderId"); + + var receiver = buildReceiver(tokens, receiverId, distributedCacheInvalidation); + + var rc = expectResponse(); + receiver.cacheInvalidations( + rc, () -> cacheInvalidations(allInvalidationTypes()), senderId.instanceId(), token); + + verify(rc.response()).setStatusCode(400); + verify(rc.response()).setStatusMessage("Invalid token"); + + verifyNoMoreInteractions(distributedCacheInvalidation); + } + + @Test + public void receiveFromSelf() { + var distributedCacheInvalidation = mock(DistributedCacheInvalidation.Receiver.class); + + var token = "cafe"; + var tokens = singletonList(token); + var receiverId = ServerInstanceId.of("receiverId"); + + var receiver = buildReceiver(tokens, receiverId, distributedCacheInvalidation); + + var rc = expectResponse(); + receiver.cacheInvalidations( + rc, () -> cacheInvalidations(allInvalidationTypes()), receiverId.instanceId(), token); + + verify(rc.response()).setStatusCode(204); + verify(rc.response()).setStatusMessage("No content"); + + verifyNoMoreInteractions(distributedCacheInvalidation); + } + + @Test + public void unknownToken() { + var distributedCacheInvalidation = mock(DistributedCacheInvalidation.Receiver.class); + + var token = "cafe"; + var tokens = singletonList(token); + var differentToken = "otherToken"; + var receiverId = ServerInstanceId.of("receiverId"); + var senderId = ServerInstanceId.of("senderId"); + + CacheInvalidationReceiver receiver = + buildReceiver(tokens, receiverId, distributedCacheInvalidation); + + RoutingContext rc = expectResponse(); + receiver.cacheInvalidations( + rc, + () -> cacheInvalidations(allInvalidationTypes()), + senderId.instanceId(), + differentToken); + + verify(rc.response()).setStatusCode(400); + verify(rc.response()).setStatusMessage("Invalid token"); + + verifyNoMoreInteractions(distributedCacheInvalidation); + } + + private RoutingContext expectResponse() { + return expectResponse(r -> {}); + } + + private RoutingContext expectResponse(Consumer requestMocker) { + var response = mock(HttpServerResponse.class); + when(response.setStatusCode(anyInt())).thenReturn(response); + when(response.setStatusMessage(anyString())).thenReturn(response); + when(response.end()).thenReturn(Future.succeededFuture()); + + var request = mock(HttpServerRequest.class); + when(request.getHeader("Content-Type")).thenReturn("application/json"); + requestMocker.accept(request); + + var rc = mock(RoutingContext.class); + when(rc.response()).thenReturn(response); + when(rc.request()).thenReturn(request); + return rc; + } + + private static CacheInvalidationReceiver buildReceiver( + List tokens, + ServerInstanceId receiverId, + DistributedCacheInvalidation.Receiver distCacheInvalidation) { + QuarkusDistributedCacheInvalidationsConfig config = + mock(QuarkusDistributedCacheInvalidationsConfig.class); + when(config.cacheInvalidationValidTokens()).thenReturn(Optional.of(tokens)); + + return new CacheInvalidationReceiver(config, receiverId, distCacheInvalidation); + } + + List allInvalidationTypes() { + return List.of( + cacheInvalidationEvictReference("repo", "refs/foo/bar"), + cacheInvalidationEvictObj("repo", SOME_OBJ_REF)); + } +} diff --git a/persistence/nosql/persistence/cdi/quarkus-distcache/src/test/java/org/apache/polaris/persistence/nosql/quarkus/distcache/TestCacheInvalidationSender.java b/persistence/nosql/persistence/cdi/quarkus-distcache/src/test/java/org/apache/polaris/persistence/nosql/quarkus/distcache/TestCacheInvalidationSender.java new file mode 100644 index 0000000000..0981747107 --- /dev/null +++ b/persistence/nosql/persistence/cdi/quarkus-distcache/src/test/java/org/apache/polaris/persistence/nosql/quarkus/distcache/TestCacheInvalidationSender.java @@ -0,0 +1,566 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.persistence.nosql.quarkus.distcache; + +import static io.vertx.core.Future.failedFuture; +import static io.vertx.core.Future.succeededFuture; +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.Collections.singletonList; +import static org.apache.polaris.persistence.nosql.api.cache.CacheInvalidations.CacheInvalidationEvictObj.cacheInvalidationEvictObj; +import static org.apache.polaris.persistence.nosql.api.cache.CacheInvalidations.CacheInvalidationEvictReference.cacheInvalidationEvictReference; +import static org.apache.polaris.persistence.nosql.api.cache.CacheInvalidations.cacheInvalidations; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.params.provider.Arguments.arguments; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.HttpClientResponse; +import java.io.InputStream; +import java.net.URI; +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import java.util.stream.Stream; +import org.apache.polaris.persistence.nosql.api.cache.CacheInvalidations.CacheInvalidation; +import org.apache.polaris.persistence.nosql.api.cache.DistributedCacheInvalidation; +import org.apache.polaris.persistence.nosql.api.obj.ObjRef; +import org.assertj.core.api.SoftAssertions; +import org.assertj.core.api.junit.jupiter.InjectSoftAssertions; +import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +@ExtendWith(SoftAssertionsExtension.class) +public class TestCacheInvalidationSender { + private static final ObjRef SOME_OBJ_REF = ObjRef.objRef("foo", 1234); + + @InjectSoftAssertions protected SoftAssertions soft; + + protected Vertx vertx; + + @BeforeEach + void setUp() { + vertx = Vertx.builder().build(); + } + + @AfterEach + void tearDown() throws Exception { + try { + vertx.close().toCompletionStage().toCompletableFuture().get(1, TimeUnit.MINUTES); + } finally { + vertx = null; + } + } + + @Test + public void serviceNameLookupFailure() { + var senderId = ServerInstanceId.of("senderId"); + + var token = "token"; + var tokens = singletonList(token); + + var config = + buildConfig( + tokens, + Optional.of(singletonList("serviceName")), + Duration.ofSeconds(10), + Duration.ofSeconds(10)); + + soft.assertThatThrownBy( + () -> + new CacheInvalidationSender(vertx, config, 80, senderId) { + @Override + Future> resolveServiceNames(List serviceNames) { + return failedFuture(new RuntimeException("foo")); + } + + @Override + List>> submit( + List batch, List resolvedAddresses) { + soft.fail("Not expected"); + return null; + } + }) + .hasMessage("Failed to resolve service names [serviceName] for remote cache invalidations") + .cause() + .hasMessage("foo"); + } + + @Test + public void regularServiceNameLookups() throws Exception { + var senderId = ServerInstanceId.of("senderId"); + + var token = "token"; + var tokens = singletonList(token); + + var config = + buildConfig( + tokens, + Optional.of(singletonList("serviceName")), + Duration.ofMillis(1), + Duration.ofSeconds(10)); + + var resolveSemaphore = new Semaphore(1); + var continueSemaphore = new Semaphore(0); + var submittedSemaphore = new Semaphore(0); + var updateResolvedSemaphore = new Semaphore(0); + var currentAddresses = List.of("127.1.1.1"); + var resolveResult = new AtomicReference<>(succeededFuture(currentAddresses)); + var submitResolvedAddresses = new AtomicReference>(); + + try { + CacheInvalidationSender sender = + new CacheInvalidationSender(vertx, config, 80, senderId) { + @Override + Future> resolveServiceNames(List serviceNames) { + try { + assertThat(resolveSemaphore.tryAcquire(30, TimeUnit.SECONDS)).isTrue(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + try { + return resolveResult.get(); + } finally { + continueSemaphore.release(); + } + } + + @Override + void updateResolvedAddresses(List all) { + try { + super.updateResolvedAddresses(all); + } finally { + updateResolvedSemaphore.release(); + } + } + + @Override + List>> submit( + List batch, List resolvedAddresses) { + submitResolvedAddresses.set(resolvedAddresses); + submittedSemaphore.release(); + return null; + } + }; + + // "consume" after initial, blocking call to resolveServiceNames() from the constructor + assertThat(continueSemaphore.tryAcquire(30, TimeUnit.SECONDS)).isTrue(); + assertThat(updateResolvedSemaphore.tryAcquire(30, TimeUnit.SECONDS)).isTrue(); + + // Send an invalidation, compare addresses + sender.evictObj("repo", SOME_OBJ_REF); + assertThat(submittedSemaphore.tryAcquire(30, TimeUnit.SECONDS)).isTrue(); + soft.assertThat(submitResolvedAddresses.get()) + .containsExactlyInAnyOrderElementsOf(currentAddresses); + + // simulate change of resolved addresses + currentAddresses = List.of("127.2.2.2", "127.3.3.3"); + resolveResult.set(succeededFuture(currentAddresses)); + resolveSemaphore.release(); + // wait until next call to resolveServiceNames() has been triggered + assertThat(continueSemaphore.tryAcquire(30, TimeUnit.SECONDS)).isTrue(); + assertThat(updateResolvedSemaphore.tryAcquire(30, TimeUnit.SECONDS)).isTrue(); + + // Send another invalidation, compare addresses + sender.evictObj("repo", SOME_OBJ_REF); + assertThat(submittedSemaphore.tryAcquire(30, TimeUnit.SECONDS)).isTrue(); + soft.assertThat(submitResolvedAddresses.get()) + .containsExactlyInAnyOrderElementsOf(currentAddresses); + + // simulate a failure resolving the addresses + resolveResult.set(failedFuture(new RuntimeException("blah"))); + resolveSemaphore.release(); + // wait until next call to resolveServiceNames() has been triggered + assertThat(continueSemaphore.tryAcquire(30, TimeUnit.SECONDS)).isTrue(); + + // Send another invalidation, compare addresses + sender.evictObj("repo", SOME_OBJ_REF); + assertThat(submittedSemaphore.tryAcquire(30, TimeUnit.SECONDS)).isTrue(); + soft.assertThat(submitResolvedAddresses.get()) + .containsExactlyInAnyOrderElementsOf(currentAddresses); + + // simulate another change of resolved addresses + currentAddresses = List.of("127.4.4.4", "127.5.5.5"); + resolveResult.set(succeededFuture(currentAddresses)); + resolveSemaphore.release(); + // wait until next call to resolveServiceNames() has been triggered + assertThat(continueSemaphore.tryAcquire(30, TimeUnit.SECONDS)).isTrue(); + assertThat(updateResolvedSemaphore.tryAcquire(30, TimeUnit.SECONDS)).isTrue(); + + // Send another invalidation, compare addresses + sender.evictObj("repo", SOME_OBJ_REF); + assertThat(submittedSemaphore.tryAcquire(30, TimeUnit.SECONDS)).isTrue(); + soft.assertThat(submitResolvedAddresses.get()) + .containsExactlyInAnyOrderElementsOf(currentAddresses); + } finally { + // Permit a lot, the test might otherwise "hang" in resolveServiceNames() + resolveSemaphore.release(10_000_000); + } + } + + @Test + public void noServiceNames() throws Exception { + var senderId = ServerInstanceId.of("senderId"); + + var token = "token"; + var tokens = singletonList(token); + + var config = + buildConfig(tokens, Optional.empty(), Duration.ofSeconds(10), Duration.ofSeconds(10)); + + var sender = + new CacheInvalidationSender(vertx, config, 80, senderId) { + @Override + Future> resolveServiceNames(List serviceNames) { + return succeededFuture(List.of()); + } + + @Override + List>> submit( + List batch, List resolvedAddresses) { + soft.fail("Not expected"); + return null; + } + }; + + var senderSpy = spy(sender); + + senderSpy.evictObj("repo", SOME_OBJ_REF); + + // Hard to test that nothing is done, if the list of resolved addresses is empty, but the + // condition is easy. If this tests is flaky, then there's something broken. + Thread.sleep(100L); + + verify(senderSpy).evictObj("repo", SOME_OBJ_REF); + verify(senderSpy).enqueue(cacheInvalidationEvictObj("repo", SOME_OBJ_REF)); + verifyNoMoreInteractions(senderSpy); + } + + @ParameterizedTest + @MethodSource("invalidations") + public void mockedSendSingleInvalidation( + Consumer invalidation, CacheInvalidation expected) + throws Exception { + var senderId = ServerInstanceId.of("senderId"); + + var token = "token"; + var tokens = singletonList(token); + + var serviceNames = singletonList("service-name"); + var resolvedServiceNames = singletonList("service-name-resolved"); + + var config = + buildConfig( + tokens, Optional.of(serviceNames), Duration.ofSeconds(10), Duration.ofSeconds(10)); + + var sem = new Semaphore(0); + var sender = + new CacheInvalidationSender(vertx, config, 80, senderId) { + @Override + Future> resolveServiceNames(List serviceNames) { + return succeededFuture(resolvedServiceNames); + } + + @Override + List>> submit( + List batch, List resolvedAddresses) { + sem.release(1); + return null; + } + }; + + var senderSpy = spy(sender); + + invalidation.accept(senderSpy); + assertThat(sem.tryAcquire(30, TimeUnit.SECONDS)).isTrue(); + + verify(senderSpy).submit(singletonList(expected), resolvedServiceNames); + } + + @Test + public void mockedAllInvalidationTypes() throws Exception { + var senderId = ServerInstanceId.of("senderId"); + + var token = "token"; + var tokens = singletonList(token); + + var serviceNames = singletonList("service-name"); + var resolvedServiceNames = singletonList("service-name-resolved"); + + var config = + buildConfig( + tokens, Optional.of(serviceNames), Duration.ofSeconds(10), Duration.ofSeconds(10)); + + var sem = new Semaphore(0); + var received = new ConcurrentLinkedQueue<>(); + var sender = + new CacheInvalidationSender(vertx, config, 80, senderId) { + @Override + Future> resolveServiceNames(List serviceNames) { + return succeededFuture(resolvedServiceNames); + } + + @Override + List>> submit( + List batch, List resolvedAddresses) { + received.addAll(batch); + soft.assertThat(resolvedAddresses) + .containsExactlyInAnyOrderElementsOf(resolvedServiceNames); + sem.release(batch.size()); + return null; + } + }; + + var senderSpy = spy(sender); + + var expected = + invalidations().map(args -> args.get()[1]).map(CacheInvalidation.class::cast).toList(); + + invalidations() + .map(args -> args.get()[0]) + .map( + i -> { + @SuppressWarnings({"UnnecessaryLocalVariable", "unchecked"}) + Consumer r = (Consumer) i; + return r; + }) + .forEach(i -> i.accept(senderSpy)); + + assertThat(sem.tryAcquire(expected.size(), 30, TimeUnit.SECONDS)).isTrue(); + + soft.assertThat(received).containsExactlyInAnyOrderElementsOf(expected); + } + + @ParameterizedTest + @MethodSource("invalidations") + public void sendSingleInvalidation( + @SuppressWarnings("unused") Consumer invalidation, + CacheInvalidation expected) + throws Exception { + var senderId = ServerInstanceId.of("senderId"); + + var token = "token"; + var tokens = singletonList(token); + + var serviceNames = singletonList("service-name"); + + var config = + buildConfig( + tokens, Optional.of(serviceNames), Duration.ofSeconds(10), Duration.ofSeconds(10)); + + var mapper = new ObjectMapper(); + + var body = new AtomicReference(); + var reqUri = new AtomicReference(); + try (var receiver = + new HttpTestServer( + config.cacheInvalidationUri(), + exchange -> { + try (InputStream requestBody = exchange.getRequestBody()) { + body.set(new String(requestBody.readAllBytes(), UTF_8)); + } + reqUri.set(exchange.getRequestURI()); + exchange.sendResponseHeaders(204, 0); + exchange.getResponseBody().close(); + })) { + + var uri = receiver.getUri(); + + var sender = + new CacheInvalidationSender(vertx, config, uri.getPort(), senderId) { + @Override + Future> resolveServiceNames(List serviceNames) { + return succeededFuture(List.of(uri.getHost())); + } + }; + + var future = + CompletableFuture.allOf( + sender.submit(singletonList(expected), singletonList(uri.getHost())).stream() + .map(Future::toCompletionStage) + .map(CompletionStage::toCompletableFuture) + .toArray(CompletableFuture[]::new)); + + soft.assertThat(future).succeedsWithin(30, TimeUnit.SECONDS); + + soft.assertThat(body.get()) + .isEqualTo(mapper.writeValueAsString(cacheInvalidations(singletonList(expected)))); + soft.assertThat(reqUri.get()).extracting(URI::getPath).isEqualTo("/foo/bar/"); + soft.assertThat(reqUri.get()) + .extracting(URI::getQuery) + .isEqualTo("sender=" + senderId.instanceId()); + } + } + + @Test + public void allInvalidationTypes() throws Exception { + var senderId = ServerInstanceId.of("senderId"); + + var token = "token"; + var tokens = singletonList(token); + + var serviceNames = singletonList("service-name"); + + var config = + buildConfig( + tokens, Optional.of(serviceNames), Duration.ofSeconds(10), Duration.ofSeconds(30)); + + var expected = + invalidations().map(args -> args.get()[1]).map(CacheInvalidation.class::cast).toList(); + + var mapper = new ObjectMapper(); + + var body = new AtomicReference(); + var reqUri = new AtomicReference(); + try (HttpTestServer receiver = + new HttpTestServer( + config.cacheInvalidationUri(), + exchange -> { + try (InputStream requestBody = exchange.getRequestBody()) { + body.set(new String(requestBody.readAllBytes(), UTF_8)); + } + reqUri.set(exchange.getRequestURI()); + exchange.sendResponseHeaders(204, 0); + exchange.getResponseBody().close(); + })) { + + var uri = receiver.getUri(); + + var sender = + new CacheInvalidationSender(vertx, config, uri.getPort(), senderId) { + @Override + Future> resolveServiceNames(List serviceNames) { + return succeededFuture(List.of(uri.getHost())); + } + }; + + var future = + Future.all(sender.submit(expected, singletonList(uri.getHost()))) + .toCompletionStage() + .toCompletableFuture(); + + soft.assertThat(future).succeedsWithin(30, TimeUnit.SECONDS); + + soft.assertThat(body.get()) + .isEqualTo(mapper.writeValueAsString(cacheInvalidations(expected))); + soft.assertThat(reqUri.get()).extracting(URI::getPath).isEqualTo("/foo/bar/"); + soft.assertThat(reqUri.get()) + .extracting(URI::getQuery) + .isEqualTo("sender=" + senderId.instanceId()); + } + } + + @Test + public void sendInvalidationTimeout() throws Exception { + var senderId = ServerInstanceId.of("senderId"); + + var token = "token"; + var tokens = singletonList(token); + + var serviceNames = singletonList("service-name"); + + var config = + buildConfig( + tokens, Optional.of(serviceNames), Duration.ofSeconds(10), Duration.ofMillis(1)); + + var expected = + invalidations().map(args -> args.get()[1]).map(CacheInvalidation.class::cast).toList(); + + try (var receiver = + new HttpTestServer( + config.cacheInvalidationUri(), + exchange -> { + try (InputStream requestBody = exchange.getRequestBody()) { + requestBody.readAllBytes(); + } + // don't send a response -> provoke a timeout + exchange.getResponseBody().close(); + })) { + + var uri = receiver.getUri(); + + var sender = + new CacheInvalidationSender(vertx, config, uri.getPort(), senderId) { + @Override + Future> resolveServiceNames(List serviceNames) { + return succeededFuture(List.of(uri.getHost())); + } + }; + + var future = + CompletableFuture.allOf( + sender.submit(expected, singletonList(uri.getHost())).stream() + .map(Future::toCompletionStage) + .map(CompletionStage::toCompletableFuture) + .toArray(CompletableFuture[]::new)); + + soft.assertThat(future) + .failsWithin(30, TimeUnit.SECONDS) + .withThrowableOfType(ExecutionException.class) + .withMessageContaining("Timeout 1 (ms) fired"); + } + } + + static Stream invalidations() { + return Stream.of( + arguments( + (Consumer) i -> i.evictObj("repo", SOME_OBJ_REF), + cacheInvalidationEvictObj("repo", SOME_OBJ_REF)), + arguments( + (Consumer) i -> i.evictReference("repo", "refs/foo/bar"), + cacheInvalidationEvictReference("repo", "refs/foo/bar"))); + } + + private static QuarkusDistributedCacheInvalidationsConfig buildConfig( + List tokens, + Optional> serviceName, + Duration interval, + Duration requestTimeout) { + var config = mock(QuarkusDistributedCacheInvalidationsConfig.class); + when(config.cacheInvalidationValidTokens()).thenReturn(Optional.of(tokens)); + when(config.cacheInvalidationServiceNames()).thenReturn(serviceName); + when(config.cacheInvalidationServiceNameLookupInterval()).thenReturn(interval); + when(config.cacheInvalidationBatchSize()).thenReturn(10); + when(config.cacheInvalidationUri()).thenReturn("/foo/bar/"); + when(config.cacheInvalidationRequestTimeout()).thenReturn(Optional.of(requestTimeout)); + when(config.dnsQueryTimeout()).thenReturn(Duration.ofSeconds(5)); + return config; + } +} diff --git a/persistence/nosql/persistence/cdi/quarkus-distcache/src/test/java/org/apache/polaris/persistence/nosql/quarkus/distcache/TestResolvConf.java b/persistence/nosql/persistence/cdi/quarkus-distcache/src/test/java/org/apache/polaris/persistence/nosql/quarkus/distcache/TestResolvConf.java new file mode 100644 index 0000000000..38a9555a66 --- /dev/null +++ b/persistence/nosql/persistence/cdi/quarkus-distcache/src/test/java/org/apache/polaris/persistence/nosql/quarkus/distcache/TestResolvConf.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.persistence.nosql.quarkus.distcache; + +import static org.junit.jupiter.params.provider.Arguments.arguments; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.StringReader; +import java.net.InetSocketAddress; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.List; +import java.util.stream.Stream; +import org.assertj.core.api.SoftAssertions; +import org.assertj.core.api.junit.jupiter.InjectSoftAssertions; +import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +@ExtendWith(SoftAssertionsExtension.class) +public class TestResolvConf { + @InjectSoftAssertions protected SoftAssertions soft; + + @ParameterizedTest + @MethodSource + public void resolve( + String resolvConfContent, List nameservers, List searchList) + throws Exception { + ResolvConf resolvConf = + ResolvConf.fromReader(new BufferedReader(new StringReader(resolvConfContent))); + soft.assertThat(resolvConf) + .extracting(ResolvConf::getNameservers, ResolvConf::getSearchList) + .containsExactly(nameservers, searchList); + } + + @Test + public void system() throws IOException { + String file = Files.readString(Paths.get("/etc/resolv.conf")); + + ResolvConf resolvConf = ResolvConf.system(); + soft.assertThat(resolvConf.getNameservers()).isNotEmpty(); + // This 'if' ensures that this test passes on the macOS test run in CI. + if (file.contains("\nsearch ") || file.startsWith("search ")) { + soft.assertThat(resolvConf.getSearchList()).isNotEmpty(); + } else { + soft.assertThat(resolvConf.getSearchList()).isEmpty(); + } + } + + static Stream resolve() { + return Stream.of( + arguments( + """ + # See man:systemd-resolved.service(8) for details about the supported modes of + # operation for /etc/resolv.conf. + + nameserver 127.0.0.1 + search search.domain + """, + List.of(new InetSocketAddress("127.0.0.1", 53)), + List.of("search.domain")), + arguments( + """ + nameserver 127.0.0.1 + nameserver 1.2.3.4 + """, + List.of(new InetSocketAddress("127.0.0.1", 53), new InetSocketAddress("1.2.3.4", 53)), + List.of()), + arguments( + """ + nameserver 127.0.0.1 + nameserver 1.2.3.4 + search search.domain + search anothersearch.anotherdomain + """, + List.of(new InetSocketAddress("127.0.0.1", 53), new InetSocketAddress("1.2.3.4", 53)), + List.of("search.domain", "anothersearch.anotherdomain")), + arguments( + """ + nameserver 127.0.0.1 + nameserver 1.2.3.4 + search search.domain anothersearch.anotherdomain + """, + List.of(new InetSocketAddress("127.0.0.1", 53), new InetSocketAddress("1.2.3.4", 53)), + List.of("search.domain", "anothersearch.anotherdomain")), + arguments("", List.of(), List.of())); + } +} diff --git a/persistence/nosql/persistence/cdi/quarkus-distcache/src/test/resources/logback-test.xml b/persistence/nosql/persistence/cdi/quarkus-distcache/src/test/resources/logback-test.xml new file mode 100644 index 0000000000..3cba413f23 --- /dev/null +++ b/persistence/nosql/persistence/cdi/quarkus-distcache/src/test/resources/logback-test.xml @@ -0,0 +1,34 @@ + + + + + + + %date{ISO8601} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + + +