From 23997774b189f4753d2d3b08acc6f9c1eb98101c Mon Sep 17 00:00:00 2001 From: stheppi Date: Wed, 30 Oct 2024 17:11:44 +0000 Subject: [PATCH 1/2] Add record is null smt --- .../connect/smt/header/RecordIsNull.java | 45 +++++++++++++++++++ 1 file changed, 45 insertions(+) create mode 100644 src/main/java/io/lenses/connect/smt/header/RecordIsNull.java diff --git a/src/main/java/io/lenses/connect/smt/header/RecordIsNull.java b/src/main/java/io/lenses/connect/smt/header/RecordIsNull.java new file mode 100644 index 0000000..09d13c0 --- /dev/null +++ b/src/main/java/io/lenses/connect/smt/header/RecordIsNull.java @@ -0,0 +1,45 @@ +package io.lenses.connect.smt.header; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.utils.AppInfoParser; +import org.apache.kafka.connect.components.Versioned; +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.transforms.predicates.Predicate; + +import java.util.Map; + +public class RecordIsNull> implements Predicate, Versioned { + + public static final String OVERVIEW_DOC = "A predicate which is true for records which are tombstones (i.e. have null value)."; + public static final ConfigDef CONFIG_DEF = new ConfigDef(); + + @Override + public String version() { + return AppInfoParser.getVersion(); + } + + @Override + public ConfigDef config() { + return CONFIG_DEF; + } + + @Override + public boolean test(R record) { + return record == null; + } + + @Override + public void close() { + + } + + @Override + public void configure(Map configs) { + + } + + @Override + public String toString() { + return "RecordIsTombstone{}"; + } +} From 475fa7e397ecbb8f4822a5b874ff84f5ab34f3a6 Mon Sep 17 00:00:00 2001 From: stheppi Date: Wed, 30 Oct 2024 17:15:09 +0000 Subject: [PATCH 2/2] Add the header --- .../connect/smt/header/RecordIsNull.java | 62 ++++++++++--------- 1 file changed, 34 insertions(+), 28 deletions(-) diff --git a/src/main/java/io/lenses/connect/smt/header/RecordIsNull.java b/src/main/java/io/lenses/connect/smt/header/RecordIsNull.java index 09d13c0..f059eee 100644 --- a/src/main/java/io/lenses/connect/smt/header/RecordIsNull.java +++ b/src/main/java/io/lenses/connect/smt/header/RecordIsNull.java @@ -1,45 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at: http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ package io.lenses.connect.smt.header; +import java.util.Map; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.connect.components.Versioned; import org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.transforms.predicates.Predicate; -import java.util.Map; - public class RecordIsNull> implements Predicate, Versioned { - public static final String OVERVIEW_DOC = "A predicate which is true for records which are tombstones (i.e. have null value)."; - public static final ConfigDef CONFIG_DEF = new ConfigDef(); - - @Override - public String version() { - return AppInfoParser.getVersion(); - } - - @Override - public ConfigDef config() { - return CONFIG_DEF; - } + public static final String OVERVIEW_DOC = + "A predicate which is true for records which are tombstones (i.e. have null value)."; + public static final ConfigDef CONFIG_DEF = new ConfigDef(); - @Override - public boolean test(R record) { - return record == null; - } + @Override + public String version() { + return AppInfoParser.getVersion(); + } - @Override - public void close() { + @Override + public ConfigDef config() { + return CONFIG_DEF; + } - } + @Override + public boolean test(R record) { + return record == null; + } - @Override - public void configure(Map configs) { + @Override + public void close() {} - } + @Override + public void configure(Map configs) {} - @Override - public String toString() { - return "RecordIsTombstone{}"; - } + @Override + public String toString() { + return "RecordIsTombstone{}"; + } }