diff --git a/src/main/java/io/confluent/connect/jdbc/sink/BatchPKCompaction.java b/src/main/java/io/confluent/connect/jdbc/sink/BatchPKCompaction.java
new file mode 100644
index 000000000..0675de938
--- /dev/null
+++ b/src/main/java/io/confluent/connect/jdbc/sink/BatchPKCompaction.java
@@ -0,0 +1,112 @@
+/*
+ * Copyright 2018 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.connect.jdbc.sink;
+
+import io.confluent.connect.jdbc.sink.metadata.FieldsMetadata;
+import io.confluent.connect.jdbc.sink.metadata.SchemaPair;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+
+public class BatchPKCompaction {
+
+  private final JdbcSinkConfig.PrimaryKeyMode pkMode;
+  private final FieldsMetadata fieldsMetadata;
+  private final SchemaPair schemaPair;
+
+  public BatchPKCompaction(
+      JdbcSinkConfig.PrimaryKeyMode pkMode,
+      FieldsMetadata fieldsMetadata,
+      SchemaPair schemaPair) {
+    this.pkMode = pkMode;
+    this.fieldsMetadata = fieldsMetadata;
+    this.schemaPair = schemaPair;
+  }
+
+  List<SinkRecord> applyCompaction(List<SinkRecord> records) {
+    LinkedHashMap<PrimaryKeyValue, SinkRecord> lastValues = new LinkedHashMap<>();
+    for (int i = records.size() - 1; i >= 0; i--) {
+      lastValues.putIfAbsent(getPK(records.get(i)), records.get(i));
+    }
+    List<SinkRecord> result = new ArrayList<>(lastValues.values());
+    Collections.reverse(result);
+    return result;
+  }
+
+  private PrimaryKeyValue getPK(SinkRecord record) {
+    switch (pkMode) {
+      case KAFKA: {
+        return new PrimaryKeyValue(record.topic(), record.kafkaPartition(), record.kafkaOffset());
+      }
+      case RECORD_KEY: {
+        return getPK(schemaPair.keySchema, record.key());
+      }
+      case RECORD_VALUE: {
+        return getPK(schemaPair.valueSchema, record.value());
+      }
+      case NONE: {
+        throw new UnsupportedOperationException("Compaction isn't supported for pkMode NONE");
+      }
+      default:
+        throw new UnsupportedOperationException("Compaction isn't supported for pkMode: " + pkMode);
+    }
+  }
+
+  private PrimaryKeyValue getPK(Schema schema, Object value) {
+    if (schema.type().isPrimitive()) {
+      assert fieldsMetadata.keyFieldNames.size() == 1;
+      return new PrimaryKeyValue(value);
+    } else {
+      return new PrimaryKeyValue(
+          fieldsMetadata.keyFieldNames.stream()
+              .map(fieldName -> ((Struct) value).get(schema.field(fieldName)))
+              .toArray()
+      );
+    }
+  }
+
+  private static class PrimaryKeyValue {
+
+    private final Object[] values;
+
+    private PrimaryKeyValue(Object... values) {
+      this.values = values;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null || getClass() != obj.getClass()) {
+        return false;
+      }
+      PrimaryKeyValue that = (PrimaryKeyValue) obj;
+      return Arrays.equals(this.values, that.values);
+    }
+
+    @Override
+    public int hashCode() {
+      return Arrays.hashCode(values);
+    }
+  }
+}
diff --git a/src/main/java/io/confluent/connect/jdbc/sink/BufferedRecords.java b/src/main/java/io/confluent/connect/jdbc/sink/BufferedRecords.java
index e51d4a1f8..42b7f9a8c 100644
--- a/src/main/java/io/confluent/connect/jdbc/sink/BufferedRecords.java
+++ b/src/main/java/io/confluent/connect/jdbc/sink/BufferedRecords.java
@@ -43,6 +43,7 @@
 import static java.util.Objects.nonNull;
 
 public class BufferedRecords {
+
   private static final Logger log = LoggerFactory.getLogger(BufferedRecords.class);
 
   private final TableId tableId;
@@ -61,6 +62,7 @@ public class BufferedRecords {
   private StatementBinder updateStatementBinder;
   private StatementBinder deleteStatementBinder;
   private boolean deletesInBatch = false;
+  private BatchPKCompaction batchPKCompaction = null;
 
   public BufferedRecords(
       JdbcSinkConfig config,
@@ -178,7 +180,17 @@ public List<SinkRecord> flush() throws SQLException {
       return new ArrayList<>();
     }
     log.debug("Flushing {} buffered records", records.size());
-    for (SinkRecord record : records) {
+
+    List<SinkRecord> batch = records;
+    if (config.batchPkCompactionEnabled) {
+      if (batchPKCompaction == null) {
+        batchPKCompaction = new BatchPKCompaction(config.pkMode, fieldsMetadata,
+            new SchemaPair(keySchema, valueSchema));
+      }
+      batch = batchPKCompaction.applyCompaction(records);
+    }
+
+    for (SinkRecord record : batch) {
       if (isNull(record.value()) && nonNull(deleteStatementBinder)) {
         deleteStatementBinder.bindRecord(record);
       } else {
diff --git a/src/main/java/io/confluent/connect/jdbc/sink/JdbcSinkConfig.java b/src/main/java/io/confluent/connect/jdbc/sink/JdbcSinkConfig.java
index c270d3bcb..9b08b4736 100644
--- a/src/main/java/io/confluent/connect/jdbc/sink/JdbcSinkConfig.java
+++ b/src/main/java/io/confluent/connect/jdbc/sink/JdbcSinkConfig.java
@@ -301,6 +301,13 @@ public enum DateTimezone {
   private static final String MSSQL_USE_MERGE_HOLDLOCK_DISPLAY =
       "SQL Server - Use HOLDLOCK in MERGE";
 
+  public static final String BATCH_PK_COMPACTION = "batch.pk.compaction";
+  private static final String BATCH_PK_COMPACTION_DEFAULT = "false";
+  private static final String BATCH_PK_COMPACTION_DOC =
+      "Whether to retain only the latest record for each unique key";
+  private static final String BATCH_PK_COMPACTION_DISPLAY =
+      "Retain only the latest record for each unique key";
+
   /**
    * The properties that begin with this prefix will be used to configure a class, specified by
    * {@code jdbc.credentials.provider.class} if it implements {@link Configurable}.
@@ -457,6 +464,17 @@ public enum DateTimezone {
           ConfigDef.Width.MEDIUM,
           REPLACE_NULL_WITH_DEFAULT_DISPLAY
       )
+      .define(
+          BATCH_PK_COMPACTION,
+          ConfigDef.Type.BOOLEAN,
+          BATCH_PK_COMPACTION_DEFAULT,
+          ConfigDef.Importance.LOW,
+          BATCH_PK_COMPACTION_DOC,
+          WRITES_GROUP,
+          6,
+          ConfigDef.Width.LONG,
+          BATCH_PK_COMPACTION_DISPLAY
+      )
       // Data Mapping
       .define(
           TABLE_NAME_FORMAT,
@@ -614,6 +632,7 @@ public enum DateTimezone {
   public final int batchSize;
   public final boolean deleteEnabled;
   public final boolean replaceNullWithDefault;
+  public final boolean batchPkCompactionEnabled;
   public final int maxRetries;
   public final int retryBackoffMs;
   public final boolean autoCreate;
@@ -642,6 +661,7 @@ public JdbcSinkConfig(Map<?, ?> props) {
     batchSize = getInt(BATCH_SIZE);
     deleteEnabled = getBoolean(DELETE_ENABLED);
     replaceNullWithDefault = getBoolean(REPLACE_NULL_WITH_DEFAULT);
+    batchPkCompactionEnabled = getBoolean(BATCH_PK_COMPACTION);
     maxRetries = getInt(MAX_RETRIES);
     retryBackoffMs = getInt(RETRY_BACKOFF_MS);
     autoCreate = getBoolean(AUTO_CREATE);
diff --git a/src/test/java/io/confluent/connect/jdbc/sink/BatchPKCompactionTest.java b/src/test/java/io/confluent/connect/jdbc/sink/BatchPKCompactionTest.java
new file mode 100644
index 000000000..3429a00e6
--- /dev/null
+++ b/src/test/java/io/confluent/connect/jdbc/sink/BatchPKCompactionTest.java
@@ -0,0 +1,67 @@
+package io.confluent.connect.jdbc.sink;
+
+import io.confluent.connect.jdbc.sink.metadata.FieldsMetadata;
+import io.confluent.connect.jdbc.sink.metadata.SchemaPair;
+import io.confluent.connect.jdbc.sink.metadata.SinkRecordField;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.junit.Test;
+
+import java.util.*;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.Assert.assertEquals;
+
+public class BatchPKCompactionTest {
+
+  private static final String FIELD_ID = "id";
+  private static final String FIELD_VALUE = "value";
+
+  @Test
+  public void test() {
+    final Map<String, SinkRecordField> allFields = new HashMap<>();
+    allFields.put(FIELD_ID, new SinkRecordField(Schema.INT64_SCHEMA, FIELD_ID, true));
+    allFields.put(FIELD_VALUE, new SinkRecordField(Schema.STRING_SCHEMA, FIELD_VALUE, false));
+    BatchPKCompaction compaction = new BatchPKCompaction(
+        JdbcSinkConfig.PrimaryKeyMode.RECORD_KEY,
+        new FieldsMetadata(
+            new HashSet<>(Arrays.asList(FIELD_ID)),
+            new HashSet<>(Arrays.asList(FIELD_VALUE)),
+            allFields
+        ),
+        new SchemaPair(Schema.STRING_SCHEMA, schemaIdValue)
+    );
+
+    final SinkRecord sinkRecord1 = createRecord(1L, "value1");
+    final SinkRecord sinkRecord2 = createRecord(2L, "value2");
+    final SinkRecord sinkRecord3 = createRecord(1L, "value3");
+    List<SinkRecord> result = compaction.applyCompaction(Arrays.asList(sinkRecord1, sinkRecord2, sinkRecord3));
+
+    assertEquals(2, result.size());
+    assertEquals(sinkRecord2, result.get(0));
+    assertEquals(sinkRecord3, result.get(1));
+  }
+
+
+  private final Schema schemaIdValue = SchemaBuilder.struct()
+      .field("id", Schema.INT64_SCHEMA)
+      .field("value", Schema.STRING_SCHEMA);
+
+  private final AtomicLong kafkaOffset = new AtomicLong(0);
+
+  private SinkRecord createRecord(Long id, String value) {
+    return new SinkRecord(
+        "topic",
+        0,
+        Schema.STRING_SCHEMA,
+        id.toString(),
+        schemaIdValue,
+        new Struct(schemaIdValue)
+            .put("id", id)
+            .put("value", value),
+        kafkaOffset.incrementAndGet()
+    );
+  }
+}
\ No newline at end of file
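
Usage note (illustration only, not part of the patch): with batch.pk.compaction=true, BufferedRecords.flush() keeps only the last buffered record per primary key before binding statements, which matters for upsert-heavy topics where the same key appears several times in one batch. A minimal sketch of enabling the option, assuming a hypothetical PostgreSQL connection URL and record_key primary-key mode with an "id" key column:

import io.confluent.connect.jdbc.sink.JdbcSinkConfig;

import java.util.HashMap;
import java.util.Map;

public class BatchPkCompactionConfigSketch {
  public static void main(String[] args) {
    Map<String, String> props = new HashMap<>();
    props.put("connection.url", "jdbc:postgresql://localhost:5432/example"); // hypothetical URL
    props.put("pk.mode", "record_key");       // primary key taken from the record key
    props.put("pk.fields", "id");             // hypothetical key column
    props.put("batch.pk.compaction", "true"); // option added by this patch, defaults to false

    JdbcSinkConfig config = new JdbcSinkConfig(props);
    // The flag surfaces as batchPkCompactionEnabled and is checked in BufferedRecords.flush().
    System.out.println("batch PK compaction enabled: " + config.batchPkCompactionEnabled);
  }
}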