Commit 8aaf71f

Bootstrap semantics join with other Kafka Streams.

1 parent b15784a

File tree

1 file changed: +41 −21 lines


kotlin-kafka-streams-examples/README.md

@@ -108,10 +108,11 @@ docker run --rm --net=host confluentinc/cp-kafka:latest kafka-console-consumer
 docker exec -it kafka-3 kafka-console-producer --broker-list kafka-2:29092 --topic name --property "parse.key=true" --property "key.separator=:"
 ```
 
-### Test
+### Test semantics
 
 For the first test we will run just a KTable that consumes the messages off a compacted topic after two messages with
-the same key have been placed on a topic.
+the same key have been placed on the topic. I would expect this topology to process all messages on start-up,
+including duplicate keys, so we see the full history following streaming semantics.
 
 ```shell
 Topic: name  PartitionCount: 3  ReplicationFactor: 3  Configs: cleanup.policy=compact
@@ -130,7 +131,7 @@ Topic: name PartitionCount: 3 ReplicationFactor: 3 Configs: cleanup.policy=compa
 .to("name-formatted")
 ```
 
-Put two messages on the topic with the same key
+Put two messages on the `name` topic with the same key:
 
 ```shell
 tom perks
@@ -139,15 +140,15 @@ tom matthews
 
 If you run the application now, as expected it will process both messages.
 
-Now lets add a globaltable into the mix and join on it. This should only join the latest values.
-
 ```shell
 docker exec -it kafka-3 kafka-streams-application-reset --application-id OrderProcessing \
   --input-topics name \
   --bootstrap-servers kafka-1:29091,kafka-2:29092,kafka-3:29093 \
   --zookeeper zookeeper-1:22181,zookeeper-2:22182,zookeeper-3:22183
 ```
 
+Now let's add a join to itself using the KTable.
+
 ```shell
 val nameKTable = streamsBuilder
   .table("name", Consumed.with(Serdes.String(), Serdes.String()))
@@ -164,37 +165,56 @@ docker exec -it kafka-3 kafka-streams-application-reset --application-id OrderPr
 .to("name-formatted", Produced.with(Serdes.String(), Serdes.String()))
 ```
 
-Now if we (inner) join the stream to the table itself and we put two messages
+Now if we (inner) join the stream to the table, send these messages, and then start the application up:
 
 ```shell
-stuart:c
-stuart:d
-max:a
-stuart:e
+zara:a
+zara:b
+zara:c
+paul:a
 ```
 
-We now get a result of
+We now get a result of processing just the last value for each key. Interestingly, the last message is processed
+first, most likely due to the compaction.
 
 ```shell
-stuart:e
+Processing paul, a
+Joining the Stream Name a to the KTable Name a
+Processing zara, c
+Joining the Stream Name c to the KTable Name c
 ```
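The start-up result above can be sketched in plain Kotlin. This is a model added purely for illustration, not Kafka code: log compaction, and the KTable's materialization of the topic, keep only the last record per key, which is why only `zara:c` and `paul:a` are processed.

```kotlin
// A plain-Kotlin model (not Kafka code) of log compaction: later records
// overwrite earlier ones with the same key, so only the latest value per
// key is left in the state the join reads on start-up.
fun compact(records: List<Pair<String, String>>): Map<String, String> =
    records.toMap() // Map construction keeps the last value seen for each key

fun main() {
    val produced = listOf("zara" to "a", "zara" to "b", "zara" to "c", "paul" to "a")
    println(compact(produced)) // {zara=c, paul=a}
}
```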
 
 Now if we left join the stream to the table itself and put these messages:
 
 ```shell
-perkss:a
-perkss:b
-sam:a
-perkss:c
+zara:d
+zara:e
+zara:f
+paul:b
+```
+
+As expected, a left join makes no difference.
+
+```shell
+Processing paul, b
+Joining the Stream Name b to the KTable Name b
+Processing zara, f
+Joining the Stream Name f to the KTable Name f
 ```
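Why the left join behaves identically here can also be modelled in plain Kotlin (again a simplification added for illustration, not the Streams DSL): the stream and the table are fed from the same topic, so every stream key already has a table entry and the null case a left join adds never fires.

```kotlin
// Plain-Kotlin model (not Kafka code) of stream-table join semantics.
// Inner join emits only when the key exists in the table; left join always
// emits, with a null table value when the key is missing.
fun innerJoin(stream: List<Pair<String, String>>, table: Map<String, String>): List<String> =
    stream.mapNotNull { (k, v) -> table[k]?.let { "Joining the Stream Name $v to the KTable Name $it" } }

fun leftJoin(stream: List<Pair<String, String>>, table: Map<String, String>): List<String> =
    stream.map { (k, v) -> "Joining the Stream Name $v to the KTable Name ${table[k]}" }

fun main() {
    // After compaction only the latest values remain, and the table is built
    // from the same topic as the stream, so every stream key is present.
    val table = mapOf("zara" to "f", "paul" to "b")
    val stream = listOf("paul" to "b", "zara" to "f")
    println(innerJoin(stream, table))
    println(leftJoin(stream, table)) // identical output: the left join makes no difference
}
```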
 
+Now let's drop a live message onto the KTable backed by topic `name`:
+
 ```shell
-sam:a
-perkss:c
+paul:c
 ```
 
-You cannot join a global k table on it self
-as `Invalid topology: Topic name has already been registered by another source.`
+This results in:
+
+```shell
+Processing paul, c
+Joining the Stream Name c to the KTable Name c
+```
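The live-message behaviour can be sketched with the same kind of plain-Kotlin model (not Kafka code): a record arriving while the application runs updates the table state, and the stream side of that same record then joins against the freshly written value, which is why `paul:c` joins `c` to `c`.

```kotlin
// Plain-Kotlin model (not Kafka code) of a record arriving while the app is
// running: the KTable, backed by the same topic, is updated with the new
// value, and the stream record then joins against it.
fun processLive(table: MutableMap<String, String>, key: String, value: String): String {
    table[key] = value // table update from the incoming record
    return "Joining the Stream Name $value to the KTable Name ${table.getValue(key)}"
}

fun main() {
    val table = mutableMapOf("zara" to "f", "paul" to "b")
    println(processLive(table, "paul", "c")) // the new value joins against itself
}
```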
 
 If we were to rekey and join with a different key, what are the semantics? Well, let's see.
 
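Before the run, the expected rekey semantics can be sketched with the same plain-Kotlin model (an illustration, not the DSL; `rekeyLatest` is a hypothetical stand-in for compaction followed by a `selectKey`-style rekey): only the latest value per original key survives to be rekeyed and joined on start-up.

```kotlin
// Plain-Kotlin model (not Kafka code): compaction keeps the latest value per
// key, then the surviving records are rekeyed, a hypothetical stand-in for
// selectKey in the Streams DSL, before being joined.
fun rekeyLatest(records: List<Pair<String, String>>): List<Pair<String, String>> =
    records.toMap()               // keep only the latest value per key
        .map { (k, v) -> v to k } // rekey: the old value becomes the new key

fun main() {
    val produced = listOf("sarah" to "mark1", "sarah" to "mark3")
    println(rekeyLatest(produced)) // [(mark3, sarah)]: only the latest survives
}
```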
@@ -223,7 +243,7 @@ sarah:mark3
 mark:sarah2
 ```
 
-Results in
+The result is that, as above, we take the latest value per key from the table and only process that on start-up.
 
 ```shell
 Processing sarah, mark3
