Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,16 @@ License: https://www.apache.org/licenses/LICENSE-2.0

--------------------------------------------------------------------------------

This product includes code from Netty.

* persistence/nosql/persistence/cdi/quarkus-distcache/src/main/java/org/apache/polaris/persistence/nosql/quarkus/distcache/ResolvConf.java

Copyright: Copyright © 2025 The Netty project
Home page: https://netty.io/
License: https://www.apache.org/licenses/LICENSE-2.0

--------------------------------------------------------------------------------

This product includes code from OpenAPITool openapi-generator

* server-templates/formParams.mustache
Expand Down
1 change: 1 addition & 0 deletions bom/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ dependencies {
api(project(":polaris-persistence-nosql-benchmark"))
api(project(":polaris-persistence-nosql-correctness"))
api(project(":polaris-persistence-nosql-cdi-common"))
api(project(":polaris-persistence-nosql-cdi-quarkus-distcache"))
api(project(":polaris-persistence-nosql-cdi-weld"))
api(project(":polaris-persistence-nosql-standalone"))
api(project(":polaris-persistence-nosql-testextension"))
Expand Down
1 change: 1 addition & 0 deletions gradle/projects.main.properties
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ polaris-persistence-nosql-impl=persistence/nosql/persistence/impl
polaris-persistence-nosql-benchmark=persistence/nosql/persistence/benchmark
polaris-persistence-nosql-correctness=persistence/nosql/persistence/correctness
polaris-persistence-nosql-cdi-common=persistence/nosql/persistence/cdi/common
polaris-persistence-nosql-cdi-quarkus-distcache=persistence/nosql/persistence/cdi/quarkus-distcache
polaris-persistence-nosql-cdi-weld=persistence/nosql/persistence/cdi/weld
polaris-persistence-nosql-standalone=persistence/nosql/persistence/standalone
polaris-persistence-nosql-testextension=persistence/nosql/persistence/testextension
Expand Down
24 changes: 24 additions & 0 deletions persistence/nosql/persistence/cdi/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,27 @@ The biggest difference between the Quarkus and Weld variants is the way how data
There are also backend specific builders that leverage Quarkus extensions for the respective
database backends.
The Quarkus variant also adds OpenTelemetry instrumentation to the `Backend` instances.

# Distributed cache invalidation (multiple Polaris nodes)

Most persisted objects are immutable, which eliminates the need to explicitly invalidate objects.

Some specific object types are intentionally mutable.
Consistency during write operations is guaranteed by using CAS operations on those objects.
Read operations, however, fetch through the cache.

Reference pointers are mutable by design.
For writing operations, the current value of the reference pointer is always read from the
backend database.
Read operations, however, fetch the recent pointer via the cache.

To keep the state for read operations up to date, the writing node sends the information about
the mutation to all other nodes via the distributed cache invalidation mechanism.
Short cache expiration times are used to mitigate the risk of missing cache invalidation messages.

In k8s this works out of the box, leveraging k8s name service mechanisms being able to resolve
the set of IP addresses of all nodes in the cluster.
Non-k8s deployments need to configure the DNS resolvable names or IP addresses of all nodes in
the configuration.

Configuration options allow enabling and disabling the cache expiration duration.
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

plugins {
id("org.kordamp.gradle.jandex")
id("polaris-server")
}

description = "Polaris NoSQL persistence, distributed cache invalidation for Quarkus."

dependencies {
implementation(project(":polaris-persistence-nosql-cdi-common"))
implementation(project(":polaris-persistence-nosql-api"))

implementation(platform(libs.jackson.bom))
implementation("com.fasterxml.jackson.core:jackson-annotations")
implementation("com.fasterxml.jackson.core:jackson-databind")

compileOnly(libs.smallrye.config.core)

compileOnly(project(":polaris-immutables"))
annotationProcessor(project(":polaris-immutables", configuration = "processor"))

implementation(platform(libs.quarkus.bom))
implementation("io.quarkus:quarkus-core")
implementation("io.quarkus:quarkus-vertx-http")

implementation(libs.jakarta.ws.rs.api)

implementation(libs.guava)
implementation(libs.slf4j.api)

compileOnly(libs.jakarta.annotation.api)
compileOnly(libs.jakarta.validation.api)
compileOnly(libs.jakarta.inject.api)
compileOnly(libs.jakarta.enterprise.cdi.api)

compileOnly(project(":polaris-immutables"))
annotationProcessor(project(":polaris-immutables", configuration = "processor"))

// Must stick with the Quarkus platform versions of Vert.X
// (signature of io.vertx.core.Vertx.createHttpClient() changed from 4.5 to 5.0)
testImplementation(enforcedPlatform(libs.quarkus.bom))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.polaris.persistence.nosql.quarkus.distcache;

import static com.google.common.base.Preconditions.checkState;
import static java.net.NetworkInterface.networkInterfaces;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toUnmodifiableSet;

import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.dns.DnsClient;
import io.vertx.core.dns.DnsClientOptions;
import java.net.InetAddress;
import java.net.InterfaceAddress;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.List;
import java.util.Set;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Vert.x based address resolver.
*
* <p>Resolves names to both IPv4 and IPv6 addresses using a given search-list. These
* functionalities are not supported vie{@code InetAddress}.
*/
record AddressResolver(DnsClient dnsClient, List<String> searchList) {
private static final Logger LOGGER = LoggerFactory.getLogger(AddressResolver.class);

/** Set of all locally bound IP addresses. */
static final Set<String> LOCAL_ADDRESSES;

private static final boolean IP_V4_ONLY;

static {
try {
LOCAL_ADDRESSES =
networkInterfaces()
.flatMap(
ni ->
ni.getInterfaceAddresses().stream()
// Need to do this InetAddress->byte[]->InetAddress dance to get rid of
// host-address suffixes as in `0:0:0:0:0:0:0:1%lo`
.map(InterfaceAddress::getAddress)
.map(InetAddress::getAddress)
.map(
a -> {
try {
return InetAddress.getByAddress(a);
} catch (UnknownHostException e) {
// Should never happen when calling getByAddress() with an IPv4 or
// IPv6 address
throw new RuntimeException(e);
}
})
.map(InetAddress::getHostAddress))
.collect(toUnmodifiableSet());

IP_V4_ONLY = Boolean.parseBoolean(System.getProperty("java.net.preferIPv4Stack", "false"));
} catch (SocketException e) {
throw new RuntimeException(e);
}
}

/**
* Uses a "default" {@link DnsClient} using the first {@code nameserver} and the {@code search}
* list configured in {@code /etc/resolv.conf}.
*/
AddressResolver(Vertx vertx, long queryTimeoutMillis) {
this(createDnsClient(vertx, queryTimeoutMillis), ResolvConf.system().getSearchList());
}

/**
* Creates a "default" {@link DnsClient} using the first nameserver configured in {@code
* /etc/resolv.conf}.
*/
private static DnsClient createDnsClient(Vertx vertx, long queryTimeoutMillis) {
var nameservers = ResolvConf.system().getNameservers();
checkState(!nameservers.isEmpty(), "No nameserver configured in /etc/resolv.conf");
var nameserver = nameservers.getFirst();
LOGGER.info(
"Using nameserver {}/{} with search list {}",
nameserver.getHostName(),
nameserver.getAddress().getHostAddress(),
ResolvConf.system().getSearchList());
return vertx.createDnsClient(
new DnsClientOptions()
// 5 seconds should be enough to resolve
.setQueryTimeout(queryTimeoutMillis)
.setHost(nameserver.getAddress().getHostAddress())
.setPort(nameserver.getPort()));
}

private Future<List<String>> resolveSingle(String name) {
var resultA = dnsClient.resolveA(name);
if (IP_V4_ONLY) {
return resultA;
}
return resultA.compose(
a ->
dnsClient
.resolveAAAA(name)
.map(aaaa -> Stream.concat(aaaa.stream(), a.stream()).collect(toList())));
}

/** Resolve a single name, used by {@link #resolveAll(List)}. */
Future<List<String>> resolve(String name) {
if (name.startsWith("=")) {
return Future.succeededFuture(List.of(name.substring(1)));
}

// By convention, do not consult the 'search' list, when the name to query ends with a dot.
var exact = name.endsWith(".");
var query = exact ? name.substring(0, name.length() - 1) : name;
var future = resolveSingle(query);
if (!exact) {
// Consult the 'search' list if the above 'resolveName' fails.
for (var search : searchList) {
future = future.recover(t -> resolveSingle(query + '.' + search));
}
}

return future;
}

/** Resolve all names in parallel. */
Future<List<String>> resolveAll(List<String> names) {
var composite = Future.all(names.stream().map(this::resolve).collect(toList()));
return composite.map(
c ->
IntStream.range(0, c.size())
.mapToObj(c::resultAt)
.map(
e -> {
@SuppressWarnings("unchecked")
var casted = (List<String>) e;
return casted.stream();
})
.reduce(Stream::concat)
.map(s -> s.collect(toList()))
.orElse(List.of()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.polaris.persistence.nosql.quarkus.distcache;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;
import java.util.UUID;

@ApplicationScoped
class CacheInvalidationInfra {

/** Produces a random, ephemeral server instance ID. */
@Produces
@ApplicationScoped
ServerInstanceId ephemeralServerInstanceId() {
return ServerInstanceId.of(UUID.randomUUID().toString());
}
}
Loading