diff --git a/runtime/defaults/src/main/resources/application.properties b/runtime/defaults/src/main/resources/application.properties index 91d41f9256..d00101894c 100644 --- a/runtime/defaults/src/main/resources/application.properties +++ b/runtime/defaults/src/main/resources/application.properties @@ -144,6 +144,7 @@ polaris.event-listener.type=no-op # polaris.event-listener.aws-cloudwatch.log-stream=polaris-cloudwatch-default-stream # polaris.event-listener.aws-cloudwatch.region=us-east-1 # polaris.event-listener.aws-cloudwatch.synchronous-mode=false +# polaris.event-listener.aws-cloudwatch.event-types= // the absence of this property would result in processing all Polaris event types. polaris.log.request-id-header-name=Polaris-Request-Id # polaris.log.mdc.aid=polaris diff --git a/runtime/service/src/main/java/org/apache/polaris/service/config/PolarisIcebergObjectMapperCustomizer.java b/runtime/service/src/main/java/org/apache/polaris/service/config/PolarisIcebergObjectMapperCustomizer.java index fded73d704..b36d81a30f 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/config/PolarisIcebergObjectMapperCustomizer.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/config/PolarisIcebergObjectMapperCustomizer.java @@ -30,7 +30,12 @@ import io.smallrye.config.WithConverter; import jakarta.inject.Inject; import jakarta.inject.Singleton; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.rest.RESTSerializers; +import org.apache.polaris.service.events.PolarisEvent; +import org.apache.polaris.service.events.json.mixins.IcebergMixins; +import org.apache.polaris.service.events.json.mixins.PolarisEventBaseMixin; import org.eclipse.microprofile.config.inject.ConfigProperty; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,6 +61,10 @@ public void customize(ObjectMapper objectMapper) { objectMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY); objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); objectMapper.setPropertyNamingStrategy(new PropertyNamingStrategies.KebabCaseStrategy()); + objectMapper.addMixIn(PolarisEvent.class, PolarisEventBaseMixin.class); + objectMapper.addMixIn(TableIdentifier.class, IcebergMixins.TableIdentifierMixin.class); + objectMapper.addMixIn(Namespace.class, IcebergMixins.NamespaceMixin.class); + RESTSerializers.registerAll(objectMapper); Serializers.registerSerializers(objectMapper); objectMapper diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/json/mixins/IcebergMixins.java b/runtime/service/src/main/java/org/apache/polaris/service/events/json/mixins/IcebergMixins.java new file mode 100644 index 0000000000..7f9dcd1f52 --- /dev/null +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/json/mixins/IcebergMixins.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.service.events.json.mixins; + +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.iceberg.catalog.Namespace; + +/** Mixins for Iceberg classes we don't control, to keep JSON concise. */ +public final class IcebergMixins { + + // Private constructor to prevent instantiation + private IcebergMixins() {} + + /** Serializes Namespace as an object like: "namespace": ["sales", "north.america"] */ + public abstract static class NamespaceMixin { + @JsonProperty("namespace") + public abstract String[] levels(); + } + + /** + * Serializes TableIdentifier as a scalar string like: {"namespace": ["sales", "north.america"], + * "name": "transactions"} + */ + public abstract static class TableIdentifierMixin { + @JsonProperty("namespace") + public abstract Namespace namespace(); + + @JsonProperty("name") + public abstract String name(); + } +} diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/json/mixins/PolarisEventBaseMixin.java b/runtime/service/src/main/java/org/apache/polaris/service/events/json/mixins/PolarisEventBaseMixin.java new file mode 100644 index 0000000000..7ea183e3c1 --- /dev/null +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/json/mixins/PolarisEventBaseMixin.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.service.events.json.mixins; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.databind.PropertyNamingStrategies; +import com.fasterxml.jackson.databind.annotation.JsonNaming; + +@JsonInclude(JsonInclude.Include.NON_NULL) +@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class) +public abstract class PolarisEventBaseMixin {} diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/jsonEventListener/PropertyMapEventListener.java b/runtime/service/src/main/java/org/apache/polaris/service/events/jsonEventListener/PropertyMapEventListener.java deleted file mode 100644 index 63311fc035..0000000000 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/jsonEventListener/PropertyMapEventListener.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.polaris.service.events.jsonEventListener; - -import java.util.HashMap; -import org.apache.polaris.service.events.IcebergRestCatalogEvents; -import org.apache.polaris.service.events.listeners.PolarisEventListener; - -/** - * This class provides a common framework for transforming Polaris events into a HashMap, which can - * be used to transform the event further, such as transforming into a JSON string, and send them to - * various destinations. Concrete implementations should override the - * {{@code @link#transformAndSendEvent(HashMap)}} method to define how the event data should be - * transformed into a JSON string, transmitted, and/or stored. - */ -public abstract class PropertyMapEventListener implements PolarisEventListener { - protected abstract void transformAndSendEvent(HashMap properties); - - @Override - public void onAfterRefreshTable(IcebergRestCatalogEvents.AfterRefreshTableEvent event) { - HashMap properties = new HashMap<>(); - properties.put("event_type", event.getClass().getSimpleName()); - properties.put("table_identifier", event.tableIdentifier().toString()); - transformAndSendEvent(properties); - } -} diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/AllEventsForwardingListener.java b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/AllEventsForwardingListener.java new file mode 100644 index 0000000000..7aaf7604b1 --- /dev/null +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/AllEventsForwardingListener.java @@ -0,0 +1,854 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.polaris.service.events.listeners; + +import org.apache.polaris.service.events.AfterAttemptTaskEvent; +import org.apache.polaris.service.events.BeforeAttemptTaskEvent; +import org.apache.polaris.service.events.BeforeLimitRequestRateEvent; +import org.apache.polaris.service.events.CatalogGenericTableServiceEvents; +import org.apache.polaris.service.events.CatalogPolicyServiceEvents; +import org.apache.polaris.service.events.CatalogsServiceEvents; +import org.apache.polaris.service.events.IcebergRestCatalogEvents; +import org.apache.polaris.service.events.PolarisEvent; +import org.apache.polaris.service.events.PrincipalRolesServiceEvents; +import org.apache.polaris.service.events.PrincipalsServiceEvents; + +/** + * Base class for event listeners that wish to generically forward all {@link PolarisEvent + * PolarisEvents} to an external sink. + * + *

This design follows the Template Method pattern, centralizing shared control flow in the base + * class while allowing subclasses to supply the event-specific behavior. + */ +public abstract class AllEventsForwardingListener implements PolarisEventListener { + + /** Subclasses implement the actual logic once, generically. */ + protected abstract void handle(PolarisEvent event); + + /** Optional filter (config-based). Default: handle all. */ + protected boolean shouldHandle(PolarisEvent event) { + return true; + } + + @Override + public void onAfterGetCatalog(CatalogsServiceEvents.AfterGetCatalogEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeCreateCatalog(CatalogsServiceEvents.BeforeCreateCatalogEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterCreateCatalog(CatalogsServiceEvents.AfterCreateCatalogEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeDeleteCatalog(CatalogsServiceEvents.BeforeDeleteCatalogEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterDeleteCatalog(CatalogsServiceEvents.AfterDeleteCatalogEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeGetCatalog(CatalogsServiceEvents.BeforeGetCatalogEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeUpdateCatalog(CatalogsServiceEvents.BeforeUpdateCatalogEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterUpdateCatalog(CatalogsServiceEvents.AfterUpdateCatalogEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeListCatalogs(CatalogsServiceEvents.BeforeListCatalogsEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterListCatalogs(CatalogsServiceEvents.AfterListCatalogsEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeCreatePrincipal(PrincipalsServiceEvents.BeforeCreatePrincipalEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterCreatePrincipal(PrincipalsServiceEvents.AfterCreatePrincipalEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeDeletePrincipal(PrincipalsServiceEvents.BeforeDeletePrincipalEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterDeletePrincipal(PrincipalsServiceEvents.AfterDeletePrincipalEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeGetPrincipal(PrincipalsServiceEvents.BeforeGetPrincipalEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterGetPrincipal(PrincipalsServiceEvents.AfterGetPrincipalEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeUpdatePrincipal(PrincipalsServiceEvents.BeforeUpdatePrincipalEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterUpdatePrincipal(PrincipalsServiceEvents.AfterUpdatePrincipalEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeRotateCredentials( + PrincipalsServiceEvents.BeforeRotateCredentialsEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterRotateCredentials(PrincipalsServiceEvents.AfterRotateCredentialsEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeListPrincipals(PrincipalsServiceEvents.BeforeListPrincipalsEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterListPrincipals(PrincipalsServiceEvents.AfterListPrincipalsEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeResetCredentials(PrincipalsServiceEvents.BeforeResetCredentialsEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterResetCredentials(PrincipalsServiceEvents.AfterResetCredentialsEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeAssignPrincipalRole( + PrincipalsServiceEvents.BeforeAssignPrincipalRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterAssignPrincipalRole( + PrincipalsServiceEvents.AfterAssignPrincipalRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeRevokePrincipalRole( + PrincipalsServiceEvents.BeforeRevokePrincipalRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterRevokePrincipalRole( + PrincipalsServiceEvents.AfterRevokePrincipalRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeListAssignedPrincipalRoles( + PrincipalsServiceEvents.BeforeListAssignedPrincipalRolesEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterListAssignedPrincipalRoles( + PrincipalsServiceEvents.AfterListAssignedPrincipalRolesEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeCreatePrincipalRole( + PrincipalRolesServiceEvents.BeforeCreatePrincipalRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterCreatePrincipalRole( + PrincipalRolesServiceEvents.AfterCreatePrincipalRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeDeletePrincipalRole( + PrincipalRolesServiceEvents.BeforeDeletePrincipalRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterDeletePrincipalRole( + PrincipalRolesServiceEvents.AfterDeletePrincipalRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeGetPrincipalRole( + PrincipalRolesServiceEvents.BeforeGetPrincipalRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterGetPrincipalRole( + PrincipalRolesServiceEvents.AfterGetPrincipalRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeUpdatePrincipalRole( + PrincipalRolesServiceEvents.BeforeUpdatePrincipalRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterUpdatePrincipalRole( + PrincipalRolesServiceEvents.AfterUpdatePrincipalRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeListPrincipalRoles( + PrincipalRolesServiceEvents.BeforeListPrincipalRolesEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterListPrincipalRoles( + PrincipalRolesServiceEvents.AfterListPrincipalRolesEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeCreateCatalogRole(CatalogsServiceEvents.BeforeCreateCatalogRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterCreateCatalogRole(CatalogsServiceEvents.AfterCreateCatalogRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeDeleteCatalogRole(CatalogsServiceEvents.BeforeDeleteCatalogRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterDeleteCatalogRole(CatalogsServiceEvents.AfterDeleteCatalogRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeGetCatalogRole(CatalogsServiceEvents.BeforeGetCatalogRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterGetCatalogRole(CatalogsServiceEvents.AfterGetCatalogRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeUpdateCatalogRole(CatalogsServiceEvents.BeforeUpdateCatalogRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterUpdateCatalogRole(CatalogsServiceEvents.AfterUpdateCatalogRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeListCatalogRoles(CatalogsServiceEvents.BeforeListCatalogRolesEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterListCatalogRoles(CatalogsServiceEvents.AfterListCatalogRolesEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeAssignCatalogRoleToPrincipalRole( + PrincipalRolesServiceEvents.BeforeAssignCatalogRoleToPrincipalRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterAssignCatalogRoleToPrincipalRole( + PrincipalRolesServiceEvents.AfterAssignCatalogRoleToPrincipalRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeRevokeCatalogRoleFromPrincipalRole( + PrincipalRolesServiceEvents.BeforeRevokeCatalogRoleFromPrincipalRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterRevokeCatalogRoleFromPrincipalRole( + PrincipalRolesServiceEvents.AfterRevokeCatalogRoleFromPrincipalRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeListAssigneePrincipalsForPrincipalRole( + PrincipalRolesServiceEvents.BeforeListAssigneePrincipalsForPrincipalRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterListAssigneePrincipalsForPrincipalRole( + PrincipalRolesServiceEvents.AfterListAssigneePrincipalsForPrincipalRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeListCatalogRolesForPrincipalRole( + PrincipalRolesServiceEvents.BeforeListCatalogRolesForPrincipalRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterListCatalogRolesForPrincipalRole( + PrincipalRolesServiceEvents.AfterListCatalogRolesForPrincipalRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeAddGrantToCatalogRole( + CatalogsServiceEvents.BeforeAddGrantToCatalogRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterAddGrantToCatalogRole( + CatalogsServiceEvents.AfterAddGrantToCatalogRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeRevokeGrantFromCatalogRole( + CatalogsServiceEvents.BeforeRevokeGrantFromCatalogRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterRevokeGrantFromCatalogRole( + CatalogsServiceEvents.AfterRevokeGrantFromCatalogRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeListAssigneePrincipalRolesForCatalogRole( + CatalogsServiceEvents.BeforeListAssigneePrincipalRolesForCatalogRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterListAssigneePrincipalRolesForCatalogRole( + CatalogsServiceEvents.AfterListAssigneePrincipalRolesForCatalogRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeListGrantsForCatalogRole( + CatalogsServiceEvents.BeforeListGrantsForCatalogRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterListGrantsForCatalogRole( + CatalogsServiceEvents.AfterListGrantsForCatalogRoleEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeCreateNamespace(IcebergRestCatalogEvents.BeforeCreateNamespaceEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterCreateNamespace(IcebergRestCatalogEvents.AfterCreateNamespaceEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeListNamespaces(IcebergRestCatalogEvents.BeforeListNamespacesEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterListNamespaces(IcebergRestCatalogEvents.AfterListNamespacesEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeLoadNamespaceMetadata( + IcebergRestCatalogEvents.BeforeLoadNamespaceMetadataEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterLoadNamespaceMetadata( + IcebergRestCatalogEvents.AfterLoadNamespaceMetadataEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeCheckExistsNamespace( + IcebergRestCatalogEvents.BeforeCheckExistsNamespaceEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterCheckExistsNamespace( + IcebergRestCatalogEvents.AfterCheckExistsNamespaceEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeDropNamespace(IcebergRestCatalogEvents.BeforeDropNamespaceEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterDropNamespace(IcebergRestCatalogEvents.AfterDropNamespaceEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeUpdateNamespaceProperties( + IcebergRestCatalogEvents.BeforeUpdateNamespacePropertiesEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterUpdateNamespaceProperties( + IcebergRestCatalogEvents.AfterUpdateNamespacePropertiesEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeCreateTable(IcebergRestCatalogEvents.BeforeCreateTableEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterCreateTable(IcebergRestCatalogEvents.AfterCreateTableEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeCommitTable(IcebergRestCatalogEvents.BeforeCommitTableEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterCommitTable(IcebergRestCatalogEvents.AfterCommitTableEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeRefreshTable(IcebergRestCatalogEvents.BeforeRefreshTableEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterRefreshTable(IcebergRestCatalogEvents.AfterRefreshTableEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeListTables(IcebergRestCatalogEvents.BeforeListTablesEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterListTables(IcebergRestCatalogEvents.AfterListTablesEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeLoadTable(IcebergRestCatalogEvents.BeforeLoadTableEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterLoadTable(IcebergRestCatalogEvents.AfterLoadTableEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeCheckExistsTable(IcebergRestCatalogEvents.BeforeCheckExistsTableEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterCheckExistsTable(IcebergRestCatalogEvents.AfterCheckExistsTableEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeDropTable(IcebergRestCatalogEvents.BeforeDropTableEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterDropTable(IcebergRestCatalogEvents.AfterDropTableEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeRegisterTable(IcebergRestCatalogEvents.BeforeRegisterTableEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterRegisterTable(IcebergRestCatalogEvents.AfterRegisterTableEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeRenameTable(IcebergRestCatalogEvents.BeforeRenameTableEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterRenameTable(IcebergRestCatalogEvents.AfterRenameTableEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeUpdateTable(IcebergRestCatalogEvents.BeforeUpdateTableEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterUpdateTable(IcebergRestCatalogEvents.AfterUpdateTableEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeCreateView(IcebergRestCatalogEvents.BeforeCreateViewEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterCreateView(IcebergRestCatalogEvents.AfterCreateViewEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeCommitView(IcebergRestCatalogEvents.BeforeCommitViewEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterCommitView(IcebergRestCatalogEvents.AfterCommitViewEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeRefreshView(IcebergRestCatalogEvents.BeforeRefreshViewEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterRefreshView(IcebergRestCatalogEvents.AfterRefreshViewEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeListViews(IcebergRestCatalogEvents.BeforeListViewsEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterListViews(IcebergRestCatalogEvents.AfterListViewsEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeLoadView(IcebergRestCatalogEvents.BeforeLoadViewEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterLoadView(IcebergRestCatalogEvents.AfterLoadViewEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeCheckExistsView(IcebergRestCatalogEvents.BeforeCheckExistsViewEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterCheckExistsView(IcebergRestCatalogEvents.AfterCheckExistsViewEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeDropView(IcebergRestCatalogEvents.BeforeDropViewEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterDropView(IcebergRestCatalogEvents.AfterDropViewEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeRenameView(IcebergRestCatalogEvents.BeforeRenameViewEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterRenameView(IcebergRestCatalogEvents.AfterRenameViewEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeReplaceView(IcebergRestCatalogEvents.BeforeReplaceViewEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterReplaceView(IcebergRestCatalogEvents.AfterReplaceViewEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeLoadCredentials(IcebergRestCatalogEvents.BeforeLoadCredentialsEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterLoadCredentials(IcebergRestCatalogEvents.AfterLoadCredentialsEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeCommitTransaction( + IcebergRestCatalogEvents.BeforeCommitTransactionEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterCommitTransaction(IcebergRestCatalogEvents.AfterCommitTransactionEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeSendNotification(IcebergRestCatalogEvents.BeforeSendNotificationEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterSendNotification(IcebergRestCatalogEvents.AfterSendNotificationEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeGetConfig(IcebergRestCatalogEvents.BeforeGetConfigEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterGetConfig(IcebergRestCatalogEvents.AfterGetConfigEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeCreatePolicy(CatalogPolicyServiceEvents.BeforeCreatePolicyEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterCreatePolicy(CatalogPolicyServiceEvents.AfterCreatePolicyEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeListPolicies(CatalogPolicyServiceEvents.BeforeListPoliciesEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterListPolicies(CatalogPolicyServiceEvents.AfterListPoliciesEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeLoadPolicy(CatalogPolicyServiceEvents.BeforeLoadPolicyEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterLoadPolicy(CatalogPolicyServiceEvents.AfterLoadPolicyEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeUpdatePolicy(CatalogPolicyServiceEvents.BeforeUpdatePolicyEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterUpdatePolicy(CatalogPolicyServiceEvents.AfterUpdatePolicyEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeDropPolicy(CatalogPolicyServiceEvents.BeforeDropPolicyEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterDropPolicy(CatalogPolicyServiceEvents.AfterDropPolicyEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeAttachPolicy(CatalogPolicyServiceEvents.BeforeAttachPolicyEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterAttachPolicy(CatalogPolicyServiceEvents.AfterAttachPolicyEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeDetachPolicy(CatalogPolicyServiceEvents.BeforeDetachPolicyEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterDetachPolicy(CatalogPolicyServiceEvents.AfterDetachPolicyEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeGetApplicablePolicies( + CatalogPolicyServiceEvents.BeforeGetApplicablePoliciesEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterGetApplicablePolicies( + CatalogPolicyServiceEvents.AfterGetApplicablePoliciesEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeCreateGenericTable( + CatalogGenericTableServiceEvents.BeforeCreateGenericTableEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterCreateGenericTable( + CatalogGenericTableServiceEvents.AfterCreateGenericTableEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeDropGenericTable( + CatalogGenericTableServiceEvents.BeforeDropGenericTableEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterDropGenericTable( + CatalogGenericTableServiceEvents.AfterDropGenericTableEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeListGenericTables( + CatalogGenericTableServiceEvents.BeforeListGenericTablesEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterListGenericTables( + CatalogGenericTableServiceEvents.AfterListGenericTablesEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeLoadGenericTable( + CatalogGenericTableServiceEvents.BeforeLoadGenericTableEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterLoadGenericTable( + CatalogGenericTableServiceEvents.AfterLoadGenericTableEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeAttemptTask(BeforeAttemptTaskEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onAfterAttemptTask(AfterAttemptTaskEvent event) { + if (shouldHandle(event)) handle(event); + } + + @Override + public void onBeforeLimitRequestRate(BeforeLimitRequestRateEvent event) { + if (shouldHandle(event)) handle(event); + } +} diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/jsonEventListener/aws/cloudwatch/AwsCloudWatchConfiguration.java b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/aws/cloudwatch/AwsCloudWatchConfiguration.java similarity index 91% rename from runtime/service/src/main/java/org/apache/polaris/service/events/jsonEventListener/aws/cloudwatch/AwsCloudWatchConfiguration.java rename to runtime/service/src/main/java/org/apache/polaris/service/events/listeners/aws/cloudwatch/AwsCloudWatchConfiguration.java index e511f13a0c..57f9ae98c7 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/jsonEventListener/aws/cloudwatch/AwsCloudWatchConfiguration.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/aws/cloudwatch/AwsCloudWatchConfiguration.java @@ -16,13 +16,16 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.polaris.service.events.jsonEventListener.aws.cloudwatch; +package org.apache.polaris.service.events.listeners.aws.cloudwatch; import io.quarkus.runtime.annotations.StaticInitSafe; import io.smallrye.config.ConfigMapping; import io.smallrye.config.WithDefault; import io.smallrye.config.WithName; import jakarta.enterprise.context.ApplicationScoped; +import java.util.Optional; +import java.util.Set; +import org.apache.polaris.service.events.PolarisEvent; /** Configuration interface for AWS CloudWatch event listener integration. */ @StaticInitSafe @@ -86,4 +89,8 @@ public interface AwsCloudWatchConfiguration { @WithName("synchronous-mode") @WithDefault("false") boolean synchronousMode(); + + @WithName("event-types") + Optional>> + eventTypes(); // defaults to empty option i.e. process all events } diff --git a/runtime/service/src/main/java/org/apache/polaris/service/events/jsonEventListener/aws/cloudwatch/AwsCloudWatchEventListener.java b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/aws/cloudwatch/AwsCloudWatchEventListener.java similarity index 72% rename from runtime/service/src/main/java/org/apache/polaris/service/events/jsonEventListener/aws/cloudwatch/AwsCloudWatchEventListener.java rename to runtime/service/src/main/java/org/apache/polaris/service/events/listeners/aws/cloudwatch/AwsCloudWatchEventListener.java index 87cf70a2f1..21e3975fcf 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/events/jsonEventListener/aws/cloudwatch/AwsCloudWatchEventListener.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/events/listeners/aws/cloudwatch/AwsCloudWatchEventListener.java @@ -17,10 +17,13 @@ * under the License. */ -package org.apache.polaris.service.events.jsonEventListener.aws.cloudwatch; +package org.apache.polaris.service.events.listeners.aws.cloudwatch; +import com.fasterxml.jackson.annotation.JsonUnwrapped; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.PropertyNamingStrategies; +import com.fasterxml.jackson.databind.annotation.JsonNaming; import io.smallrye.common.annotation.Identifier; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; @@ -29,14 +32,15 @@ import jakarta.ws.rs.core.Context; import jakarta.ws.rs.core.SecurityContext; import java.time.Clock; -import java.util.HashMap; +import java.util.Collection; import java.util.List; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; import org.apache.polaris.core.auth.PolarisPrincipal; import org.apache.polaris.core.context.CallContext; -import org.apache.polaris.service.config.PolarisIcebergObjectMapperCustomizer; -import org.apache.polaris.service.events.jsonEventListener.PropertyMapEventListener; +import org.apache.polaris.service.events.PolarisEvent; +import org.apache.polaris.service.events.listeners.AllEventsForwardingListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.regions.Region; @@ -51,10 +55,10 @@ @ApplicationScoped @Identifier("aws-cloudwatch") -public class AwsCloudWatchEventListener extends PropertyMapEventListener { +public class AwsCloudWatchEventListener extends AllEventsForwardingListener { private static final Logger LOGGER = LoggerFactory.getLogger(AwsCloudWatchEventListener.class); - final ObjectMapper objectMapper = new ObjectMapper(); + final ObjectMapper objectMapper; private CloudWatchLogsAsyncClient client; private final String logGroup; @@ -62,6 +66,8 @@ public class AwsCloudWatchEventListener extends PropertyMapEventListener { private final Region region; private final boolean synchronousMode; private final Clock clock; + private final Set> allowedEventTypes; + private final boolean listenToAllEvents; @Inject CallContext callContext; @@ -69,15 +75,35 @@ public class AwsCloudWatchEventListener extends PropertyMapEventListener { @Inject public AwsCloudWatchEventListener( - AwsCloudWatchConfiguration config, - Clock clock, - PolarisIcebergObjectMapperCustomizer customizer) { + AwsCloudWatchConfiguration config, Clock clock, ObjectMapper mapper) { this.logStream = config.awsCloudWatchLogStream(); this.logGroup = config.awsCloudWatchLogGroup(); this.region = Region.of(config.awsCloudWatchRegion()); this.synchronousMode = config.synchronousMode(); this.clock = clock; - customizer.customize(this.objectMapper); + this.objectMapper = mapper; + this.allowedEventTypes = config.eventTypes().orElse(Set.of()); + this.listenToAllEvents = + allowedEventTypes.isEmpty() + || allowedEventTypes.stream().anyMatch(c -> c == PolarisEvent.class); + } + + @Override + protected boolean shouldHandle(PolarisEvent event) { + if (event == null) { + return false; + } + + if (this.listenToAllEvents) { + return true; + } + Class actualType = event.getClass(); + return allowedEventTypes.stream().anyMatch(cfg -> cfg.isAssignableFrom(actualType)); + } + + @Override + protected void handle(PolarisEvent event) { + transformAndSendEvent(event); } @PostConstruct @@ -151,28 +177,44 @@ void shutdown() { } } - @Override - protected void transformAndSendEvent(HashMap properties) { - properties.put("realm_id", callContext.getRealmContext().getRealmIdentifier()); - properties.put("principal", securityContext.getUserPrincipal().getName()); - properties.put( - "activated_roles", ((PolarisPrincipal) securityContext.getUserPrincipal()).getRoles()); - // TODO: Add request ID when it is available + @JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class) + public record CloudWatchEvent( + String principal, + String realmId, + Collection activatedRoles, + String eventType, + @JsonUnwrapped PolarisEvent event // flatten + ) {} + + protected void transformAndSendEvent(PolarisEvent event) { + + CloudWatchEvent payload = + new CloudWatchEvent( + securityContext.getUserPrincipal().getName(), + callContext.getRealmContext().getRealmIdentifier(), + ((PolarisPrincipal) securityContext.getUserPrincipal()).getRoles(), + event.getClass().getSimpleName(), + event); + String eventAsJson; + try { - eventAsJson = objectMapper.writeValueAsString(properties); - } catch (JsonProcessingException e) { - LOGGER.error("Error processing event into JSON string: ", e); - LOGGER.debug("Failed to convert the following object into JSON string: {}", properties); + eventAsJson = objectMapper.writeValueAsString(payload); + } catch (JsonProcessingException ex) { + LOGGER.error("Error serializing CloudWatch payload: ", ex); + LOGGER.debug("Failed to convert the following object into JSON string: {}", payload); return; } + InputLogEvent inputLogEvent = InputLogEvent.builder().message(eventAsJson).timestamp(clock.millis()).build(); + PutLogEventsRequest.Builder requestBuilder = PutLogEventsRequest.builder() .logGroupName(logGroup) .logStreamName(logStream) .logEvents(List.of(inputLogEvent)); + CompletableFuture future = client .putLogEvents(requestBuilder.build()) @@ -183,6 +225,7 @@ protected void transformAndSendEvent(HashMap properties) { "Error writing log to CloudWatch. Event: {}, Error: ", inputLogEvent, err); } }); + if (synchronousMode) { future.join(); } diff --git a/runtime/service/src/test/java/org/apache/polaris/service/events/jsonEventListener/aws/cloudwatch/AwsCloudWatchEventListenerTest.java b/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/aws/cloudwatch/AwsCloudWatchEventListenerTest.java similarity index 81% rename from runtime/service/src/test/java/org/apache/polaris/service/events/jsonEventListener/aws/cloudwatch/AwsCloudWatchEventListenerTest.java rename to runtime/service/src/test/java/org/apache/polaris/service/events/listeners/aws/cloudwatch/AwsCloudWatchEventListenerTest.java index 3aac097b17..708e88bd4e 100644 --- a/runtime/service/src/test/java/org/apache/polaris/service/events/jsonEventListener/aws/cloudwatch/AwsCloudWatchEventListenerTest.java +++ b/runtime/service/src/test/java/org/apache/polaris/service/events/listeners/aws/cloudwatch/AwsCloudWatchEventListenerTest.java @@ -17,12 +17,14 @@ * under the License. */ -package org.apache.polaris.service.events.jsonEventListener.aws.cloudwatch; +package org.apache.polaris.service.events.listeners.aws.cloudwatch; import static org.apache.polaris.containerspec.ContainerSpecHelper.containerSpecHelper; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.when; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.PropertyNamingStrategies; import io.quarkus.runtime.configuration.MemorySize; import jakarta.ws.rs.core.SecurityContext; @@ -33,6 +35,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.polaris.core.PolarisCallContext; import org.apache.polaris.core.auth.PolarisPrincipal; @@ -40,6 +43,9 @@ import org.apache.polaris.core.context.RealmContext; import org.apache.polaris.service.config.PolarisIcebergObjectMapperCustomizer; import org.apache.polaris.service.events.IcebergRestCatalogEvents; +import org.apache.polaris.service.events.PolarisEvent; +import org.apache.polaris.service.events.json.mixins.IcebergMixins; +import org.apache.polaris.service.events.json.mixins.PolarisEventBaseMixin; import org.awaitility.Awaitility; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -86,17 +92,22 @@ class AwsCloudWatchEventListenerTest { private ExecutorService executorService; private AutoCloseable mockitoContext; + private static final ObjectMapper objectMapper = new ObjectMapper(); + ; @BeforeEach void setUp() { mockitoContext = MockitoAnnotations.openMocks(this); executorService = Executors.newSingleThreadExecutor(); + customizer.customize(objectMapper); + // Configure the mocks when(config.awsCloudWatchLogGroup()).thenReturn(LOG_GROUP); when(config.awsCloudWatchLogStream()).thenReturn(LOG_STREAM); when(config.awsCloudWatchRegion()).thenReturn("us-east-1"); - when(config.synchronousMode()).thenReturn(false); // Default to async mode + when(config.synchronousMode()).thenReturn(false); // default async + when(config.eventTypes()).thenReturn(java.util.Optional.empty()); // handle all events } @AfterEach @@ -130,7 +141,7 @@ private CloudWatchLogsAsyncClient createCloudWatchAsyncClient() { private AwsCloudWatchEventListener createListener(CloudWatchLogsAsyncClient client) { AwsCloudWatchEventListener listener = - new AwsCloudWatchEventListener(config, clock, customizer) { + new AwsCloudWatchEventListener(config, clock, objectMapper) { @Override protected CloudWatchLogsAsyncClient createCloudWatchAsyncClient() { return client; @@ -201,7 +212,8 @@ void shouldSendEventToCloudWatch() { listener.start(); try { // Create and send a test event - TableIdentifier testTable = TableIdentifier.of("test_namespace", "test_table"); + Namespace namespaceTest = Namespace.of("test_namespace.test1", "test1a"); + TableIdentifier testTable = TableIdentifier.of(namespaceTest, "test_table"); listener.onAfterRefreshTable( new IcebergRestCatalogEvents.AfterRefreshTableEvent("test_catalog", testTable)); @@ -236,12 +248,18 @@ void shouldSendEventToCloudWatch() { .satisfies( logEvent -> { String message = logEvent.message(); + JsonNode root = objectMapper.readTree(message); + JsonNode event = root.path("event").isMissingNode() ? root : root.path("event"); assertThat(message).contains(REALM); assertThat(message) .contains( IcebergRestCatalogEvents.AfterRefreshTableEvent.class.getSimpleName()); assertThat(message).contains(TEST_USER); - assertThat(message).contains(testTable.toString()); + // table_identifier object + JsonNode tableId = event.path("table_identifier"); + assertThat(tableId.isObject()).isTrue(); + assertThat(tableId.path("name").asText()).isEqualTo("test_table"); + assertThat(tableId.path("namespace").isArray()).isTrue(); }); } finally { // Clean up @@ -309,17 +327,49 @@ void shouldSendEventInSynchronousMode() { @Test void ensureObjectMapperCustomizerIsApplied() { - AwsCloudWatchEventListener listener = createListener(createCloudWatchAsyncClient()); - listener.start(); + + AwsCloudWatchEventListener listener = + new AwsCloudWatchEventListener(config, clock, objectMapper); assertThat(listener.objectMapper.getPropertyNamingStrategy()) .isInstanceOf(PropertyNamingStrategies.KebabCaseStrategy.class); assertThat(listener.objectMapper.getFactory().streamReadConstraints().getMaxDocumentLength()) .isEqualTo(MAX_BODY_SIZE.longValue()); + + assertThat(objectMapper.findMixInClassFor(Namespace.class)) + .as("Namespace mixin should be registered") + .isEqualTo(IcebergMixins.NamespaceMixin.class); + + assertThat(objectMapper.findMixInClassFor(TableIdentifier.class)) + .as("TableIdentifier mixin should be registered") + .isEqualTo(IcebergMixins.TableIdentifierMixin.class); + + assertThat(objectMapper.findMixInClassFor(PolarisEvent.class)) + .as("Namespace mixin should be registered") + .isEqualTo(PolarisEventBaseMixin.class); + } + + @Test + void shouldListenToAllEventTypesWhenConfigNotProvided() { + // given: config.eventTypes() is empty → listen to all events + when(config.eventTypes()).thenReturn(java.util.Optional.empty()); + + AwsCloudWatchEventListener listener = + new AwsCloudWatchEventListener(config, clock, objectMapper); + + // This is any random PolarisEvent — if the listener listens to all types, + // shouldHandle(event) should return true + PolarisEvent randomEvent = + new IcebergRestCatalogEvents.AfterRefreshTableEvent( + "test_catalog", TableIdentifier.of("db", "table")); + + boolean shouldHandle = listener.shouldHandle(randomEvent); + assertThat(shouldHandle) + .as("Listener should handle all events when no eventTypes are configured") + .isTrue(); } private void verifyLogGroupAndStreamExist(CloudWatchLogsAsyncClient client) { - // Verify log group exists DescribeLogGroupsResponse groups = client .describeLogGroups( diff --git a/runtime/service/src/test/resources/org/apache/polaris/service/events/jsonEventListener/aws/cloudwatch/Dockerfile-localstack-version b/runtime/service/src/test/resources/org/apache/polaris/service/events/listeners/aws/cloudwatch/Dockerfile-localstack-version similarity index 100% rename from runtime/service/src/test/resources/org/apache/polaris/service/events/jsonEventListener/aws/cloudwatch/Dockerfile-localstack-version rename to runtime/service/src/test/resources/org/apache/polaris/service/events/listeners/aws/cloudwatch/Dockerfile-localstack-version