From c5e121fb462174ca0f8ab23698c05c992fe45c4f Mon Sep 17 00:00:00 2001 From: Dale Lane Date: Mon, 24 Nov 2025 22:24:25 +0000 Subject: [PATCH 01/10] fix: use of config in tests Some of the existing tests create a JsonRecordBuilder and start using it without configuring it. This is not valid and would not happen in real use as configure() is a required step in the Connect lifecycle. As I want to introduce new config for the record builder, I need to correct this mistake first. Signed-off-by: Dale Lane --- .../mqsource/AbstractJMSContextIT.java | 2 ++ .../builders/JsonRecordBuilderIT.java | 8 ++++-- .../builders/RecordBuilderFactory.java | 4 +-- .../builders/RecordBuilderFactoryTest.java | 26 +++++++++++++------ 4 files changed, 28 insertions(+), 12 deletions(-) diff --git a/src/integration/java/com/ibm/eventstreams/connect/mqsource/AbstractJMSContextIT.java b/src/integration/java/com/ibm/eventstreams/connect/mqsource/AbstractJMSContextIT.java index 316a47c..090987f 100644 --- a/src/integration/java/com/ibm/eventstreams/connect/mqsource/AbstractJMSContextIT.java +++ b/src/integration/java/com/ibm/eventstreams/connect/mqsource/AbstractJMSContextIT.java @@ -19,6 +19,7 @@ import com.github.dockerjava.api.model.HostConfig; import com.github.dockerjava.api.model.PortBinding; import com.github.dockerjava.api.model.Ports; +import com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder; import com.ibm.eventstreams.connect.mqsource.utils.MQTestUtil; import com.ibm.mq.jms.MQConnectionFactory; import com.ibm.msg.client.wmq.WMQConstants; @@ -102,6 +103,7 @@ public static Map getDefaultConnectorProperties() { props.put("mq.channel.name", CHANNEL_NAME); props.put("mq.queue", DEFAULT_SOURCE_QUEUE); props.put("mq.user.authentication.mqcsp", "false"); + props.put("mq.record.builder", DefaultRecordBuilder.class.getCanonicalName()); props.put("topic", "mytopic"); return props; } diff --git a/src/integration/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilderIT.java b/src/integration/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilderIT.java index 93e33df..0fa7746 100644 --- a/src/integration/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilderIT.java +++ b/src/integration/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilderIT.java @@ -65,6 +65,7 @@ public void buildFromJmsTextMessage() throws Exception { // use the builder to convert it to a Kafka record final JsonRecordBuilder builder = new JsonRecordBuilder(); + builder.configure(getDefaultConnectorProperties()); final SourceRecord record = builder.toSourceRecord(getJmsContext(), topic, isJMS, message); // verify the Kafka record @@ -82,6 +83,7 @@ public void buildFromJmsBytesMessage() throws Exception { // use the builder to convert it to a Kafka record final JsonRecordBuilder builder = new JsonRecordBuilder(); + builder.configure(getDefaultConnectorProperties()); final SourceRecord record = builder.toSourceRecord(getJmsContext(), topic, isJMS, message); // verify the Kafka record @@ -100,6 +102,7 @@ public void buildFromJmsMapMessage() throws Exception { // use the builder to convert it to a Kafka record final JsonRecordBuilder builder = new JsonRecordBuilder(); + builder.configure(getDefaultConnectorProperties()); final RecordBuilderException exc = assertThrows(RecordBuilderException.class, () -> { builder.toSourceRecord(getJmsContext(), topic, isJMS, message); }); @@ -115,6 +118,7 @@ public void buildFromJmsTestJsonError() throws Exception { // use the builder to convert it 
to a Kafka record final JsonRecordBuilder builder = new JsonRecordBuilder(); + builder.configure(getDefaultConnectorProperties()); final DataException exec = assertThrows(DataException.class, () -> builder.toSourceRecord(getJmsContext(), topic, isJMS, message)); assertEquals("Converting byte[] to Kafka Connect data failed due to serialization error: ", exec.getMessage()); } @@ -143,7 +147,7 @@ public void buildFromJmsTestErrorToleranceNone() throws Exception { // use the builder to convert it to a Kafka record final JsonRecordBuilder builder = new JsonRecordBuilder(); - final HashMap config = new HashMap(); + final Map config = getDefaultConnectorProperties(); config.put("errors.tolerance", "none"); config.put("mq.message.body.jms", "true"); config.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder"); @@ -187,7 +191,7 @@ public void testToSourceRecord_JsonRecordBuilder_JsonMessage() throws Exception assertThat(sourceRecord).isNotNull(); assertThat(sourceRecord.value()).isInstanceOf(Map.class); assertNull(sourceRecord.valueSchema()); // JSON with no schema - + // Verify JSON data @SuppressWarnings("unchecked") Map value = (Map) sourceRecord.value(); diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/RecordBuilderFactory.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/RecordBuilderFactory.java index a7a10a8..e9ee351 100644 --- a/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/RecordBuilderFactory.java +++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/RecordBuilderFactory.java @@ -32,7 +32,7 @@ public static RecordBuilder getRecordBuilder(final Map props) { ); } - protected static RecordBuilder getRecordBuilder(final String builderClass, final Map props) { + private static RecordBuilder getRecordBuilder(final String builderClass, final Map props) { final RecordBuilder builder; @@ -40,7 +40,7 @@ protected static RecordBuilder getRecordBuilder(final String builderClass, final final Class c = Class.forName(builderClass).asSubclass(RecordBuilder.class); builder = c.newInstance(); builder.configure(props); - } catch (ClassNotFoundException | ClassCastException | IllegalAccessException | InstantiationException | NullPointerException exc) { + } catch (ClassNotFoundException | ClassCastException | IllegalAccessException | InstantiationException exc) { log.error("Could not instantiate message builder {}", builderClass); throw new RecordBuilderException("Could not instantiate message builder", exc); } diff --git a/src/test/java/com/ibm/eventstreams/connect/mqsource/builders/RecordBuilderFactoryTest.java b/src/test/java/com/ibm/eventstreams/connect/mqsource/builders/RecordBuilderFactoryTest.java index c79bc5f..50a22e0 100644 --- a/src/test/java/com/ibm/eventstreams/connect/mqsource/builders/RecordBuilderFactoryTest.java +++ b/src/test/java/com/ibm/eventstreams/connect/mqsource/builders/RecordBuilderFactoryTest.java @@ -18,32 +18,42 @@ import org.assertj.core.api.Assertions; import org.junit.Test; +import com.ibm.eventstreams.connect.mqsource.MQSourceConnector; + import java.util.HashMap; import java.util.Map; public class RecordBuilderFactoryTest { - final Map emptyProps = new HashMap<>(); + final Map placeholderProps = Map.of( + "mq.queue.manager", "placeholder", + "mq.queue", "placeholder", + "topic", "placeholder" + ); @Test public void testGetRecordBuilder_ForJsonRecordBuilder() { - RecordBuilder recordBuilder = 
RecordBuilderFactory.getRecordBuilder("com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder", emptyProps); + Map props = new HashMap<>(placeholderProps); + props.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER, "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder"); + + RecordBuilder recordBuilder = RecordBuilderFactory.getRecordBuilder(props); Assertions.assertThat(recordBuilder).isInstanceOf(JsonRecordBuilder.class); } @Test public void testGetRecordBuilder_ForDefaultRecordBuilder() { - RecordBuilder recordBuilder = RecordBuilderFactory.getRecordBuilder("com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder", emptyProps); + Map props = new HashMap<>(placeholderProps); + props.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER, "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder"); + + RecordBuilder recordBuilder = RecordBuilderFactory.getRecordBuilder(props); Assertions.assertThat(recordBuilder).isInstanceOf(DefaultRecordBuilder.class); } @Test(expected = RecordBuilderException.class) public void testGetRecordBuilder_JunkClass() { - RecordBuilderFactory.getRecordBuilder("casjsajhasdhusdo;iasd", emptyProps); - } + Map props = new HashMap<>(placeholderProps); + props.put(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER, "casjsajhasdhusdo;iasd"); - @Test(expected = RecordBuilderException.class) - public void testGetRecordBuilder_NullProps() { - RecordBuilderFactory.getRecordBuilder("casjsajhasdhusdo;iasd", null); + RecordBuilderFactory.getRecordBuilder(props); } } \ No newline at end of file From b632743e4e19e72cd911a63ea9aa0456e1a6a1da Mon Sep 17 00:00:00 2001 From: Dale Lane Date: Mon, 24 Nov 2025 22:47:35 +0000 Subject: [PATCH 02/10] test: tests and docs to show intention for JSON schema support I've chosen to support Kafka Connect's JSON schema format, rather than the different (and more widely understood) standard JSON Schema. While supporting "standard" JSON Schema would have simplified the user config in some respects, it would have left the MQ Connector with the responsibility of performing the (ambiguous) conversion from the user-provided JSON Schema to the schema used in Connect. As there is not a 1:1 mapping between these two schema types, I think it would be difficult to do such a conversion in a way that always meets user expectations. Instead, by making the user provide a Connect JSON schema, I'm requiring them to manually convert any JSON schema they may already have into a Connect schema, making the appropriate choices in mapping between the two type systems themselves. This was a difficult trade-off to make, as I'm favouring unambiguity of config over ease of config (if we assume that more users are comfortable writing "standard" JSON Schemas than Connect JSON schemas). To help catch confusion here, I've included a unit test to ensure that we reject non-Connect schemas.
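To make the trade-off concrete, here is a sketch (not part of the patch) of the hand-conversion being asked of users: the Person example from standard JSON Schema, rewritten in the Connect format that JsonConverter understands. Standard JSON Schema's "integer" carries no width, so the user has to commit to a specific Connect integer type themselves (int64 is just one possible choice):

    final String connectSchema =
        "{"
        + " \"type\": \"struct\","
        + " \"fields\": ["
        + "   { \"field\": \"firstName\", \"type\": \"string\" },"
        + "   { \"field\": \"lastName\", \"type\": \"string\" },"
        + "   { \"field\": \"age\", \"type\": \"int64\" }"
        + " ]"
        + "}";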
Signed-off-by: Dale Lane --- README.md | 4 +- .../connect/mqsource/MQSourceTaskIT.java | 184 ++++++++++++++++++ .../mqsource/MQSourceConnectorTest.java | 78 ++++++++ 3 files changed, 265 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 50a1e4f..fc21710 100644 --- a/README.md +++ b/README.md @@ -306,12 +306,14 @@ The configuration options for the Kafka Connect source connector for IBM MQ are | `mq.max.poll.blocked.time.ms` | How long the connector will wait for the previous batch of messages to be delivered to Kafka before starting a new poll | integer | 2000 | It is important that this is less than the time defined for `task.shutdown.graceful.timeout.ms` as that is how long connect will wait for the task to perform lifecycle operations. | | `mq.client.reconnect.options` | Options governing MQ reconnection. | string | ASDEF | ASDEF, ANY, QMGR, DISABLED | | `mq.message.receive.timeout` | The timeout (in milliseconds) for receiving messages from the queue manager before returning to Kafka Connect. | long | 2000 | 1 or greater | -| `mq.receive.subsequent.timeout.ms` | The timeout (in milliseconds) for receiving messages from the queue manager on the subsequent receives before returning to Kafka Connect. | long | 0 | 1 or greater | +| `mq.receive.subsequent.timeout.ms` | The timeout (in milliseconds) for receiving messages from the queue manager on the subsequent receives before returning to Kafka Connect. | long | 0 | 1 or greater | | `mq.reconnect.delay.min.ms` | The minimum delay (in milliseconds) for reconnecting to the queue manager after a connection error. | long | 64 | 1 or greater | | `mq.reconnect.delay.max.ms` | The maximum delay (in milliseconds) for reconnecting to the queue manager after a connection error. | long | 8192 | 1 or greater | | `mq.receive.max.poll.time.ms` | Maximum time (in milliseconds) to poll messages in a single Kafka Connect task cycle. If set to 0, polling continues until batch size or a receive returns null. | long | 0 | 0 or greater | | `errors.deadletterqueue.topic.name` | The name of the Kafka topic to use as the dead letter queue (DLQ) for poison messages that fail during processing within the record builder component of the connector. | string | | If left blank (default), failed messages will not be written to a DLQ. | | `errors.deadletterqueue.context.headers.enable` | Whether to add error context headers to messages written to the DLQ. | boolean | false | When enabled, additional headers describing the error will be included with each DLQ record. | +| `mq.record.builder.json.schemas.enable` | (JSON record builder only) Include schemas within Kafka messages. This is used as the `schemas.enable` config for the JsonConverter used by the JSON record builder | boolean | false | | +| `mq.record.builder.json.schema.content` | (JSON record builder only) Schema to use for all messages. This is used as the `schema.content` config for the JsonConverter used by the JSON record builder. | string | | This should be a Kafka Connect schema, as used by JsonConverter. 
| ### Using a CCDT file diff --git a/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java index 50d2651..51c38ac 100644 --- a/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java +++ b/src/integration/java/com/ibm/eventstreams/connect/mqsource/MQSourceTaskIT.java @@ -50,6 +50,7 @@ import javax.jms.TextMessage; import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.header.Headers; import org.apache.kafka.connect.runtime.ConnectorConfig; @@ -167,6 +168,189 @@ public void verifyJmsJsonMessages() throws Exception { } } + // verify that user can use the standard approach for the JsonConverter + // of embedding schemas in message payloads (enabling this using a + // record builder config option) + @Test + public void verifyJmsSchemaMessages() throws Exception { + connectTask = getSourceTaskWithEmptyKafkaOffset(); + + final Map connectorConfigProps = createDefaultConnectorProperties(); + connectorConfigProps.put("mq.message.body.jms", "true"); + connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder"); + connectorConfigProps.put("mq.record.builder.json.schemas.enable", "true"); + + connectTask.start(connectorConfigProps); + + final List messages = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + messages.add(getJmsContext().createTextMessage( + "{\n" + + "\"schema\": {\n" + + " \"type\": \"struct\", \n" + + " \"fields\": [\n" + + " {\n" + + " \"field\": \"idx\", \n" + + " \"type\": \"int64\"\n" + + " },\n" + + " {\n" + + " \"field\": \"test\", \n" + + " \"type\": \"string\"\n" + + " }" + + " ]\n" + + "}, " + + "\"payload\": { " + + " \"idx\": " + i + ", " + + " \"test\" : \"abcdef\" " + + "}" + + "}")); + } + putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, messages); + + final List kafkaMessages = connectTask.poll(); + assertEquals(5, kafkaMessages.size()); + + for (int i = 0; i < 5; i++) { + final SourceRecord kafkaMessage = kafkaMessages.get(i); + assertNull(kafkaMessage.key()); + + assertNotNull(kafkaMessage.valueSchema()); + assertEquals(Schema.INT64_SCHEMA, kafkaMessage.valueSchema().field("idx").schema()); + assertEquals(Schema.STRING_SCHEMA, kafkaMessage.valueSchema().field("test").schema()); + + final Struct value = (Struct) kafkaMessage.value(); + assertEquals(Long.valueOf(i), value.getInt64("idx")); + assertEquals("abcdef", value.getString("test")); + + connectTask.commitRecord(kafkaMessage, null); + } + } + + // verify that a reusable schema can be provided to the JSON record builder + // as part of the connector config, so that this can be reused across + // multiple MQ messages + @Test + public void verifyJmsReusableSchemaMessages() throws Exception { + connectTask = getSourceTaskWithEmptyKafkaOffset(); + + final String SCHEMA = "{\n" + + " \"type\": \"struct\", \n" + + " \"fields\": [\n" + + " {\n" + + " \"field\": \"idx\", \n" + + " \"type\": \"int32\"\n" + + " },\n" + + " {\n" + + " \"field\": \"a\", \n" + + " \"type\": \"string\"\n" + + " },\n" + + " {\n" + + " \"field\": \"b\", \n" + + " \"type\": \"int64\"\n" + + " },\n" + + " {\n" + + " \"field\": \"c\", \n" + + " \"type\": \"double\"\n" + + " },\n" + + " {\n" + + " \"field\": \"d\", \n" + + " \"type\": \"boolean\"\n" + + " },\n" + + " {\n" + + " \"field\": \"e\", \n" + + " \"type\": \"float\"\n" + + " },\n" + + " 
{\n" + + " \"field\": \"f\", \n" + + " \"type\": \"array\",\n" + + " \"items\": {\n" + + " \"type\": \"string\"\n" + + " }\n" + + " },\n" + + " {\n" + + " \"field\": \"g\", \n" + + " \"type\": \"array\", \n" + + " \"items\": {\n" + + " \"type\": \"int32\"\n" + + " }\n" + + " },\n" + + " {\n" + + " \"field\": \"h\", \n" + + " \"type\": \"struct\", \n" + + " \"fields\": [\n" + + " {\n" + + " \"field\": \"innerstr\", \n" + + " \"type\": \"string\"\n" + + " },\n" + + " {\n" + + " \"field\": \"innernum\", \n" + + " \"type\": \"int64\"\n" + + " }\n" + + " ]\n" + + " }\n" + + " ]\n" + + "}"; + + final Map connectorConfigProps = createDefaultConnectorProperties(); + connectorConfigProps.put("mq.message.body.jms", "true"); + connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder"); + connectorConfigProps.put("mq.record.builder.json.schemas.enable", "true"); + connectorConfigProps.put("mq.record.builder.json.schema.content", SCHEMA); + + connectTask.start(connectorConfigProps); + + final List messages = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + messages.add(getJmsContext().createTextMessage( + "{ " + + "\"idx\": " + i + ", \n" + + "\"a\" : \"test\", \n" + + "\"b\" : 1234, \n" + + "\"c\" : 5.67, \n" + + "\"d\" : false, \n" + + "\"e\" : 12.34, \n" + + "\"f\" : [ \"a\", \"b\", \"c\" ], \n" + + "\"g\" : [ 1, 2, 3 ], \n" + + "\"h\" : { \"innerstr\" : \"testing\", \"innernum\" : 89 }" + + "}")); + } + putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, messages); + + final List kafkaMessages = connectTask.poll(); + assertEquals(5, kafkaMessages.size()); + + for (int i = 0; i < 5; i++) { + final SourceRecord kafkaMessage = kafkaMessages.get(i); + assertNull(kafkaMessage.key()); + + assertNotNull(kafkaMessage.valueSchema()); + assertEquals(Schema.INT32_SCHEMA, kafkaMessage.valueSchema().field("idx").schema()); + assertEquals(Schema.STRING_SCHEMA, kafkaMessage.valueSchema().field("a").schema()); + assertEquals(Schema.INT64_SCHEMA, kafkaMessage.valueSchema().field("b").schema()); + assertEquals(Schema.FLOAT64_SCHEMA, kafkaMessage.valueSchema().field("c").schema()); + assertEquals(Schema.BOOLEAN_SCHEMA, kafkaMessage.valueSchema().field("d").schema()); + assertEquals(Schema.FLOAT32_SCHEMA, kafkaMessage.valueSchema().field("e").schema()); + assertEquals(Schema.STRING_SCHEMA, kafkaMessage.valueSchema().field("f").schema().valueSchema()); + assertEquals(Schema.INT32_SCHEMA, kafkaMessage.valueSchema().field("g").schema().valueSchema()); + assertEquals(Schema.STRING_SCHEMA, kafkaMessage.valueSchema().field("h").schema().field("innerstr").schema()); + assertEquals(Schema.INT64_SCHEMA, kafkaMessage.valueSchema().field("h").schema().field("innernum").schema()); + + final Struct value = (Struct) kafkaMessage.value(); + assertEquals(Integer.valueOf(i), value.getInt32("idx")); + assertEquals("test", value.getString("a")); + assertEquals(Long.valueOf(1234), value.getInt64("b")); + assertEquals(Double.valueOf(5.67), value.getFloat64("c")); + assertEquals(false, value.getBoolean("d")); + assertEquals(Float.valueOf(12.34f), value.getFloat32("e")); + assertArrayEquals(new String[]{ "a", "b", "c"}, value.getArray("f").toArray(new String[]{})); + assertArrayEquals(new Integer[] { 1, 2, 3 }, value.getArray("g").toArray(new Integer[]{})); + assertEquals("testing", value.getStruct("h").getString("innerstr")); + assertEquals(Long.valueOf(89), value.getStruct("h").getInt64("innernum")); + + connectTask.commitRecord(kafkaMessage, null); + } + } + @Test public void 
verifyMQMessage() throws Exception { connectTask = getSourceTaskWithEmptyKafkaOffset(); diff --git a/src/test/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnectorTest.java b/src/test/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnectorTest.java index 654465f..d1db8f1 100644 --- a/src/test/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnectorTest.java +++ b/src/test/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnectorTest.java @@ -186,4 +186,82 @@ public void testValidateRetryDelayConfigWithDefaultValues() { .flatMap(cv -> cv.errorMessages().stream()) .anyMatch(msg -> msg.contains("The value of 'mq.reconnect.delay.max.ms' must be greater than or equal to the value of 'mq.reconnect.delay.min.ms'."))); } + + // verify that providing a schema that isn't JSON will be rejected + @Test + public void testValidateJsonSchemaConfig() { + final Map configProps = new HashMap(); + configProps.put("mq.record.builder.json.schemas.enable", "true"); + configProps.put("mq.record.builder.json.schema.content", "Hello world"); + + final Config config = new MQSourceConnector().validate(configProps); + + assertTrue(config.configValues().stream().anyMatch(cv -> cv.errorMessages().size() > 0)); + assertTrue(config.configValues().stream() + .filter(cv -> cv.name().equals(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER_JSON_SCHEMA_CONTENT)) + .flatMap(cv -> cv.errorMessages().stream()) + .anyMatch(msg -> msg.contains("should be a Kafka Connect schema"))); + } + + // verify that providing JSON (such as JSON schema) that isn't a + // Kafka Connect JSON Converter schema will be rejected + @Test + public void testValidateJsonConnectSchemaConfig() { + final Map configProps = new HashMap(); + configProps.put("mq.record.builder.json.schemas.enable", "true"); + configProps.put("mq.record.builder.json.schema.content", "{\n" + + " \"$id\": \"https://example.com/person.schema.json\",\n" + + " \"$schema\": \"https://json-schema.org/draft/2020-12/schema\",\n" + + " \"title\": \"Person\",\n" + + " \"type\": \"object\",\n" + + " \"properties\": {\n" + + " \"firstName\": {\n" + + " \"type\": \"string\",\n" + + " \"description\": \"The person's first name.\"\n" + + " },\n" + + " \"lastName\": {\n" + + " \"type\": \"string\",\n" + + " \"description\": \"The person's last name.\"\n" + + " },\n" + + " \"age\": {\n" + + " \"description\": \"Age in years which must be equal to or greater than zero.\",\n" + + " \"type\": \"integer\",\n" + + " \"minimum\": 0\n" + + " }\n" + + " }\n" + + "}"); + + final Config config = new MQSourceConnector().validate(configProps); + + assertTrue(config.configValues().stream().anyMatch(cv -> cv.errorMessages().size() > 0)); + assertTrue(config.configValues().stream() + .filter(cv -> cv.name().equals(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER_JSON_SCHEMA_CONTENT)) + .flatMap(cv -> cv.errorMessages().stream()) + .anyMatch(msg -> msg.contains("Unknown schema type"))); + } + + // verify that Kafka Connect JSON Converter schemas containing + // invalid types will be rejected + @Test + public void testValidateJsonSchemaTypesConfig() { + final Map configProps = new HashMap(); + configProps.put("mq.record.builder.json.schemas.enable", "true"); + configProps.put("mq.record.builder.json.schema.content", "{\n" + + " \"type\": \"struct\", \n" + + " \"fields\": [\n" + + " {\n" + + " \"field\": \"test\", \n" + + " \"type\": \"not-a-real-type\"\n" + + " }\n" + + " ]\n" + + "}"); + + final Config config = new MQSourceConnector().validate(configProps); + + 
assertTrue(config.configValues().stream().anyMatch(cv -> cv.errorMessages().size() > 0)); + assertTrue(config.configValues().stream() + .filter(cv -> cv.name().equals(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER_JSON_SCHEMA_CONTENT)) + .flatMap(cv -> cv.errorMessages().stream()) + .anyMatch(msg -> msg.contains("Unknown schema type: not-a-real-type"))); + } } From 553eb742e1aa55ffb204a6cd028ad00aab416ea6 Mon Sep 17 00:00:00 2001 From: Dale Lane Date: Mon, 24 Nov 2025 22:55:13 +0000 Subject: [PATCH 03/10] feat: support for schemas in JSON record builder This commit introduces support for emitting structured records from the JSON record builder. This will allow the MQ Source Connector to read JSON string messages from MQ, and produce them to Kafka using any standard Converter (e.g. to produce them in Avro or Protobuf formats if desired). The JsonConverter dependency used in the JSON record builder has support for this from Kafka Connect v4.2, so the simplest implementation would be to update the dependency in pom.xml to version 4.2, pass through the schemas.enable and schema.content configuration properties, and leave the Converter to do everything. This felt like an overly aggressive dependency jump, so in the interest of continuing to support Connect 3.x versions, I've implemented a fall-back that reuses the schema "envelope" approach present in JsonConverter 3.x. The additional string operations this will incur for every message will almost certainly impact performance, so I see this as a temporary workaround that we should remove as soon as we feel that Connect 4.x adoption is sufficient. Signed-off-by: Dale Lane --- .../connect/mqsource/MQSourceConnector.java | 69 +++++++++++++++++++ .../mqsource/builders/JsonRecordBuilder.java | 62 +++++++++++++++-- 2 files changed, 127 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java index 0337954..b09fcb0 100644 --- a/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java +++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java @@ -33,12 +33,19 @@ import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.ConfigValue; import org.apache.kafka.connect.connector.Task; +import org.apache.kafka.connect.errors.DataException; +import org.apache.kafka.connect.json.JsonConverter; +import org.apache.kafka.connect.json.JsonConverterConfig; +import org.apache.kafka.connect.json.JsonDeserializer; import org.apache.kafka.connect.source.ConnectorTransactionBoundaries; import org.apache.kafka.connect.source.ExactlyOnceSupport; import org.apache.kafka.connect.source.SourceConnector; +import org.apache.kafka.connect.storage.ConverterType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.fasterxml.jackson.databind.JsonNode; + public class MQSourceConnector extends SourceConnector { private static final Logger log = LoggerFactory.getLogger(MQSourceConnector.class); @@ -212,6 +219,15 @@ public class MQSourceConnector extends SourceConnector { "keys, all error context header keys will start with __connect.errors."; private static final String DLQ_CONTEXT_HEADERS_ENABLE_DISPLAY = "Enable Error Context Headers"; + public static final String CONFIG_NAME_MQ_RECORD_BUILDER_JSON_SCHEMAS_ENABLE = "mq.record.builder.json.schemas.enable"; + public static final String
CONFIG_DOCUMENTATION_MQ_RECORD_BUILDER_JSON_SCHEMAS_ENABLE = "Include schemas within the Kafka messages produced by the JSON record builder."; + public static final String CONFIG_DISPLAY_MQ_RECORD_BUILDER_JSON_SCHEMAS_ENABLE = "Enable Schemas"; + + public static final String CONFIG_NAME_MQ_RECORD_BUILDER_JSON_SCHEMA_CONTENT = "mq.record.builder.json.schema.content"; + public static final String CONFIG_DOCUMENTATION_MQ_RECORD_BUILDER_JSON_SCHEMA_CONTENT = "When set, this is used as the schema for all messages. This should be a Kafka Connect schema, as used by JsonConverter."; + public static final String CONFIG_DISPLAY_MQ_RECORD_BUILDER_JSON_SCHEMA_CONTENT = "Schema Content"; + + // Define valid reconnect options public static final String[] CONFIG_VALUE_MQ_VALID_RECONNECT_OPTIONS = { CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_ASDEF, @@ -666,6 +682,22 @@ null, new ReadableFile(), 32, ConfigDef.Width.MEDIUM, CONFIG_DISPLAY_MAX_POLL_TIME); + CONFIGDEF.define(CONFIG_NAME_MQ_RECORD_BUILDER_JSON_SCHEMAS_ENABLE, + Type.BOOLEAN, + false, new ConfigDef.NonNullValidator(), + Importance.LOW, + CONFIG_DOCUMENTATION_MQ_RECORD_BUILDER_JSON_SCHEMAS_ENABLE, + CONFIG_GROUP_MQ, 33, + Width.SHORT, + CONFIG_DISPLAY_MQ_RECORD_BUILDER_JSON_SCHEMAS_ENABLE); + CONFIGDEF.define(CONFIG_NAME_MQ_RECORD_BUILDER_JSON_SCHEMA_CONTENT, + Type.STRING, + null, new SchemaValidator(), + Importance.LOW, + CONFIG_DOCUMENTATION_MQ_RECORD_BUILDER_JSON_SCHEMA_CONTENT, + CONFIG_GROUP_MQ, 34, + Width.MEDIUM, + CONFIG_DISPLAY_MQ_RECORD_BUILDER_JSON_SCHEMA_CONTENT); CONFIGDEF.define(CONFIG_NAME_TOPIC, Type.STRING, @@ -716,6 +748,43 @@ public void ensureValid(final String name, final Object value) { } } + private static class SchemaValidator implements ConfigDef.Validator { + @Override + public void ensureValid(final String name, final Object value) { + final String strValue = (String) value; + if (value == null || strValue.trim().isEmpty()) { + // only validate non-empty schemas + return; + } + + // Start with a quick and simple "sniff test" on the provided schema + // by checking that it starts and ends with curly braces. + // This will quickly catch obvious configuration errors, such as + // providing a name, id, or file location for a schema + final String trimmedStr = strValue.trim(); + if (!trimmedStr.startsWith("{") || !trimmedStr.endsWith("}")) { + throw new ConfigException(name, value, "Value should be a Kafka Connect schema"); + } + + // Create a temporary JsonDeserializer/JsonConverter to parse the + // provided schema. + // The aim of doing this is to catch any invalid schemas at + // startup time, rather than allow this to go unnoticed until + // the first MQ message is received (potentially a long time + // later). + try ( + final JsonDeserializer deserializer = new JsonDeserializer(); + final JsonConverter conv = new JsonConverter() + ) { + final JsonNode jsonStr = deserializer.deserialize(trimmedStr, trimmedStr.getBytes()); + conv.configure(Map.of(JsonConverterConfig.TYPE_CONFIG, ConverterType.VALUE.getName())); + conv.asConnectSchema(jsonStr); + } catch (final DataException exc) { + throw new ConfigException(name, value, exc.getMessage()); + } + } + } + /** * Signals that this connector is not capable of defining other transaction boundaries. * A new transaction will be started and committed for every batch of records returned by {@link MQSourceTask#poll()}.
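A minimal sketch of how this validator behaves, mirroring the unit tests added in the previous commit (types as in MQSourceConnectorTest; the config values are placeholders): an invalid schema is reported against the schema config key at validate() time, rather than when the first MQ message arrives:

    final Map<String, String> props = new HashMap<>();
    props.put("mq.record.builder.json.schemas.enable", "true");
    props.put("mq.record.builder.json.schema.content", "Hello world");

    // fails the "sniff test" above: the ConfigValue for
    // mq.record.builder.json.schema.content carries an error message
    // containing "should be a Kafka Connect schema"
    final Config config = new MQSourceConnector().validate(props);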
diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilder.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilder.java index cf25d03..169e6a3 100755 --- a/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilder.java +++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilder.java @@ -18,17 +18,23 @@ import static java.nio.charset.StandardCharsets.UTF_8; import java.util.HashMap; +import java.util.Map; + import javax.jms.BytesMessage; import javax.jms.JMSContext; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.TextMessage; +import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.json.JsonConverter; +import org.apache.kafka.connect.json.JsonConverterConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.ibm.eventstreams.connect.mqsource.MQSourceConnector; + /** * Builds Kafka Connect SourceRecords from messages. It parses the bytes of the payload of JMS * BytesMessage and TextMessage as JSON and creates a SourceRecord with a null schema. @@ -38,13 +44,53 @@ public class JsonRecordBuilder extends BaseRecordBuilder { private JsonConverter converter; + // From Kafka Connect 4.2 onwards, JsonConverter includes schema support + // To support earlier versions of the dependency, the record builder includes a + // workaround implementation. + // This variable should be true where the workaround implementation is required. + private boolean recordBuilderSchemaSupport = false; + + // Workaround for supporting schemas is to embed the schema in the message payload + // given to the JsonConverter. This variable contains a String to concatenate with + // the string received from MQ in order to achieve this. + private String schemaSupportEnvelope = null; + public JsonRecordBuilder() { log.info("Building records using com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder"); converter = new JsonConverter(); + } + + /** + * Configure this class. In addition to the MQ message handling config + * used by BaseRecordBuilder, this also configures the JsonConverter + * used by this record builder to parse JSON messages from MQ. 
+ */ + @Override + public void configure(final Map props) { + super.configure(props); + + final AbstractConfig config = new AbstractConfig(MQSourceConnector.CONFIGDEF, props); + final boolean schemasEnable = config.getBoolean(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER_JSON_SCHEMAS_ENABLE); + String schemaContent = null; + if (schemasEnable) { + schemaContent = config.getString(MQSourceConnector.CONFIG_NAME_MQ_RECORD_BUILDER_JSON_SCHEMA_CONTENT); + if (schemaContent != null) { + schemaContent = schemaContent.trim(); + } + } + + if (schemasEnable && schemaContent != null && + !JsonConverterConfig.configDef().names().contains("schema.content")) { - // We just want the payload, not the schema in the output message - final HashMap m = new HashMap<>(); - m.put("schemas.enable", "false"); + // support for schemas provided separately from message payloads is requested + // but not available natively within the JsonConverter present in the classpath + recordBuilderSchemaSupport = true; + schemaSupportEnvelope = "{\"schema\": " + schemaContent + ", \"payload\": "; + } + + final Map m = new HashMap<>(); + m.put("schemas.enable", Boolean.toString(schemasEnable)); + m.put("schema.content", schemaContent); // Convert the value, not the key (isKey == false) converter.configure(m, false); @@ -77,6 +123,14 @@ public SchemaAndValue getValue(final JMSContext context, final String topic, fin throw new RecordBuilderException("Unsupported JMS message type"); } - return converter.toConnectData(topic, payload); + if (recordBuilderSchemaSupport) { + return converter.toConnectData(topic, + // embed schema in the event payload + (schemaSupportEnvelope + new String(payload) + "}").getBytes()); + } else { + return converter.toConnectData(topic, + // submit the payload as-is to the converter + payload); + } } } \ No newline at end of file From 892c7f84a813bdef813280b8fc0a1aa59a6900e3 Mon Sep 17 00:00:00 2001 From: Dale Lane Date: Mon, 24 Nov 2025 23:02:04 +0000 Subject: [PATCH 04/10] chore: prepare new release version Signed-off-by: Dale Lane --- .github/ISSUE_TEMPLATE/BUG-REPORT.yml | 2 +- pom.xml | 2 +- .../ibm/eventstreams/connect/mqsource/MQSourceConnector.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/ISSUE_TEMPLATE/BUG-REPORT.yml b/.github/ISSUE_TEMPLATE/BUG-REPORT.yml index 1eccc3f..4588606 100644 --- a/.github/ISSUE_TEMPLATE/BUG-REPORT.yml +++ b/.github/ISSUE_TEMPLATE/BUG-REPORT.yml @@ -58,7 +58,7 @@ body: description: What version of our software are you running? 
options: - latest - - 2.6.0 (Default) + - 2.7.0 (Default) - 1.3.5 - older (<1.3.5) validations: diff --git a/pom.xml b/pom.xml index ca65181..6de42e8 100644 --- a/pom.xml +++ b/pom.xml @@ -20,7 +20,7 @@ com.ibm.eventstreams.connect kafka-connect-mq-source jar - 2.6.0 + 2.7.0 kafka-connect-mq-source IBM Corporation diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java index b09fcb0..ea964db 100644 --- a/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java +++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java @@ -240,7 +240,7 @@ public class MQSourceConnector extends SourceConnector { CONFIG_VALUE_MQ_CLIENT_RECONNECT_OPTION_DISABLED.toLowerCase(Locale.ENGLISH) }; - public static String version = "2.6.0"; + public static String version = "2.7.0"; private Map configProps; From 9ad88a1831235f40421680883f72e90b93692495 Mon Sep 17 00:00:00 2001 From: Dale Lane Date: Tue, 25 Nov 2025 09:01:57 +0000 Subject: [PATCH 05/10] chore: revert to Java 8 syntax I didn't realise we were still using Java 8 to build the connector. Given that it has been deprecated in Kafka since v3.0 this doesn't seem like the right choice, but that feels like too significant a change to include in this pull request, so for now I'll just remove the newer syntax I'd used. Signed-off-by: Dale Lane --- .../connect/mqsource/MQSourceConnector.java | 6 +++++- .../builders/RecordBuilderFactoryTest.java | 14 +++++++++----- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java index ea964db..a9b9331 100644 --- a/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java +++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java @@ -19,6 +19,7 @@ import java.net.MalformedURLException; import java.net.URL; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; @@ -776,8 +777,11 @@ public void ensureValid(final String name, final Object value) { final JsonDeserializer deserializer = new JsonDeserializer(); final JsonConverter conv = new JsonConverter() ) { + final Map converterConfig = new HashMap<>(); + converterConfig.put(JsonConverterConfig.TYPE_CONFIG, ConverterType.VALUE.getName()); + conv.configure(converterConfig); + final JsonNode jsonStr = deserializer.deserialize(trimmedStr, trimmedStr.getBytes()); - conv.configure(Map.of(JsonConverterConfig.TYPE_CONFIG, ConverterType.VALUE.getName())); conv.asConnectSchema(jsonStr); } catch (final DataException exc) { throw new ConfigException(name, value, exc.getMessage()); diff --git a/src/test/java/com/ibm/eventstreams/connect/mqsource/builders/RecordBuilderFactoryTest.java b/src/test/java/com/ibm/eventstreams/connect/mqsource/builders/RecordBuilderFactoryTest.java index 50a22e0..fabd1ef 100644 --- a/src/test/java/com/ibm/eventstreams/connect/mqsource/builders/RecordBuilderFactoryTest.java +++ b/src/test/java/com/ibm/eventstreams/connect/mqsource/builders/RecordBuilderFactoryTest.java @@ -16,6 +16,7 @@ package com.ibm.eventstreams.connect.mqsource.builders; import org.assertj.core.api.Assertions; +import org.junit.Before; import org.junit.Test; import com.ibm.eventstreams.connect.mqsource.MQSourceConnector; @@ -25,11 +26,14 @@ public class RecordBuilderFactoryTest { - 
final Map placeholderProps = Map.of( - "mq.queue.manager", "placeholder", - "mq.queue", "placeholder", - "topic", "placeholder" - ); + final Map placeholderProps = new HashMap<>(); + + @Before + public void prepareProperties() { + placeholderProps.put("mq.queue.manager", "placeholder"); + placeholderProps.put("mq.queue", "placeholder"); + placeholderProps.put("topic", "placeholder"); + } @Test public void testGetRecordBuilder_ForJsonRecordBuilder() { From 860a23980528c38c325aa6be1a0303db9ce13ed9 Mon Sep 17 00:00:00 2001 From: Dale Lane Date: Tue, 25 Nov 2025 09:37:36 +0000 Subject: [PATCH 06/10] chore: specify charset when converting string to bytes For consistency with other places where this is done Signed-off-by: Dale Lane --- .../connect/mqsource/builders/JsonRecordBuilder.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilder.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilder.java index 169e6a3..f3ae82b 100755 --- a/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilder.java +++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilder.java @@ -126,7 +126,7 @@ public SchemaAndValue getValue(final JMSContext context, final String topic, fin if (recordBuilderSchemaSupport) { return converter.toConnectData(topic, // embed schema in the event payload - (schemaSupportEnvelope + new String(payload) + "}").getBytes()); + (schemaSupportEnvelope + new String(payload) + "}").getBytes(UTF_8)); } else { return converter.toConnectData(topic, // submit the payload as-is to the converter From 4067cbb582b038c85bf344ec6d0eefa890685919 Mon Sep 17 00:00:00 2001 From: Dale Lane Date: Wed, 26 Nov 2025 11:20:34 +0000 Subject: [PATCH 07/10] test: unit test for valid JSON schema config Signed-off-by: Dale Lane --- .../mqsource/MQSourceConnectorTest.java | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/src/test/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnectorTest.java b/src/test/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnectorTest.java index d1db8f1..c792863 100644 --- a/src/test/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnectorTest.java +++ b/src/test/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnectorTest.java @@ -187,6 +187,29 @@ public void testValidateRetryDelayConfigWithDefaultValues() { .anyMatch(msg -> msg.contains("The value of 'mq.reconnect.delay.max.ms' must be greater than or equal to the value of 'mq.reconnect.delay.min.ms'."))); } + // verify that valid JSON schema config will be accepted + @Test + public void testValidJsonSchemaConfig() { + final Map configProps = new HashMap(); + configProps.put("mq.queue.manager", "placeholder"); + configProps.put("mq.queue", "placeholder"); + configProps.put("topic", "placeholder"); + configProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder"); + configProps.put("mq.record.builder.json.schemas.enable", "true"); + configProps.put("mq.record.builder.json.schema.content", "{\n" + + " \"type\": \"struct\", \n" + + " \"fields\": [\n" + + " {\n" + + " \"field\": \"test\", \n" + + " \"type\": \"string\"\n" + + " }\n" + + " ]\n" + + "}"); + + final Config config = new MQSourceConnector().validate(configProps); + assertTrue(config.configValues().stream().allMatch(cv -> cv.errorMessages().size() == 0)); + } + // verify that providing a schema that isn't JSON will be rejected @Test 
public void testValidateJsonSchemaConfig() { From 0f1099639e770c0e6990388d6d8958697ae0540b Mon Sep 17 00:00:00 2001 From: Dale Lane Date: Wed, 26 Nov 2025 11:26:43 +0000 Subject: [PATCH 08/10] docs: additional explanation about schema in README Signed-off-by: Dale Lane --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index fc21710..ee6612f 100644 --- a/README.md +++ b/README.md @@ -312,7 +312,7 @@ The configuration options for the Kafka Connect source connector for IBM MQ are | `mq.receive.max.poll.time.ms` | Maximum time (in milliseconds) to poll messages in a single Kafka Connect task cycle. If set to 0, polling continues until batch size or a receive returns null. | long | 0 | 0 or greater | | `errors.deadletterqueue.topic.name` | The name of the Kafka topic to use as the dead letter queue (DLQ) for poison messages that fail during processing within the record builder component of the connector. | string | | If left blank (default), failed messages will not be written to a DLQ. | | `errors.deadletterqueue.context.headers.enable` | Whether to add error context headers to messages written to the DLQ. | boolean | false | When enabled, additional headers describing the error will be included with each DLQ record. | -| `mq.record.builder.json.schemas.enable` | (JSON record builder only) Include schemas within Kafka messages. This is used as the `schemas.enable` config for the JsonConverter used by the JSON record builder | boolean | false | | +| `mq.record.builder.json.schemas.enable` | (JSON record builder only) Include schemas within Kafka messages. This is used as the `schemas.enable` config for the JsonConverter used by the JSON record builder. If true, a schema must be provided - either using `mq.record.builder.json.schema.content` or by embedding a schema within each MQ message payload. | boolean | false | | | `mq.record.builder.json.schema.content` | (JSON record builder only) Schema to use for all messages. This is used as the `schema.content` config for the JsonConverter used by the JSON record builder. | string | | This should be a Kafka Connect schema, as used by JsonConverter. | ### Using a CCDT file From 30e5221432f899a04b1bf1c9d791027de17b57c5 Mon Sep 17 00:00:00 2001 From: Dale Lane Date: Wed, 26 Nov 2025 13:35:41 +0000 Subject: [PATCH 09/10] docs: add clarification to README Signed-off-by: Dale Lane --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index ee6612f..976a606 100644 --- a/README.md +++ b/README.md @@ -313,7 +313,7 @@ The configuration options for the Kafka Connect source connector for IBM MQ are | `errors.deadletterqueue.topic.name` | The name of the Kafka topic to use as the dead letter queue (DLQ) for poison messages that fail during processing within the record builder component of the connector. | string | | If left blank (default), failed messages will not be written to a DLQ. | | `errors.deadletterqueue.context.headers.enable` | Whether to add error context headers to messages written to the DLQ. | boolean | false | When enabled, additional headers describing the error will be included with each DLQ record. | | `mq.record.builder.json.schemas.enable` | (JSON record builder only) Include schemas within Kafka messages. This is used as the `schemas.enable` config for the JsonConverter used by the JSON record builder. 
If true, a schema must be provided - either using `mq.record.builder.json.schema.content` or by embedding a schema within each MQ message payload. | boolean | false | | -| `mq.record.builder.json.schema.content` | (JSON record builder only) Schema to use for all messages. This is used as the `schema.content` config for the JsonConverter used by the JSON record builder. | string | | This should be a Kafka Connect schema, as used by JsonConverter. | +| `mq.record.builder.json.schema.content` | (JSON record builder only) Schema to use for all messages. This is used as the `schema.content` config for the JsonConverter used by the JSON record builder. If provided, this will be used in preference to any schema embedded in MQ messages. | string | | This should be a Kafka Connect schema, as used by JsonConverter. | ### Using a CCDT file From cdb924c82bd371bec59fc6c4521abcaae645a388 Mon Sep 17 00:00:00 2001 From: Dale Lane Date: Wed, 26 Nov 2025 18:33:14 +0000 Subject: [PATCH 10/10] docs: update README Co-authored-by: Mark S Taylor Signed-off-by: Dale Lane --- .../ibm/eventstreams/connect/mqsource/MQSourceConnector.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java index a9b9331..359e966 100644 --- a/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java +++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceConnector.java @@ -225,7 +225,7 @@ public class MQSourceConnector extends SourceConnector { public static final String CONFIG_DISPLAY_MQ_RECORD_BUILDER_JSON_SCHEMAS_ENABLE = "Enable Schemas"; public static final String CONFIG_NAME_MQ_RECORD_BUILDER_JSON_SCHEMA_CONTENT = "mq.record.builder.json.schema.content"; - public static final String CONFIG_DOCUMENTATION_MQ_RECORD_BUILDER_JSON_SCHEMA_CONTENT = "When set, this is used as the schema for all messages. This should be a Kafka Connect schema, as used by JsonConverter."; + public static final String CONFIG_DOCUMENTATION_MQ_RECORD_BUILDER_JSON_SCHEMA_CONTENT = "When set, this is used as the schema for all messages. This must be a Kafka Connect schema, as used by JsonConverter."; public static final String CONFIG_DISPLAY_MQ_RECORD_BUILDER_JSON_SCHEMA_CONTENT = "Schema Content";
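Taken together, the series allows a connector task to be configured along these lines - a sketch only, with placeholder queue manager, queue, and topic values (other required connection properties omitted) and the schema borrowed from testValidJsonSchemaConfig:

    final Map<String, String> props = new HashMap<>();
    props.put("mq.queue.manager", "QM1");       // placeholder
    props.put("mq.queue", "DEV.QUEUE.1");       // placeholder
    props.put("topic", "mytopic");
    props.put("mq.message.body.jms", "true");
    props.put("mq.record.builder",
            "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
    props.put("mq.record.builder.json.schemas.enable", "true");
    props.put("mq.record.builder.json.schema.content",
            "{ \"type\": \"struct\", \"fields\": ["
            + " { \"field\": \"test\", \"type\": \"string\" } ] }");

Each JSON text message read from MQ is then parsed against this fixed schema (which takes precedence over any schema embedded in the message), so downstream converters receive structured records.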