2 changes: 1 addition & 1 deletion .github/ISSUE_TEMPLATE/BUG-REPORT.yml
@@ -58,7 +58,7 @@ body:
description: What version of our software are you running?
options:
- latest
- 2.6.0 (Default)
- 2.7.0 (Default)
- 1.3.5
- older (<1.3.5)
validations:
4 changes: 3 additions & 1 deletion README.md
@@ -306,12 +306,14 @@ The configuration options for the Kafka Connect source connector for IBM MQ are
| `mq.max.poll.blocked.time.ms` | How long the connector will wait for the previous batch of messages to be delivered to Kafka before starting a new poll | integer | 2000 | It is important that this is less than the time defined for `task.shutdown.graceful.timeout.ms` as that is how long connect will wait for the task to perform lifecycle operations. |
| `mq.client.reconnect.options` | Options governing MQ reconnection. | string | ASDEF | ASDEF, ANY, QMGR, DISABLED |
| `mq.message.receive.timeout` | The timeout (in milliseconds) for receiving messages from the queue manager before returning to Kafka Connect. | long | 2000 | 1 or greater |
| `mq.receive.subsequent.timeout.ms` | The timeout (in milliseconds) for receiving messages from the queue manager on the subsequent receives before returning to Kafka Connect. | long | 0 | 1 or greater |
| `mq.reconnect.delay.min.ms` | The minimum delay (in milliseconds) for reconnecting to the queue manager after a connection error. | long | 64 | 1 or greater |
| `mq.reconnect.delay.max.ms` | The maximum delay (in milliseconds) for reconnecting to the queue manager after a connection error. | long | 8192 | 1 or greater |
| `mq.receive.max.poll.time.ms` | Maximum time (in milliseconds) to poll messages in a single Kafka Connect task cycle. If set to 0, polling continues until batch size or a receive returns null. | long | 0 | 0 or greater |
| `errors.deadletterqueue.topic.name` | The name of the Kafka topic to use as the dead letter queue (DLQ) for poison messages that fail during processing within the record builder component of the connector. | string | | If left blank (default), failed messages will not be written to a DLQ. |
| `errors.deadletterqueue.context.headers.enable` | Whether to add error context headers to messages written to the DLQ. | boolean | false | When enabled, additional headers describing the error will be included with each DLQ record. |
| `mq.record.builder.json.schemas.enable` | (JSON record builder only) Include schemas within Kafka messages. This is used as the `schemas.enable` config for the JsonConverter used by the JSON record builder. If true, a schema must be provided - either using `mq.record.builder.json.schema.content` or by embedding a schema within each MQ message payload. | boolean | false | |
@Joel-hanson (Contributor) commented on Dec 1, 2025:

I should have said this in my first review: can we just use one config, i.e. treat any content in `schema.content` as indicating that the JSON schema is enabled?

Contributor Author replied:

I don't think that would be a good idea, for two reasons:

1. Functionality
   - Schemas can be embedded within messages, so it needs to be possible to enable the use of schemas without providing a `schema.content` config.
2. Consistency
   - `JsonRecordBuilder` is a thin wrapper around Kafka's `JsonConverter`, and I chose these config option names intentionally to be consistent with Kafka's own JSON config. I think the value we would get from having our own configuration behaviour, different from Kafka's, would be outweighed by the benefit and clarity of being able to say "You know the Kafka JSON options? It's the same as that." (See the converter sketch below.)
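As an illustration of that consistency, here is a minimal sketch (not part of this change) of configuring Kafka's `JsonConverter` directly: the two connector options are passed straight through as the converter's own `schemas.enable` and `schema.content` configs. The class name, topic name, and payload below are placeholders.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.json.JsonConverter;

public class JsonConverterSketch {
    public static void main(String[] args) {
        // The two connector options map onto the converter's own config keys.
        final Map<String, String> config = new HashMap<>();
        config.put("schemas.enable", "true");   // surfaced as mq.record.builder.json.schemas.enable
        // config.put("schema.content", "..."); // surfaced as mq.record.builder.json.schema.content

        final JsonConverter converter = new JsonConverter();
        converter.configure(config, false); // false = configure as a value converter

        // With schemas.enable=true and no schema.content, each payload carries
        // its own schema in the standard JsonConverter envelope.
        final byte[] payload = ("{ \"schema\": { \"type\": \"struct\", \"fields\": ["
                + " { \"field\": \"idx\", \"type\": \"int64\" } ] },"
                + " \"payload\": { \"idx\": 0 } }").getBytes(StandardCharsets.UTF_8);

        final SchemaAndValue record = converter.toConnectData("mytopic", payload);
        System.out.println(record.schema().field("idx").schema() + " = " + record.value());
    }
}
```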

| `mq.record.builder.json.schema.content` | (JSON record builder only) Schema to use for all messages. This is used as the `schema.content` config for the JsonConverter used by the JSON record builder. If provided, this will be used in preference to any schema embedded in MQ messages. | string | | This should be a Kafka Connect schema, as used by JsonConverter. |
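
As a worked example of the two JSON record builder options above, here is a minimal sketch of building the connector properties in the same style as the test helpers in this repository. The connection values (queue manager, host, channel, queue, topic) and the class name are placeholders, not taken from this change; only the record-builder options come from the table.

```java
import java.util.HashMap;
import java.util.Map;

public class JsonSchemaConfigSketch {
    // Minimal sketch only: the connection values below are placeholders.
    public static Map<String, String> jsonSchemaSourceProps() {
        final Map<String, String> props = new HashMap<>();
        props.put("mq.queue.manager", "QM1");                    // placeholder
        props.put("mq.connection.name.list", "localhost(1414)"); // placeholder
        props.put("mq.channel.name", "DEV.APP.SVRCONN");         // placeholder
        props.put("mq.queue", "DEV.QUEUE.1");                    // placeholder
        props.put("topic", "mytopic");                           // placeholder

        props.put("mq.message.body.jms", "true");
        props.put("mq.record.builder",
                "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
        // Enable schemas; either embed a schema in each MQ message payload...
        props.put("mq.record.builder.json.schemas.enable", "true");
        // ...or provide one schema for all messages (used in preference to embedded schemas):
        // props.put("mq.record.builder.json.schema.content",
        //         "{ \"type\": \"struct\", \"fields\": [ { \"field\": \"idx\", \"type\": \"int64\" } ] }");
        return props;
    }
}
```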

### Using a CCDT file

2 changes: 1 addition & 1 deletion pom.xml
@@ -20,7 +20,7 @@
<groupId>com.ibm.eventstreams.connect</groupId>
<artifactId>kafka-connect-mq-source</artifactId>
<packaging>jar</packaging>
<version>2.6.0</version>
<version>2.7.0</version>
<name>kafka-connect-mq-source</name>
<organization>
<name>IBM Corporation</name>
@@ -19,6 +19,7 @@
import com.github.dockerjava.api.model.HostConfig;
import com.github.dockerjava.api.model.PortBinding;
import com.github.dockerjava.api.model.Ports;
import com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder;
import com.ibm.eventstreams.connect.mqsource.utils.MQTestUtil;
import com.ibm.mq.jms.MQConnectionFactory;
import com.ibm.msg.client.wmq.WMQConstants;
@@ -102,6 +103,7 @@ public static Map<String, String> getDefaultConnectorProperties() {
props.put("mq.channel.name", CHANNEL_NAME);
props.put("mq.queue", DEFAULT_SOURCE_QUEUE);
props.put("mq.user.authentication.mqcsp", "false");
props.put("mq.record.builder", DefaultRecordBuilder.class.getCanonicalName());
props.put("topic", "mytopic");
return props;
}
@@ -50,6 +50,7 @@
import javax.jms.TextMessage;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.runtime.ConnectorConfig;
@@ -167,6 +168,189 @@ public void verifyJmsJsonMessages() throws Exception {
}
}

// verify that user can use the standard approach for the JsonConverter
// of embedding schemas in message payloads (enabling this using a
// record builder config option)
@Test
public void verifyJmsSchemaMessages() throws Exception {
connectTask = getSourceTaskWithEmptyKafkaOffset();

final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
connectorConfigProps.put("mq.message.body.jms", "true");
connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
connectorConfigProps.put("mq.record.builder.json.schemas.enable", "true");

connectTask.start(connectorConfigProps);

final List<Message> messages = new ArrayList<>();
for (int i = 0; i < 5; i++) {
messages.add(getJmsContext().createTextMessage(
"{\n" +
"\"schema\": {\n" +
" \"type\": \"struct\", \n" +
" \"fields\": [\n" +
" {\n" +
" \"field\": \"idx\", \n" +
" \"type\": \"int64\"\n" +
" },\n" +
" {\n" +
" \"field\": \"test\", \n" +
" \"type\": \"string\"\n" +
" }" +
" ]\n" +
"}, " +
"\"payload\": { " +
" \"idx\": " + i + ", " +
" \"test\" : \"abcdef\" " +
"}" +
"}"));
}
putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, messages);

final List<SourceRecord> kafkaMessages = connectTask.poll();
assertEquals(5, kafkaMessages.size());

for (int i = 0; i < 5; i++) {
final SourceRecord kafkaMessage = kafkaMessages.get(i);
assertNull(kafkaMessage.key());

assertNotNull(kafkaMessage.valueSchema());
assertEquals(Schema.INT64_SCHEMA, kafkaMessage.valueSchema().field("idx").schema());
assertEquals(Schema.STRING_SCHEMA, kafkaMessage.valueSchema().field("test").schema());

final Struct value = (Struct) kafkaMessage.value();
assertEquals(Long.valueOf(i), value.getInt64("idx"));
assertEquals("abcdef", value.getString("test"));

connectTask.commitRecord(kafkaMessage, null);
}
}

// verify that a reusable schema can be provided to the JSON record builder
// as part of the connector config, so that this can be reused across
// multiple MQ messages
@Test
public void verifyJmsReusableSchemaMessages() throws Exception {
connectTask = getSourceTaskWithEmptyKafkaOffset();

final String SCHEMA = "{\n" +
" \"type\": \"struct\", \n" +
" \"fields\": [\n" +
" {\n" +
" \"field\": \"idx\", \n" +
" \"type\": \"int32\"\n" +
" },\n" +
" {\n" +
" \"field\": \"a\", \n" +
" \"type\": \"string\"\n" +
" },\n" +
" {\n" +
" \"field\": \"b\", \n" +
" \"type\": \"int64\"\n" +
" },\n" +
" {\n" +
" \"field\": \"c\", \n" +
" \"type\": \"double\"\n" +
" },\n" +
" {\n" +
" \"field\": \"d\", \n" +
" \"type\": \"boolean\"\n" +
" },\n" +
" {\n" +
" \"field\": \"e\", \n" +
" \"type\": \"float\"\n" +
" },\n" +
" {\n" +
" \"field\": \"f\", \n" +
" \"type\": \"array\",\n" +
" \"items\": {\n" +
" \"type\": \"string\"\n" +
" }\n" +
" },\n" +
" {\n" +
" \"field\": \"g\", \n" +
" \"type\": \"array\", \n" +
" \"items\": {\n" +
" \"type\": \"int32\"\n" +
" }\n" +
" },\n" +
" {\n" +
" \"field\": \"h\", \n" +
" \"type\": \"struct\", \n" +
" \"fields\": [\n" +
" {\n" +
" \"field\": \"innerstr\", \n" +
" \"type\": \"string\"\n" +
" },\n" +
" {\n" +
" \"field\": \"innernum\", \n" +
" \"type\": \"int64\"\n" +
" }\n" +
" ]\n" +
" }\n" +
" ]\n" +
"}";

final Map<String, String> connectorConfigProps = createDefaultConnectorProperties();
connectorConfigProps.put("mq.message.body.jms", "true");
connectorConfigProps.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
connectorConfigProps.put("mq.record.builder.json.schemas.enable", "true");
connectorConfigProps.put("mq.record.builder.json.schema.content", SCHEMA);

connectTask.start(connectorConfigProps);

final List<Message> messages = new ArrayList<>();
for (int i = 0; i < 5; i++) {
messages.add(getJmsContext().createTextMessage(
"{ " +
"\"idx\": " + i + ", \n" +
"\"a\" : \"test\", \n" +
"\"b\" : 1234, \n" +
"\"c\" : 5.67, \n" +
"\"d\" : false, \n" +
"\"e\" : 12.34, \n" +
"\"f\" : [ \"a\", \"b\", \"c\" ], \n" +
"\"g\" : [ 1, 2, 3 ], \n" +
"\"h\" : { \"innerstr\" : \"testing\", \"innernum\" : 89 }" +
"}"));
}
putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, messages);

final List<SourceRecord> kafkaMessages = connectTask.poll();
assertEquals(5, kafkaMessages.size());

for (int i = 0; i < 5; i++) {
final SourceRecord kafkaMessage = kafkaMessages.get(i);
assertNull(kafkaMessage.key());

assertNotNull(kafkaMessage.valueSchema());
assertEquals(Schema.INT32_SCHEMA, kafkaMessage.valueSchema().field("idx").schema());
assertEquals(Schema.STRING_SCHEMA, kafkaMessage.valueSchema().field("a").schema());
assertEquals(Schema.INT64_SCHEMA, kafkaMessage.valueSchema().field("b").schema());
assertEquals(Schema.FLOAT64_SCHEMA, kafkaMessage.valueSchema().field("c").schema());
assertEquals(Schema.BOOLEAN_SCHEMA, kafkaMessage.valueSchema().field("d").schema());
assertEquals(Schema.FLOAT32_SCHEMA, kafkaMessage.valueSchema().field("e").schema());
assertEquals(Schema.STRING_SCHEMA, kafkaMessage.valueSchema().field("f").schema().valueSchema());
assertEquals(Schema.INT32_SCHEMA, kafkaMessage.valueSchema().field("g").schema().valueSchema());
assertEquals(Schema.STRING_SCHEMA, kafkaMessage.valueSchema().field("h").schema().field("innerstr").schema());
assertEquals(Schema.INT64_SCHEMA, kafkaMessage.valueSchema().field("h").schema().field("innernum").schema());

final Struct value = (Struct) kafkaMessage.value();
assertEquals(Integer.valueOf(i), value.getInt32("idx"));
assertEquals("test", value.getString("a"));
assertEquals(Long.valueOf(1234), value.getInt64("b"));
assertEquals(Double.valueOf(5.67), value.getFloat64("c"));
assertEquals(false, value.getBoolean("d"));
assertEquals(Float.valueOf(12.34f), value.getFloat32("e"));
assertArrayEquals(new String[]{ "a", "b", "c"}, value.getArray("f").toArray(new String[]{}));
assertArrayEquals(new Integer[] { 1, 2, 3 }, value.getArray("g").toArray(new Integer[]{}));
assertEquals("testing", value.getStruct("h").getString("innerstr"));
assertEquals(Long.valueOf(89), value.getStruct("h").getInt64("innernum"));

connectTask.commitRecord(kafkaMessage, null);
}
}

@Test
public void verifyMQMessage() throws Exception {
connectTask = getSourceTaskWithEmptyKafkaOffset();
@@ -65,6 +65,7 @@ public void buildFromJmsTextMessage() throws Exception {

// use the builder to convert it to a Kafka record
final JsonRecordBuilder builder = new JsonRecordBuilder();
builder.configure(getDefaultConnectorProperties());
final SourceRecord record = builder.toSourceRecord(getJmsContext(), topic, isJMS, message);

// verify the Kafka record
@@ -82,6 +83,7 @@ public void buildFromJmsBytesMessage() throws Exception {

// use the builder to convert it to a Kafka record
final JsonRecordBuilder builder = new JsonRecordBuilder();
builder.configure(getDefaultConnectorProperties());
final SourceRecord record = builder.toSourceRecord(getJmsContext(), topic, isJMS, message);

// verify the Kafka record
@@ -100,6 +102,7 @@ public void buildFromJmsMapMessage() throws Exception {

// use the builder to convert it to a Kafka record
final JsonRecordBuilder builder = new JsonRecordBuilder();
builder.configure(getDefaultConnectorProperties());
final RecordBuilderException exc = assertThrows(RecordBuilderException.class, () -> {
builder.toSourceRecord(getJmsContext(), topic, isJMS, message);
});
@@ -115,6 +118,7 @@ public void buildFromJmsTestJsonError() throws Exception {

// use the builder to convert it to a Kafka record
final JsonRecordBuilder builder = new JsonRecordBuilder();
builder.configure(getDefaultConnectorProperties());
final DataException exec = assertThrows(DataException.class, () -> builder.toSourceRecord(getJmsContext(), topic, isJMS, message));
assertEquals("Converting byte[] to Kafka Connect data failed due to serialization error: ", exec.getMessage());
}
@@ -143,7 +147,7 @@ public void buildFromJmsTestErrorToleranceNone() throws Exception {

// use the builder to convert it to a Kafka record
final JsonRecordBuilder builder = new JsonRecordBuilder();
final HashMap<String, String> config = new HashMap<String, String>();
final Map<String, String> config = getDefaultConnectorProperties();
config.put("errors.tolerance", "none");
config.put("mq.message.body.jms", "true");
config.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.JsonRecordBuilder");
@@ -187,7 +191,7 @@ public void testToSourceRecord_JsonRecordBuilder_JsonMessage() throws Exception {
assertThat(sourceRecord).isNotNull();
assertThat(sourceRecord.value()).isInstanceOf(Map.class);
assertNull(sourceRecord.valueSchema()); // JSON with no schema

// Verify JSON data
@SuppressWarnings("unchecked")
Map<String, Object> value = (Map<String, Object>) sourceRecord.value();