
Commit da85414

Bootstrap semantics Kafka Streams.
1 parent 7dee708 commit da85414

4 files changed: +291 -52 lines

kotlin-kafka-streams-examples/README.md

Lines changed: 178 additions & 10 deletions
@@ -1,18 +1,20 @@

# Kafka Streams and Kotlin

This module is formed of two parts:

1) A fully fledged Kafka Streams Spring Boot application providing an ordering system that uses Avro, joins and output topics.

2) An example package containing examples for [Windowing, Aggregations and Joining](https://github.com/perkss/kotlin-kafka-and-kafka-streams-examples/tree/master/kotlin-kafka-streams-examples/src/main/kotlin/com/perkss/kafka/reactive/examples).

## Getting up and Running the Order Processing Topology

Start up the Kafka and Zookeeper cluster. It has three nodes, so at least two need to be up.

`docker-compose up`

Create the topic for the order requests:

```shell script
docker run --rm --net=host confluentinc/cp-kafka:latest kafka-topics --create --zookeeper localhost:22181 --replication-factor 3 --partitions 3 --topic order-request
```
@@ -21,7 +23,9 @@

```shell script
docker run --rm --net=host confluentinc/cp-kafka:latest kafka-topics --create --zookeeper localhost:22181 --replication-factor 3 --partitions 3 --topic order-processed
```

Create the table topics; they are required to be `compact` to work as tables:

```shell script
docker run --rm --net=host confluentinc/cp-kafka:latest kafka-topics --create --zookeeper localhost:22181 --replication-factor 3 --partitions 3 --topic customer --config cleanup.policy=compact,delete
```
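
To verify the cleanup policy took effect you can describe the topic. This command is not in the original steps here; it simply mirrors the `--describe` call used for the `name` topic later in this README:

```shell script
docker run --rm --net=host confluentinc/cp-kafka:latest kafka-topics --describe --zookeeper localhost:22181 --topic customer
```
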
@@ -36,10 +40,11 @@

### Populate Data

First we can populate a customer in the customer topic, which will populate the customer KTable. Note that the keys are
kept the same between the examples to enable the streaming join with the KTable.

Start the KafkaAvroConsoleProducer and pass the key value pair in to send a `Customer`.

```shell script
docker run --rm -it --net=host confluentinc/cp-schema-registry:latest kafka-avro-console-producer --broker-list localhost:9092 --topic customer --property "parse.key=true" --property "key.separator=:" --property key.serializer=org.apache.kafka.common.serialization.StringSerializer --property value.schema='{ "namespace": "com.perkss", "type": "record", "name": "Customer", "fields": [ { "name": "id", "type": { "type": "string", "avro.java.string": "String" } }, { "name": "name", "type": { "type": "string", "avro.java.string": "String" } }, { "name": "city", "type": { "type": "string", "avro.java.string": "String" } } ] }'
```
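
For example, a line like the following can be pasted into the producer prompt: the key comes before the `:` separator and the Avro JSON value after it. The id, name and city here are made-up sample values; reuse the same key later for the `OrderRequested` record so the streaming join can match.

```shell script
123:{"id": "123", "name": "Tom", "city": "London"}
```
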
@@ -49,11 +54,13 @@

View the schema registered in the SchemaRegistry:

```shell script
http://0.0.0.0:8081/subjects/customer-value/versions/1
```

Now send an `OrderRequest`, which is an event to join with the `Customer` table:

```shell script
docker run --rm -it --net=host confluentinc/cp-schema-registry:latest kafka-avro-console-producer --broker-list localhost:9092 --topic order-request --property "parse.key=true" --property "key.separator=:" --property key.serializer=org.apache.kafka.common.serialization.StringSerializer --property value.schema='{ "namespace": "com.perkss.order.model", "type": "record", "name": "OrderRequested", "fields": [ { "name": "id", "type": { "type": "string", "avro.java.string": "String" } }, { "name": "product_id", "type": { "type": "string", "avro.java.string": "String" } } ] }'
```
@@ -65,6 +72,7 @@

```shell script
http://0.0.0.0:8081/subjects/order-request-value/versions/1
```

You can consume the messages written using the following console consumers that reference the Schema Registry.
@@ -75,15 +83,175 @@

```shell script
docker run --rm -it --net=host confluentinc/cp-schema-registry:latest kafka-avro-console-consumer --topic customer --bootstrap-server localhost:9092 --property schema.registry.url="http://0.0.0.0:8081" --from-beginning
```

## Getting up and Running the Bootstrap Semantics Topology

`docker-compose up`

Create the `name` input topic, set it to be compacted and check its configuration:

```shell script
docker run --rm --net=host confluentinc/cp-kafka:latest kafka-topics --create --zookeeper localhost:22181 --replication-factor 3 --partitions 3 --topic name
docker run --rm --net=host confluentinc/cp-kafka:latest kafka-topics --alter --zookeeper localhost:22181 --topic name --config cleanup.policy=compact
docker run --rm --net=host confluentinc/cp-kafka:latest kafka-topics --describe --zookeeper localhost:22181 --topic name
```

Create the `name-formatted` output topic:

```shell script
docker run --rm --net=host confluentinc/cp-kafka:latest kafka-topics --create --zookeeper localhost:22181 --replication-factor 3 --partitions 3 --topic name-formatted
```

Start console consumers to watch the input and output topics:

```shell
docker run --rm --net=host confluentinc/cp-kafka:latest kafka-console-consumer --bootstrap-server localhost:9092 --topic name
docker run --rm --net=host confluentinc/cp-kafka:latest kafka-console-consumer --bootstrap-server localhost:9093 --topic name --property print.key=true --from-beginning
docker run --rm --net=host confluentinc/cp-kafka:latest kafka-console-consumer --bootstrap-server localhost:9093 --topic name-formatted --property print.key=true --from-beginning
```

Produce keyed messages onto the `name` topic:

```shell
docker exec -it kafka-3 kafka-console-producer --broker-list kafka-2:29092 --topic name --property "parse.key=true" --property "key.separator=:"
```

### Test

For the first test we will run just a KTable that consumes the messages off the compacted topic, after two messages with
the same key have been placed on it.

```shell
Topic: name PartitionCount: 3 ReplicationFactor: 3 Configs: cleanup.policy=compact
Topic: name Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
Topic: name Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: name Partition: 2 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
```

```kotlin
streamsBuilder
    .table("name", Consumed.with(Serdes.String(), Serdes.String()))
    .toStream()
    .peek { key, value ->
        logger.info("Processing {}, {}", key, value)
    }
    .to("name-formatted")
```

Put two messages on the topic with the same key:

```shell
tom:perks
tom:matthews
```

If you run the application now, as expected it will process both messages.

Now let's add a table join into the mix and join the stream back onto the KTable. This should only join against the latest values.

First reset the application so it reprocesses the `name` topic from the beginning:

```shell
docker exec -it kafka-3 kafka-streams-application-reset --application-id OrderProcessing \
  --input-topics name \
  --bootstrap-servers kafka-1:29091,kafka-2:29092,kafka-3:29093 \
  --zookeeper zookeeper-1:22181,zookeeper-2:22182,zookeeper-3:22183
```

```kotlin
val nameKTable = streamsBuilder
    .table("name", Consumed.with(Serdes.String(), Serdes.String()))

nameKTable
    .toStream()
    .peek { key, value ->
        logger.info("Processing {}, {}", key, value)
    }
    .join(nameKTable, ValueJoiner { value1, value2 ->
        logger.info("Joining the Stream Name {} to the KTable Name {}", value1, value2)
        value2
    }, Joined.with(Serdes.String(), Serdes.String(), Serdes.String()))
    .to("name-formatted", Produced.with(Serdes.String(), Serdes.String()))
```

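
These Kotlin snippets assume roughly the following imports and definitions are in scope; the `logger` and `streamsBuilder` names here are assumed for illustration, in the repository they live in the topology and config classes:

```kotlin
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.Consumed
import org.apache.kafka.streams.kstream.Joined
import org.apache.kafka.streams.kstream.Produced
import org.apache.kafka.streams.kstream.ValueJoiner
import org.slf4j.LoggerFactory

// Assumed helpers used by the snippets above.
val logger = LoggerFactory.getLogger("BootstrapSemantics")
val streamsBuilder = StreamsBuilder()
```
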
Now if we (inner) join the stream to the table itself and put on the following messages

```shell
stuart:c
stuart:d
max:a
stuart:e
```

we now get a result of

```shell
stuart:e
```

Now if we left join the stream to the table itself and put on the following messages

```shell
perkss:a
perkss:b
sam:a
perkss:c
```

we now get a result of

```shell
sam:a
perkss:c
```

You cannot join a GlobalKTable onto itself,
as that fails with `Invalid topology: Topic name has already been registered by another source.`

If we were to rekey and join on a different key, what are the semantics? Let's see.

```kotlin
nameKTable
    .toStream()
    .peek { key, value ->
        logger.info("Processing {}, {}", key, value)
    }
    .selectKey { key, value ->
        val re = Regex("[^A-Za-z0-9 ]")
        re.replace(value, "")
    }
    .join(nameKTable, ValueJoiner { value1, value2 ->
        logger.info("Joining the Stream Name {} to the KTable Name {}", value1, value2)
        value2
    }, Joined.with(Serdes.String(), Serdes.String(), Serdes.String()))
    .to("name-formatted", Produced.with(Serdes.String(), Serdes.String()))
```

Putting on the following messages

```shell
sarah:mark1
mark:sarah1
sarah:mark2
sarah:mark3
mark:sarah2
```

results in

```shell
Processing sarah, mark3
Processing mark, sarah2

Joining the Stream Name mark3 to the KTable Name sarah2
Joining the Stream Name sarah2 to the KTable Name mark3


OutputTopic >
sarah2
mark3
```

Therefore we can see that, in this simple example, using the KTable and joining it with itself will only take the latest
value when processing the stream. To guarantee this we could even check the message timestamps: if the joined version is
newer, use it; otherwise drop the message and wait for the newer version to arrive.
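
As a rough illustration of that timestamp check (not part of this commit): if both sides of the join carried their record timestamps, for example wrapped in a small value class populated upstream, the `ValueJoiner` could prefer whichever side is newer. The `Versioned` class and its field names below are made up for the sketch.

```kotlin
import org.apache.kafka.streams.kstream.ValueJoiner

// Hypothetical wrapper carrying the record timestamp alongside the name value.
data class Versioned(val name: String, val timestamp: Long)

// Keep whichever side saw the more recent update; a "drop and wait" strategy would
// instead return null here and filter the nulls out downstream.
val preferNewer = ValueJoiner<Versioned, Versioned, Versioned> { streamSide, tableSide ->
    if (streamSide.timestamp >= tableSide.timestamp) streamSide else tableSide
}
```
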
## Testcontainers Integration Tests

Requires Docker to be running.

`StreamIntegrationTest` uses [Testcontainers](https://www.testcontainers.org/) to fire up a running instance of Kafka
and Schema Registry, then runs our application to drop messages on Kafka, process them and read the output. Check it
out, it is a very powerful example.
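
A minimal sketch of the Testcontainers pattern the test relies on; the real `StreamIntegrationTest` also starts a Schema Registry container and wires up the Spring context, which is not shown here, and this assumes the Testcontainers `kafka` module is on the test classpath:

```kotlin
import org.testcontainers.containers.KafkaContainer
import org.testcontainers.utility.DockerImageName

fun main() {
    // Start a throwaway Kafka broker in Docker for the duration of the test run.
    val kafka = KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"))
    kafka.start()
    try {
        // The application under test is pointed at this address instead of a real cluster.
        println("bootstrap.servers = ${kafka.bootstrapServers}")
    } finally {
        kafka.stop()
    }
}
```
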

## Examples Understanding KStream Windowing

TODO a table of each event, event timestamp, the window it's in and the

kotlin-kafka-streams-examples/src/main/kotlin/com/perkss/kafka/reactive/AppConfig.kt

Lines changed: 63 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@ class AppConfig {

```kotlin
        streamsConfiguration[KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG] = props.schemaRegistry
        streamsConfiguration[StreamsConfig.STATE_DIR_CONFIG] = props.stateDir
        streamsConfiguration[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
        // Do not create internal changelog topics; this requires the source topic to be compacted.
        // https://stackoverflow.com/questions/57164133/kafka-stream-topology-optimization
        streamsConfiguration[StreamsConfig.TOPOLOGY_OPTIMIZATION] = StreamsConfig.OPTIMIZE
        return streamsConfiguration
    }
```

@@ -45,58 +46,80 @@ class AppConfig {

```kotlin
    @Bean
    fun stockSerde(props: AppProperties): SpecificAvroSerde<Stock> =
        SpecificAvroSerde<Stock>().apply {
            configure(mutableMapOf(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG to props.schemaRegistry), false)
        }

    // stock table keyed by id of stock
    @Bean
    fun stockTable(
        streamsBuilder: StreamsBuilder,
        props: AppProperties,
        serde: SpecificAvroSerde<Stock>
    ): KTable<String, Stock> =
        stock(streamsBuilder, serde, props)

    // keyed by product ID
    @Bean
    fun customerTable(
        streamsBuilder: StreamsBuilder,
        props: AppProperties
    ): GlobalKTable<String, GenericRecord> =
        customer(streamsBuilder, props, Serdes.String(), GenericAvroSerde().apply {
            configure(
                mapOf(
                    KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG to props.schemaRegistry
                ), false
            ) // TODO make bean
        })

    @Bean
    fun orderProcessingTopology(
        streamConfig: Properties,
        streamsBuilder: StreamsBuilder,
        props: AppProperties,
        customerTable: GlobalKTable<String, GenericRecord>,
        stockTable: KTable<String, Stock>,
        stockSerde: SpecificAvroSerde<Stock>
    ): Topology {
        return orderProcessing(
            streamConfig, streamsBuilder, props, customerTable, stockTable, Serdes.String(),
            SpecificAvroSerde<OrderRequested>().apply {
                configure(
                    mapOf(
                        KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG to props.schemaRegistry
                    ), false
                )
            },
            SpecificAvroSerde<OrderRejected>().apply {
                configure(
                    mapOf(
                        KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG to props.schemaRegistry
                    ), false
                )
            },
            SpecificAvroSerde<OrderConfirmed>().apply {
                configure(
                    mapOf(
                        KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG to props.schemaRegistry
                    ), false
                )
            },
            stockSerde
        )
    }

    @Bean
    fun orderProcessingApp(
        orderProcessingTopology: Topology,
        streamConfig: Properties
    ) = KafkaStreams(orderProcessingTopology, streamConfig)

    @Bean
    fun bootstrapSemantics(
        streamsBuilder: StreamsBuilder,
        streamConfig: Properties
    ) = KafkaStreams(BootstrapSemanticsTopology.build(streamsBuilder, streamConfig), streamConfig)

}
```
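
The `BootstrapSemanticsTopology` referenced by the new bean is one of the other changed files and is not shown in this diff. Purely as an assumed sketch based on the README section above, its `build` function would wire the `name` to `name-formatted` KTable topology along these lines; the real class in this commit may differ:

```kotlin
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.Consumed
import org.apache.kafka.streams.kstream.Produced
import org.slf4j.LoggerFactory
import java.util.Properties

// Assumed shape only, mirroring the README topology.
object BootstrapSemanticsTopology {
    private val logger = LoggerFactory.getLogger(BootstrapSemanticsTopology::class.java)

    fun build(streamsBuilder: StreamsBuilder, streamConfig: Properties): Topology {
        streamsBuilder
            .table("name", Consumed.with(Serdes.String(), Serdes.String()))
            .toStream()
            .peek { key, value -> logger.info("Processing {}, {}", key, value) }
            .to("name-formatted", Produced.with(Serdes.String(), Serdes.String()))
        // Passing the stream config lets the topology optimization setting take effect.
        return streamsBuilder.build(streamConfig)
    }
}
```
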
