= Streaming data from Kafka to a Database using Kafka Connect JDBC Sink
Robin Moffatt <robin@confluent.io>
v1.10, 11 March 2021

🎥 Video: https://rmoff.dev/kafka-jdbc-video

_Tested with Confluent Platform 6.1, ksqlDB 0.15, Kafka Connect JDBC connector 5.5.3_

This uses Docker Compose to run the Kafka Connect worker.

1. Bring the Docker Compose stack up
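+
For example (assuming the `docker-compose.yml` in this repo):
+
[source,bash]
----
docker-compose up -d
----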

2. Launch the ksqlDB CLI
+
[source,bash]
----
docker exec -it ksqldb ksql http://ksqldb:8088
----
+
[source,sql]
----
CREATE STREAM TEST01 (KEY_COL VARCHAR KEY, COL1 INT, COL2 VARCHAR)
    WITH (KAFKA_TOPIC='test01', PARTITIONS=1, VALUE_FORMAT='AVRO');
----
+
[source,sql]
----
INSERT INTO TEST01 (KEY_COL, COL1, COL2) VALUES ('X', 1, 'FOO');
INSERT INTO TEST01 (KEY_COL, COL1, COL2) VALUES ('Y', 2, 'BAR');
----
+
[source,sql]
----
PRINT test01 FROM BEGINNING LIMIT 2;
----

3. Create the JDBC Sink connector
+
[source,bash]
----
curl -X PUT http://localhost:8083/connectors/sink-jdbc-mysql-01/config \
    -H "Content-Type: application/json" -d '{
    "connector.class"                    : "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url"                     : "jdbc:mysql://mysql:3306/demo",
    "topics"                             : "test01",
    "key.converter"                      : "org.apache.kafka.connect.storage.StringConverter",
    "value.converter"                    : "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "connection.user"                    : "connect_user",
    "connection.password"                : "asgard",
    "auto.create"                        : true,
    "auto.evolve"                        : true,
    "insert.mode"                        : "insert",
    "pk.mode"                            : "record_key",
    "pk.fields"                          : "MESSAGE_KEY"
}'
----
+
Things to customise for your environment:

* `topics` : the source topic(s) you want to stream to the database
* `key.converter` : match the serialisation of your source data (see https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained/[here])
* `value.converter` : match the serialisation of your source data (see https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained/[here])
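+
Once the connector is created, check that it and its task are in the `RUNNING` state - for example:
+
[source,bash]
----
curl -s http://localhost:8083/connectors/sink-jdbc-mysql-01/status | jq '.'
----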
+
Check the data in the target database:
+
[source,bash]
----
docker exec -it mysql bash -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD demo'
----
+
[source,sql]
----
mysql> SHOW TABLES;
+----------------+
| Tables_in_demo |
+----------------+
| test01         |
+----------------+
1 row in set (0.00 sec)

mysql> SELECT * FROM test01;
+------+------+-------------+
| COL1 | COL2 | MESSAGE_KEY |
+------+------+-------------+
|    1 | FOO  | X           |
|    2 | BAR  | Y           |
+------+------+-------------+
2 rows in set (0.00 sec)
----

4. Insert some more data. Notice what happens if you re-use a key.
+
[source,sql]
----
INSERT INTO TEST01 (KEY_COL, COL1, COL2) VALUES ('Z', 1, 'WOO');
INSERT INTO TEST01 (KEY_COL, COL1, COL2) VALUES ('Y', 4, 'PFF');
----
+
The Kafka Connect task status shows that there's a duplicate-key error:
+
[source,bash]
----
$ curl -s http://localhost:8083/connectors/sink-jdbc-mysql-01/tasks/0/status | jq '.'
{
  "id": 0,
  "state": "FAILED",
  "worker_id": "kafka-connect:8083",
  "trace": "org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:614)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:834)\nCaused by: org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: Exception chain:\njava.sql.BatchUpdateException: Duplicate entry 'Y' for key 'test01.PRIMARY'\njava.sql.SQLIntegrityConstraintViolationException: Duplicate entry 'Y' for key 'test01.PRIMARY'\n\n\tat io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:89)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)\n\t... 10 more\nCaused by: java.sql.SQLException: Exception chain:\njava.sql.BatchUpdateException: Duplicate entry 'Y' for key 'test01.PRIMARY'\njava.sql.SQLIntegrityConstraintViolationException: Duplicate entry 'Y' for key 'test01.PRIMARY'\n\n\tat io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:86)\n\t... 11 more\n"
}
----
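+
If a task has failed you can restart it via the Kafka Connect REST API once the underlying problem is resolved - for example:
+
[source,bash]
----
curl -X POST http://localhost:8083/connectors/sink-jdbc-mysql-01/tasks/0/restart
----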

5. Update the Sink connector to use `UPSERT` mode
+
[source,bash]
----
curl -X PUT http://localhost:8083/connectors/sink-jdbc-mysql-01/config \
    -H "Content-Type: application/json" -d '{
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:mysql://mysql:3306/demo",
    "topics": "test01",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "connection.user": "connect_user",
    "connection.password": "asgard",
    "auto.create": true,
    "auto.evolve": true,
    "insert.mode": "upsert",
    "pk.mode": "record_key",
    "pk.fields": "MESSAGE_KEY"
}'
----
+
Now the row for key `Y` updates, whilst `Z` is added:
+
[source,sql]
----
mysql> SELECT * FROM test01;
+------+------+-------------+
| COL1 | COL2 | MESSAGE_KEY |
+------+------+-------------+
|    1 | FOO  | X           |
|    4 | PFF  | Y           |
|    1 | WOO  | Z           |
+------+------+-------------+
3 rows in set (0.00 sec)
----
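+
With `insert.mode` set to `upsert` the connector uses the target database's own upsert syntax; for MySQL that is roughly equivalent to:
+
[source,sql]
----
INSERT INTO test01 (MESSAGE_KEY, COL1, COL2) VALUES ('Y', 4, 'PFF')
  ON DUPLICATE KEY UPDATE COL1 = VALUES(COL1), COL2 = VALUES(COL2);
----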

6. Drop fields, add fields - note how the target schema evolves in place
+
[source,bash]
----
# Drop the existing connector
curl -X DELETE http://localhost:8083/connectors/sink-jdbc-mysql-01

# Create a new one, reading from the same topic with a new config.
# Because it's got a new name, the connector will re-read all the messages
# from the topic.
curl -X PUT http://localhost:8083/connectors/sink-jdbc-mysql-02/config \
    -H "Content-Type: application/json" -d '{
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:mysql://mysql:3306/demo",
    "transforms.addSome.timestamp.field" : "RECORD_TS"
}'
----
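+
The re-read happens because a sink connector tracks its progress with a consumer group named after the connector (`connect-<connector name>`), so a new connector name means a new consumer group starting from the beginning of the topic. You can inspect it with `kafka-consumer-groups` - for example (the bootstrap address here is an assumption; adjust it for your environment):
+
[source,bash]
----
kafka-consumer-groups --bootstrap-server localhost:9092 \
  --describe --group connect-sink-jdbc-mysql-02
----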
+
[source,sql]
----
mysql> describe test01;
+-------------+--------------+------+-----+---------+-------+
| Field       | Type         | Null | Key | Default | Extra |
+-------------+--------------+------+-----+---------+-------+
| COL1        | int          | YES  |     | NULL    |       |
| COL2        | varchar(256) | YES  |     | NULL    |       |
| MESSAGE_KEY | varchar(256) | NO   | PRI | NULL    |       |
| _partition  | int          | YES  |     | NULL    |       |
| RECORD_TS   | datetime(3)  | YES  |     | NULL    |       |
+-------------+--------------+------+-----+---------+-------+
5 rows in set (0.00 sec)

mysql> select * from test01;
+------+------+-------------+------------+-------------------------+
| COL1 | COL2 | MESSAGE_KEY | _partition | RECORD_TS               |
+------+------+-------------+------------+-------------------------+
|    1 | FOO  | X           |          0 | 2021-03-11 11:50:00.759 |
|    4 | PFF  | Y           |          0 | 2021-03-11 11:50:47.761 |
|    1 | WOO  | Z           |          0 | 2021-03-11 11:50:47.682 |
+------+------+-------------+------------+-------------------------+
3 rows in set (0.00 sec)
----
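+
The `_partition` and `RECORD_TS` columns are added by the `InsertField` Single Message Transform configured on the connector; the relevant settings (only the last of which appears in the truncated config above) look something like this:
+
[source,javascript]
----
"transforms"                         : "addSome",
"transforms.addSome.type"            : "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.addSome.partition.field" : "_partition",
"transforms.addSome.timestamp.field" : "RECORD_TS"
----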

7. Write some CSV and JSON to new topics

8. Create a JDBC sink for the JSON data that has an embedded schema
+
[source,bash]
----
curl -X PUT http://localhost:8083/connectors/sink-jdbc-mysql-02-json/config \
    -H "Content-Type: application/json" -d '{
    "connector.class"               : "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url"                : "jdbc:mysql://mysql:3306/demo",
    "topics"                        : "some_json_data_with_a_schema",
    "key.converter"                 : "org.apache.kafka.connect.storage.StringConverter",
    "value.converter"               : "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true",
    "connection.user"               : "connect_user",
    "connection.password"           : "asgard",
    "auto.create"                   : true,
    "auto.evolve"                   : true,
    "insert.mode"                   : "insert"
}'
----
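+
For the `JsonConverter` with `schemas.enable` set to `true`, each message value has to carry both the schema and the payload - something like this (a hypothetical record, not taken from the topic above):
+
[source,javascript]
----
{
  "schema": {
    "type": "struct",
    "fields": [
      { "field": "ID",     "type": "int32",  "optional": true },
      { "field": "ARTIST", "type": "string", "optional": true },
      { "field": "SONG",   "type": "string", "optional": true }
    ],
    "optional": false
  },
  "payload": { "ID": 1, "ARTIST": "Rick Astley", "SONG": "Never Gonna Give You Up" }
}
----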

9. Reserialize the JSON and CSV data to Avro with ksqlDB
+
[source,sql]
----
CREATE STREAM SOME_JSON (ID INT, ARTIST VARCHAR, SONG VARCHAR)
    WITH (KAFKA_TOPIC='some_json_data', VALUE_FORMAT='JSON');

SET 'auto.offset.reset' = 'earliest';

CREATE STREAM SOME_JSON_AS_AVRO
    WITH (VALUE_FORMAT='AVRO') AS
    SELECT * FROM SOME_JSON;

INSERT INTO SOME_JSON_AS_AVRO SELECT * FROM SOME_CSV;
----

10. Create a sink for the reserialized data
+
[source,bash]
----
curl -X PUT http://localhost:8083/connectors/sink-jdbc-mysql-02-avro/config \
    -H "Content-Type: application/json" -d '{
    "connector.class"                    : "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url"                     : "jdbc:mysql://mysql:3306/demo",
    "topics"                             : "SOME_JSON_AS_AVRO",
    "key.converter"                      : "org.apache.kafka.connect.storage.StringConverter",
    "value.converter"                    : "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "connection.user"                    : "connect_user",
    "connection.password"                : "asgard",
    "auto.create"                        : true,
    "auto.evolve"                        : true,
    "insert.mode"                        : "insert"
}'
----
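+
Check that the connector has created and populated a table for the new topic - for example:
+
[source,sql]
----
SHOW TABLES;
SELECT * FROM SOME_JSON_AS_AVRO;
----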