
Commit 2e36deb

configurable NoOp support for DBZ CDC events (#113)
* preview feature to configure NoOp support based on CDC event type; basic unit test available but config tests missing
* add new configuration option to the properties in the docs
* slight refactoring for the NoOp support configuration option when using CDC; added simple unit test for the Rdbms related classes
1 parent 0346d0e commit 2e36deb

File tree

14 files changed: +417 -31 lines changed

README.md

Lines changed: 25 additions & 24 deletions
Large diffs are not rendered by default.

config/MongoDbSinkConnector.properties

Lines changed: 1 addition & 0 deletions
@@ -41,6 +41,7 @@ mongodb.field.renamer.mapping=[]
 mongodb.field.renamer.regexp=[]
 mongodb.post.processor.chain=at.grahsl.kafka.connect.mongodb.processor.DocumentIdAdder
 mongodb.change.data.capture.handler=
+mongodb.change.data.capture.handler.operations=c,r,u,d
 mongodb.delete.on.null.values=false
 mongodb.writemodel.strategy=at.grahsl.kafka.connect.mongodb.writemodel.strategy.ReplaceOneDefaultStrategy
 mongodb.max.batch.size=0
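The operation codes follow Debezium's convention: c = create/insert, r = read (initial snapshot), u = update, d = delete. The default keeps all four enabled; any code left out of the list is routed to the new NoOp operations and silently skipped. As a rough sketch (not part of this commit), a configuration that suppresses delete events for an RDBMS source could look like this:

mongodb.change.data.capture.handler=at.grahsl.kafka.connect.mongodb.cdc.debezium.rdbms.RdbmsHandler
# inserts, snapshot reads and updates are applied; delete events become no-ops
mongodb.change.data.capture.handler.operations=c,r,u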

src/main/java/at/grahsl/kafka/connect/mongodb/MongoDbSinkConnectorConfig.java

Lines changed: 17 additions & 5 deletions
@@ -17,8 +17,10 @@
 package at.grahsl.kafka.connect.mongodb;

 import at.grahsl.kafka.connect.mongodb.cdc.CdcHandler;
-import at.grahsl.kafka.connect.mongodb.cdc.debezium.mongodb.MongoDbHandler;
-import at.grahsl.kafka.connect.mongodb.cdc.debezium.rdbms.RdbmsHandler;
+import at.grahsl.kafka.connect.mongodb.cdc.CdcOperation;
+import at.grahsl.kafka.connect.mongodb.cdc.debezium.OperationType;
+import at.grahsl.kafka.connect.mongodb.cdc.debezium.mongodb.*;
+import at.grahsl.kafka.connect.mongodb.cdc.debezium.rdbms.*;
 import at.grahsl.kafka.connect.mongodb.cdc.debezium.rdbms.mysql.MysqlHandler;
 import at.grahsl.kafka.connect.mongodb.cdc.debezium.rdbms.postgres.PostgresHandler;
 import at.grahsl.kafka.connect.mongodb.processor.*;
@@ -40,6 +42,7 @@

 import java.io.IOException;
 import java.util.*;
+import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.function.Consumer;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -78,6 +81,7 @@ public enum FieldProjectionTypes {
     public static final String MONGODB_FIELD_RENAMER_REGEXP_DEFAULT = "[]";
     public static final String MONGODB_POST_PROCESSOR_CHAIN_DEFAULT = "at.grahsl.kafka.connect.mongodb.processor.DocumentIdAdder";
     public static final String MONGODB_CHANGE_DATA_CAPTURE_HANDLER_DEFAULT = "";
+    public static final String MONGODB_CHANGE_DATA_CAPTURE_HANDLER_OPERATIONS_DEFAULT = "c,r,u,d";
     public static final boolean MONGODB_DELETE_ON_NULL_VALUES_DEFAULT = false;
     public static final String MONGODB_WRITEMODEL_STRATEGY_DEFAULT = "at.grahsl.kafka.connect.mongodb.writemodel.strategy.ReplaceOneDefaultStrategy";
     public static final int MONGODB_MAX_BATCH_SIZE_DEFAULT = 0;
@@ -129,6 +133,9 @@ public enum FieldProjectionTypes {
     public static final String MONGODB_CHANGE_DATA_CAPTURE_HANDLER = "mongodb.change.data.capture.handler";
     private static final String MONGODB_CHANGE_DATA_CAPTURE_HANDLER_DOC = "class name of CDC handler to use for processing";

+    public static final String MONGODB_CHANGE_DATA_CAPTURE_HANDLER_OPERATIONS = "mongodb.change.data.capture.handler.operations";
+    private static final String MONGODB_CHANGE_DATA_CAPTURE_HANDLER_OPERATIONS_DOC = "comma separated list of supported CDC operation types (missing ones result in no ops)";
+
     public static final String MONGODB_DELETE_ON_NULL_VALUES = "mongodb.delete.on.null.values";
     private static final String MONGODB_DELETE_ON_NULL_VALUES_DOC = "whether or not the connector tries to delete documents based on key when value is null";

@@ -257,6 +264,7 @@ public Map<String, ConfigValue> validateAll(Map<String, String> props) {
                 .define(MONGODB_FIELD_RENAMER_REGEXP, Type.STRING, MONGODB_FIELD_RENAMER_REGEXP_DEFAULT, Importance.LOW, MONGODB_FIELD_RENAMER_REGEXP_DOC)
                 .define(MONGODB_POST_PROCESSOR_CHAIN, Type.STRING, MONGODB_POST_PROCESSOR_CHAIN_DEFAULT, emptyString().or(matching(FULLY_QUALIFIED_CLASS_NAME_LIST)), Importance.LOW, MONGODB_POST_PROCESSOR_CHAIN_DOC)
                 .define(MONGODB_CHANGE_DATA_CAPTURE_HANDLER, Type.STRING, MONGODB_CHANGE_DATA_CAPTURE_HANDLER_DEFAULT, emptyString().or(matching(FULLY_QUALIFIED_CLASS_NAME)), Importance.LOW, MONGODB_CHANGE_DATA_CAPTURE_HANDLER_DOC)
+                .define(MONGODB_CHANGE_DATA_CAPTURE_HANDLER_OPERATIONS, Type.STRING, MONGODB_CHANGE_DATA_CAPTURE_HANDLER_OPERATIONS_DEFAULT, Importance.LOW, MONGODB_CHANGE_DATA_CAPTURE_HANDLER_OPERATIONS_DOC)
                 .define(MONGODB_DELETE_ON_NULL_VALUES, Type.BOOLEAN, MONGODB_DELETE_ON_NULL_VALUES_DEFAULT, Importance.MEDIUM, MONGODB_DELETE_ON_NULL_VALUES_DOC)
                 .define(MONGODB_WRITEMODEL_STRATEGY, Type.STRING, MONGODB_WRITEMODEL_STRATEGY_DEFAULT, Importance.LOW, MONGODB_WRITEMODEL_STRATEGY_DOC)
                 .define(MONGODB_MAX_BATCH_SIZE, Type.INT, MONGODB_MAX_BATCH_SIZE_DEFAULT, ConfigDef.Range.atLeast(0), Importance.MEDIUM, MONGODB_MAX_BATCH_SIZE_DOC)
@@ -636,10 +644,14 @@ public CdcHandler getCdcHandler(String collection) {
             throw new ConfigException("error: unknown cdc handler "+cdcHandler);
         }

+        List<OperationType> supportedOperations = new HashSet<>(
+                splitAndTrimAndRemoveConfigListEntries(getString(MONGODB_CHANGE_DATA_CAPTURE_HANDLER_OPERATIONS,collection))
+        ).stream().map(OperationType::fromText).collect(Collectors.toList());
+
         try {
-            return (CdcHandler) Class.forName(cdcHandler)
-                    .getConstructor(MongoDbSinkConnectorConfig.class)
-                    .newInstance(this);
+            Class<CdcHandler> cdcHandlerClass = (Class<CdcHandler>)Class.forName(cdcHandler);
+            return cdcHandlerClass.getConstructor(MongoDbSinkConnectorConfig.class,List.class)
+                    .newInstance(this,supportedOperations);
         } catch (ReflectiveOperationException e) {
             throw new ConfigException(e.getMessage(),e);
         } catch (ClassCastException e) {
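getCdcHandler now parses the raw property value into a de-duplicated list of OperationType constants and hands it to the handler's new List-based constructor via reflection. Below is a minimal standalone sketch of that parsing step, assuming (as the defaults suggest) that OperationType.fromText maps the codes c/r/u/d to CREATE/READ/UPDATE/DELETE; the method is a hypothetical stand-in for the connector's own splitAndTrimAndRemoveConfigListEntries helper:

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class SupportedOperationsSketch {

    // hypothetical stand-in: split on commas, trim, drop empties, de-duplicate
    static List<String> parseSupportedOperations(String raw) {
        return Arrays.stream(raw.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .distinct()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(parseSupportedOperations("c, r ,u")); // [c, r, u] -> deletes become no-ops
        System.out.println(parseSupportedOperations(""));        // []        -> every CDC event becomes a no-op
    }
}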

src/main/java/at/grahsl/kafka/connect/mongodb/cdc/debezium/mongodb/MongoDbHandler.java

Lines changed: 24 additions & 1 deletion
@@ -28,8 +28,10 @@
 import org.slf4j.LoggerFactory;

 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.stream.Stream;

 public class MongoDbHandler extends DebeziumCdcHandler {

@@ -53,6 +55,27 @@ public MongoDbHandler(MongoDbSinkConnectorConfig config,
         registerOperations(operations);
     }

+    public MongoDbHandler(MongoDbSinkConnectorConfig config, List<OperationType> supportedTypes) {
+        super(config);
+        final Map<OperationType,CdcOperation> operations = new HashMap<>();
+        Stream.of(OperationType.values()).forEach(ot -> operations.put(ot, new MongoDbNoOp()));
+        supportedTypes.forEach(ot -> {
+            switch (ot) {
+                case CREATE:
+                case READ:
+                    operations.put(ot,new MongoDbInsert());
+                    break;
+                case UPDATE:
+                    operations.put(ot,new MongoDbUpdate());
+                    break;
+                case DELETE:
+                    operations.put(ot,new MongoDbDelete());
+                    break;
+            }
+        });
+        registerOperations(operations);
+    }
+
     @Override
     public Optional<WriteModel<BsonDocument>> handle(SinkDocument doc) {

@@ -72,7 +95,7 @@ public Optional<WriteModel<BsonDocument>> handle(SinkDocument doc) {
         logger.debug("key: "+keyDoc.toString());
         logger.debug("value: "+valueDoc.toString());

-        return Optional.of(getCdcOperation(valueDoc).perform(doc));
+        return Optional.ofNullable(getCdcOperation(valueDoc).perform(doc));
     }

 }

src/main/java/at/grahsl/kafka/connect/mongodb/cdc/debezium/mongodb/MongoDbNoOp.java

Lines changed: 15 additions & 0 deletions

@@ -0,0 +1,15 @@
+package at.grahsl.kafka.connect.mongodb.cdc.debezium.mongodb;
+
+import at.grahsl.kafka.connect.mongodb.cdc.CdcOperation;
+import at.grahsl.kafka.connect.mongodb.converter.SinkDocument;
+import com.mongodb.client.model.WriteModel;
+import org.bson.BsonDocument;
+
+public class MongoDbNoOp implements CdcOperation {
+
+    @Override
+    public WriteModel<BsonDocument> perform(SinkDocument doc) {
+        return null;
+    }
+
+}
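Both NoOp classes deliberately return null from perform, and the handlers now wrap the result in Optional.ofNullable instead of Optional.of, so an unsupported operation yields an empty Optional rather than a NullPointerException and the record is simply skipped. A small sketch of that contract (assuming the connector classes above are on the classpath; class and variable names here are illustrative only):

import at.grahsl.kafka.connect.mongodb.cdc.debezium.mongodb.MongoDbNoOp;
import at.grahsl.kafka.connect.mongodb.converter.SinkDocument;
import com.mongodb.client.model.WriteModel;
import org.bson.BsonDocument;

import java.util.Optional;

public class MongoDbNoOpContractSketch {
    public static void main(String[] args) {
        SinkDocument doc = new SinkDocument(new BsonDocument(), new BsonDocument());
        WriteModel<BsonDocument> writeModel = new MongoDbNoOp().perform(doc);

        System.out.println(writeModel == null);                          // true: a NoOp produces no write model
        System.out.println(Optional.ofNullable(writeModel).isPresent()); // false: handle(...) returns Optional.empty()
    }
}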

src/main/java/at/grahsl/kafka/connect/mongodb/cdc/debezium/rdbms/RdbmsHandler.java

Lines changed: 25 additions & 1 deletion
@@ -31,8 +31,10 @@
 import org.slf4j.LoggerFactory;

 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.stream.Stream;

 public class RdbmsHandler extends DebeziumCdcHandler {

@@ -57,6 +59,28 @@ public RdbmsHandler(MongoDbSinkConnectorConfig config,
         registerOperations(operations);
     }

+    public RdbmsHandler(MongoDbSinkConnectorConfig config, List<OperationType> supportedTypes) {
+        super(config);
+        final Map<OperationType,CdcOperation> operations = new HashMap<>();
+        Stream.of(OperationType.values()).forEach(ot -> operations.put(ot, new RdbmsNoOp()));
+        supportedTypes.forEach(ot -> {
+            switch (ot) {
+                case CREATE:
+                case READ:
+                    operations.put(ot,new RdbmsInsert());
+                    break;
+                case UPDATE:
+                    operations.put(ot,new RdbmsUpdate());
+                    break;
+                case DELETE:
+                    operations.put(ot,new RdbmsDelete());
+                    break;
+            }
+        });
+        registerOperations(operations);
+    }
+
+
     @Override
     public Optional<WriteModel<BsonDocument>> handle(SinkDocument doc) {

@@ -69,7 +93,7 @@ public Optional<WriteModel<BsonDocument>> handle(SinkDocument doc) {
             return Optional.empty();
         }

-        return Optional.of(getCdcOperation(valueDoc)
+        return Optional.ofNullable(getCdcOperation(valueDoc)
                 .perform(new SinkDocument(keyDoc,valueDoc)));
     }

src/main/java/at/grahsl/kafka/connect/mongodb/cdc/debezium/rdbms/RdbmsNoOp.java

Lines changed: 15 additions & 0 deletions

@@ -0,0 +1,15 @@
+package at.grahsl.kafka.connect.mongodb.cdc.debezium.rdbms;
+
+import at.grahsl.kafka.connect.mongodb.cdc.CdcOperation;
+import at.grahsl.kafka.connect.mongodb.converter.SinkDocument;
+import com.mongodb.client.model.WriteModel;
+import org.bson.BsonDocument;
+
+public class RdbmsNoOp implements CdcOperation {
+
+    @Override
+    public WriteModel<BsonDocument> perform(SinkDocument doc) {
+        return null;
+    }
+
+}
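The List-based constructors can also be called directly, e.g. to build an RdbmsHandler that only applies inserts and snapshot reads while updates and deletes fall through to RdbmsNoOp; this mirrors setting mongodb.change.data.capture.handler.operations=c,r. A hedged sketch (the helper method name is made up; config is whatever MongoDbSinkConnectorConfig instance is already at hand):

import at.grahsl.kafka.connect.mongodb.MongoDbSinkConnectorConfig;
import at.grahsl.kafka.connect.mongodb.cdc.debezium.OperationType;
import at.grahsl.kafka.connect.mongodb.cdc.debezium.rdbms.RdbmsHandler;

import java.util.Arrays;

public class RestrictedRdbmsHandlerSketch {

    // inserts ("c") and snapshot reads ("r") are applied; "u" and "d" map to RdbmsNoOp
    static RdbmsHandler insertOnlyHandler(MongoDbSinkConnectorConfig config) {
        return new RdbmsHandler(config,
                Arrays.asList(OperationType.CREATE, OperationType.READ));
    }
}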

src/main/java/at/grahsl/kafka/connect/mongodb/cdc/debezium/rdbms/mysql/MysqlHandler.java

Lines changed: 6 additions & 0 deletions
@@ -21,6 +21,7 @@
 import at.grahsl.kafka.connect.mongodb.cdc.debezium.OperationType;
 import at.grahsl.kafka.connect.mongodb.cdc.debezium.rdbms.RdbmsHandler;

+import java.util.List;
 import java.util.Map;

 public class MysqlHandler extends RdbmsHandler {
@@ -37,4 +38,9 @@ public MysqlHandler(MongoDbSinkConnectorConfig config) {
     public MysqlHandler(MongoDbSinkConnectorConfig config, Map<OperationType, CdcOperation> operations) {
         super(config, operations);
     }
+
+    public MysqlHandler(MongoDbSinkConnectorConfig config, List<OperationType> supportedTypes) {
+        super(config, supportedTypes);
+    }
+
 }

src/main/java/at/grahsl/kafka/connect/mongodb/cdc/debezium/rdbms/postgres/PostgresHandler.java

Lines changed: 5 additions & 0 deletions
@@ -21,6 +21,7 @@
 import at.grahsl.kafka.connect.mongodb.cdc.debezium.OperationType;
 import at.grahsl.kafka.connect.mongodb.cdc.debezium.rdbms.RdbmsHandler;

+import java.util.List;
 import java.util.Map;

 public class PostgresHandler extends RdbmsHandler {
@@ -38,4 +39,8 @@ public PostgresHandler(MongoDbSinkConnectorConfig config, Map<OperationType, Cdc
         super(config, operations);
     }

+    public PostgresHandler(MongoDbSinkConnectorConfig config, List<OperationType> supportedTypes) {
+        super(config, supportedTypes);
+    }
+
 }

src/test/java/at/grahsl/kafka/connect/mongodb/MongoDbSinkTaskTest.java

Lines changed: 76 additions & 0 deletions
@@ -597,6 +597,82 @@ valueSchema,new Struct(valueSchema)

     }

+    @Test
+    @DisplayName("test build WriteModelCDC for Rdbms NoOp used for all operation types")
+    void testBuildWriteModelCdcForRdbmsNoOpWithAllOperationTypes() {
+
+        Schema keySchema = getRdbmsKeySchemaSample();
+        Schema valueSchema = getRdbmsValueSchemaSample();
+        List<SinkRecord> sinkRecords = new ArrayList<SinkRecord>() {{
+            add(new SinkRecord("test-topic",0,
+                    keySchema,new Struct(keySchema)
+                            .put("id",1234),
+                    valueSchema,new Struct(valueSchema)
+                            .put("op","c")
+                            .put("before", null)
+                            .put("after",
+                                    new Struct(valueSchema.field("after").schema())
+                                            .put("id",1234)
+                                            .put("first_name","Alice_1234")
+                                            .put("last_name","van Wonderland")
+                                            .put("email","alice_1234@wonder.land"))
+                    //.put("source",...) //NOTE: SKIPPED SINCE NOT USED AT ALL SO FAR
+                    ,0
+            ));
+            add(new SinkRecord("test-topic",0,
+                    keySchema,new Struct(keySchema)
+                            .put("id",1234),
+                    valueSchema,new Struct(valueSchema)
+                            .put("op","u")
+                            .put("before", new Struct(valueSchema.field("before").schema())
+                                    .put("id",1234)
+                                    .put("first_name","Alice_1234")
+                                    .put("last_name","van Wonderland")
+                                    .put("email","alice_1234@wonder.land"))
+                            .put("after", new Struct(valueSchema.field("after").schema())
+                                    .put("id",1234)
+                                    .put("first_name","Alice1234")
+                                    .put("last_name","in Wonderland")
+                                    .put("email","alice1234@wonder.land"))
+                    //.put("source",...) //NOTE: SKIPPED SINCE NOT USED AT ALL SO FAR
+                    ,1
+            ));
+            add(new SinkRecord("test-topic",0,
+                    keySchema,new Struct(keySchema)
+                            .put("id",1234),
+                    valueSchema,new Struct(valueSchema)
+                            .put("op","d")
+                            .put("before", new Struct(valueSchema.field("before").schema())
+                                    .put("id",1234)
+                                    .put("first_name","Alice1234")
+                                    .put("last_name","in Wonderland")
+                                    .put("email","alice1234@wonder.land"))
+                            .put("after", null)
+                    //.put("source",...) //NOTE: SKIPPED SINCE NOT USED AT ALL SO FAR
+                    ,2));
+        }};
+
+        MongoDbSinkTask sinkTask = new MongoDbSinkTask();
+        Map<String,String> props = new HashMap<>();
+        props.put("topics","dbserver1.catalogA.tableB");
+        props.put(MongoDbSinkConnectorConfig.MONGODB_COLLECTIONS_CONF,"dbserver1.catalogA.tableB");
+        props.put(MongoDbSinkConnectorConfig.MONGODB_COLLECTION_CONF
+                +"."+"dbserver1.catalogA.tableB","dbserver1.catalogA.tableB");
+        props.put(MongoDbSinkConnectorConfig.MONGODB_CHANGE_DATA_CAPTURE_HANDLER
+                +"."+"dbserver1.catalogA.tableB",RdbmsHandler.class.getName());
+        props.put(MongoDbSinkConnectorConfig.MONGODB_CHANGE_DATA_CAPTURE_HANDLER_OPERATIONS
+                +"."+"dbserver1.catalogA.tableB","");
+        sinkTask.start(props);
+
+        List<? extends WriteModel> writeModels =
+                sinkTask.buildWriteModelCDC(sinkRecords,"dbserver1.catalogA.tableB");
+
+        assertNotNull(writeModels, "WriteModel list was null");
+
+        assertTrue(writeModels.isEmpty(), "WriteModel list was NOT empty");
+
+    }
+
     private static Schema getRdbmsKeySchemaSample() {
         return SchemaBuilder.struct()
                 .name("dbserver1.catalogA.tableB.Key")
