
Commit b15784a

Bootstrap semantics join with other Kafka Streams.
1 parent d12758d commit b15784a

5 files changed: +203 −5 lines changed

kotlin-kafka-streams-examples/README.md

Lines changed: 149 additions & 0 deletions
@@ -242,6 +242,155 @@ Therefore we can see that using the KTable and joining with itself in this simpl
value when processing the stream. To guarantee this we could even check the message timestamps: if the joined version is newer, use it; otherwise drop the message and wait for the new version to come in.
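
To make that timestamp check concrete, here is a minimal sketch (assumed code, not part of this commit): a stateful `transformValues` step that remembers the latest timestamp seen per key in a hypothetical `latest-timestamp-store` and drops older records before they reach the join. It would slot into the build function alongside the example's `nameKTable`:

```kotlin
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.kstream.ValueTransformerWithKey
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier
import org.apache.kafka.streams.processor.ProcessorContext
import org.apache.kafka.streams.state.KeyValueStore
import org.apache.kafka.streams.state.Stores

val timestampStore = "latest-timestamp-store" // hypothetical store name

// Register the key/value store that tracks the latest timestamp per key.
streamsBuilder.addStateStore(
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore(timestampStore),
        Serdes.String(),
        Serdes.Long()
    )
)

// Keep a record only if it is at least as new as the last one seen for its key.
val newestOnly = nameKTable
    .toStream()
    .transformValues(ValueTransformerWithKeySupplier<String, String, String?> {
        object : ValueTransformerWithKey<String, String, String?> {
            private lateinit var context: ProcessorContext
            private lateinit var store: KeyValueStore<String, Long>

            override fun init(context: ProcessorContext) {
                this.context = context
                @Suppress("UNCHECKED_CAST")
                store = context.getStateStore(timestampStore) as KeyValueStore<String, Long>
            }

            override fun transform(readOnlyKey: String, value: String): String? {
                val latest = store.get(readOnlyKey)
                return if (latest == null || context.timestamp() >= latest) {
                    store.put(readOnlyKey, context.timestamp())
                    value // newest version so far, keep it
                } else {
                    null // late message: drop it and wait for a newer version
                }
            }

            override fun close() {}
        }
    }, timestampStore)
    .filter { _, value -> value != null }
```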

### Join another table

First create the two compacted input topics and the output topic for the joined result:

```shell
docker run --rm --net=host confluentinc/cp-kafka:latest kafka-topics --create --zookeeper localhost:22181 --replication-factor 3 --partitions 3 --topic first-name
docker run --rm --net=host confluentinc/cp-kafka:latest kafka-topics --alter --zookeeper localhost:22181 --topic first-name --config cleanup.policy=compact
docker run --rm --net=host confluentinc/cp-kafka:latest kafka-topics --describe --zookeeper localhost:22181 --topic first-name

docker run --rm --net=host confluentinc/cp-kafka:latest kafka-topics --create --zookeeper localhost:22181 --replication-factor 3 --partitions 3 --topic last-name
docker run --rm --net=host confluentinc/cp-kafka:latest kafka-topics --alter --zookeeper localhost:22181 --topic last-name --config cleanup.policy=compact
docker run --rm --net=host confluentinc/cp-kafka:latest kafka-topics --describe --zookeeper localhost:22181 --topic last-name

docker run --rm --net=host confluentinc/cp-kafka:latest kafka-topics --create --zookeeper localhost:22181 --replication-factor 3 --partitions 3 --topic joined-name
```

Let's populate the topics before starting the application:

```shell
docker exec -it kafka-3 kafka-console-producer --broker-list kafka-2:29092 --topic first-name --property "parse.key=true" --property "key.separator=:"

1:tom
1:matthew
2:mark
```

```shell
docker exec -it kafka-3 kafka-console-producer --broker-list kafka-2:29092 --topic last-name --property "parse.key=true" --property "key.separator=:"

1:banks
2:pears
2:sanders
```

Starting the application processes all three first-name messages on the stream, but none of the joins succeed. This falls in line with the stream not waiting for the table to populate before it starts streaming messages:

```shell
Processing 2, mark
Processing 1, tom
Processing 1, matthew
```

This behaviour comes from the KStream-KTable join used in the topology, which streams the first-name KTable and looks each record up against the last-name KTable:

```kotlin
nameKTable
    .toStream()
    .peek { key, value ->
        logger.info("Processing {}, {}", key, value)
    }
    .join(lastNameKTable, ValueJoiner { value1, value2 ->
        logger.info("Joining the Stream First Name {} to the KTable Last Name {}", value1, value2)
        "$value1 $value2"
    }, Joined.with(Serdes.String(), Serdes.String(), Serdes.String()))
    .to("joined-name", Produced.with(Serdes.String(), Serdes.String()))
```

If we now send a last name and then a first name for a new key:

Last name:

```shell
3:last
```

First name:

```shell
3:first
```

This time the join succeeds:

```shell
Processing 3, first
Joining the Stream First Name first to the KTable Last Name last
```

This is due to the timing semantics of the KTable: the last name was already in the table by the time the first-name record was processed. Let's send another first name with the same key:

```shell
3:first2
```

Again it joins:

```shell
Processing 3, first2
Joining the Stream First Name first2 to the KTable Last Name last
```

If a late message came in, it would not join if its timestamp was before the table message's timestamp, as the join takes the message timestamps into account.

Now let's do it with a GlobalKTable. I would expect the GlobalKTable to pause execution until it is populated and then join successfully, while still streaming all keys:

```kotlin
val nameKTable = streamsBuilder
    .table("first-name", Consumed.with(Serdes.String(), Serdes.String()))

val lastNameKTable = streamsBuilder
    .globalTable("last-name", Consumed.with(Serdes.String(), Serdes.String()))

nameKTable
    .toStream()
    .peek { key, value ->
        logger.info("Processing {}, {}", key, value)
    }
    .join(
        lastNameKTable,
        KeyValueMapper<String, String, String> { key, value -> key },
        ValueJoiner { value1: String, value2: String ->
            logger.info("Joining the Stream First Name {} to the KTable Last Name {}", value1, value2)
            "$value1 $value2"
        },
        Named.`as`("global")
    )
    .to("joined-name", Produced.with(Serdes.String(), Serdes.String()))
```

Send some more first names:

```shell
docker exec -it kafka-3 kafka-console-producer --broker-list kafka-2:29092 --topic first-name --property "parse.key=true" --property "key.separator=:"

1:peter
1:jackson
2:steven
```

With the existing last names plus one more:

```shell
docker exec -it kafka-3 kafka-console-producer --broker-list kafka-2:29092 --topic last-name --property "parse.key=true" --property "key.separator=:"

1:banks
2:pears
2:sanders
2:holly
```

The resulting joined names:

```shell
peter banks
jackson banks
steven holly
```
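
To inspect these yourself, consume the joined-name topic from the beginning (assuming the same container and broker names as the producer commands above):

```shell
docker exec -it kafka-3 kafka-console-consumer --bootstrap-server kafka-2:29092 --topic joined-name --from-beginning
```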

As expected from the documentation, the GlobalKTable loads up all of its data before the application starts processing, so we will always join against the table's latest value.

## Testcontainers Integration Tests

Requires Docker to be running.

kotlin-kafka-streams-examples/src/main/kotlin/com/perkss/kafka/reactive/AppConfig.kt

Lines changed: 7 additions & 2 deletions

@@ -114,12 +114,17 @@ class AppConfig {
         orderProcessingTopology: Topology,
         streamConfig: Properties
     ) = KafkaStreams(orderProcessingTopology, streamConfig)
-
+//
 //    @Bean
 //    fun bootstrapSemantics(
 //        streamsBuilder: StreamsBuilder,
 //        streamConfig: Properties
-//    ) = KafkaStreams(BootstrapSemanticsTopology.build(streamsBuilder, streamConfig), streamConfig)
+//    ) = KafkaStreams(BootstrapSemanticsSelfJoinTopology.build(streamsBuilder, streamConfig), streamConfig)
 
+//    @Bean
+//    fun bootstrapSemantics(
+//        streamsBuilder: StreamsBuilder,
+//        streamConfig: Properties
+//    ) = KafkaStreams(BootstrapSemanticsJoinOtherTableTopology.build(streamsBuilder, streamConfig), streamConfig)
 
 }
kotlin-kafka-streams-examples/src/main/kotlin/com/perkss/kafka/reactive/BootstrapSemanticsJoinOtherTableTopology.kt

Lines changed: 43 additions & 0 deletions

@@ -0,0 +1,43 @@
+package com.perkss.kafka.reactive
+
+import org.apache.kafka.common.serialization.Serdes
+import org.apache.kafka.streams.StreamsBuilder
+import org.apache.kafka.streams.Topology
+import org.apache.kafka.streams.kstream.Consumed
+import org.apache.kafka.streams.kstream.Joined
+import org.apache.kafka.streams.kstream.Produced
+import org.apache.kafka.streams.kstream.ValueJoiner
+import org.slf4j.LoggerFactory
+import java.util.*
+
+object BootstrapSemanticsJoinOtherTableTopology {
+
+    private val logger = LoggerFactory.getLogger(BootstrapSemanticsJoinOtherTableTopology::class.java)
+
+    fun build(streamsBuilder: StreamsBuilder, properties: Properties): Topology {
+        val nameKTable = streamsBuilder
+            .table("first-name", Consumed.with(Serdes.String(), Serdes.String()))
+
+        val lastNameKTable = streamsBuilder
+            .table("last-name", Consumed.with(Serdes.String(), Serdes.String()))
+
+        nameKTable
+            .toStream()
+            .peek { key, value ->
+                logger.info("Processing {}, {}", key, value)
+            }
+            .join(lastNameKTable, ValueJoiner { value1, value2 ->
+                logger.info("Joining the Stream First Name {} to the KTable Last Name {}", value1, value2)
+                "$value1 $value2"
+            }, Joined.with(Serdes.String(), Serdes.String(), Serdes.String()))
+            .to("joined-name", Produced.with(Serdes.String(), Serdes.String()))
+
+
+        val topology = streamsBuilder.build(properties)
+        logger.info("Bootstrap Topology Describe:\n {}", topology.describe())
+
+        return topology
+    }
+
+
+}
kotlin-kafka-streams-examples/src/main/kotlin/com/perkss/kafka/reactive/BootstrapSemanticsSelfJoinTopology.kt

Lines changed: 3 additions & 2 deletions

@@ -10,9 +10,10 @@ import org.apache.kafka.streams.kstream.ValueJoiner
 import org.slf4j.LoggerFactory
 import java.util.*
 
-object BootstrapSemanticsTopology {
+// Results in consuming only the latest message from the Topic
+object BootstrapSemanticsSelfJoinTopology {
 
-    private val logger = LoggerFactory.getLogger(BootstrapSemanticsTopology::class.java)
+    private val logger = LoggerFactory.getLogger(BootstrapSemanticsSelfJoinTopology::class.java)
 
     fun build(streamsBuilder: StreamsBuilder, properties: Properties): Topology {
         val nameKTable = streamsBuilder

kotlin-kafka-streams-examples/src/main/kotlin/com/perkss/kafka/reactive/KafkaStreamsApp.kt

Lines changed: 1 addition & 1 deletion

@@ -9,7 +9,7 @@ import org.springframework.boot.runApplication
 @SpringBootApplication
 class KafkaStreamsApp(
     private val orderProcessingApp: KafkaStreams,
-    // private val bootstrapSemantics: KafkaStreams
+    //private val bootstrapSemantics: KafkaStreams
 ) : CommandLineRunner {
 
     companion object {
