
Commit 2f35861

Add in the solution of bootstrapping and using the latest timestamp.
1 parent bcf847d commit 2f35861

4 files changed, +298 -10 lines changed

kotlin-kafka-streams-examples/README.md

Lines changed: 242 additions & 3 deletions
@@ -100,7 +100,7 @@ docker run --rm --net=host confluentinc/cp-kafka:latest kafka-topics --create -
```shell
-docker run --rm --net=host confluentinc/cp-kafka:latest kafka-console-consumer --bootstrap-server localhost:9092 --topic name
+docker run --rm --net=host confluentinc/cp-kafka:latest kafka-console-consumer --bootstrap-server localhost:9093 --topic name --property print.key=true --from-beginning
docker run --rm --net=host confluentinc/cp-kafka:latest kafka-console-consumer --bootstrap-server localhost:9093 --topic name-formatted --property print.key=true --from-beginning
```

@@ -110,6 +110,13 @@ docker exec -it kafka-3 kafka-console-producer --broker-list kafka-2:29092 --to

### Test semantics

These tests use the standard properties for caching and buffering. Later we will run the same tests with them turned off, because the cache compacts the data in memory and can give different results; see the reference on [memory management](https://docs.confluent.io/platform/current/streams/developer-guide/memory-mgmt.html).

I expect that these initial tests, with buffering and the cache enabled, will compact the data for us and only show the last value per key.
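
For reference, this in-memory compaction is governed by the record cache size and the commit interval. A rough sketch with values close to the library defaults (the exact overrides used for these tests appear further down and in `AppConfig`):

```kotlin
// Record cache: a larger cache lets Streams compact repeated keys in memory
// before forwarding records downstream or flushing them to the changelog.
streamsConfiguration[StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG] = 10 * 1024 * 1024L
// Commit interval: how often the cache is flushed even when it is not full.
streamsConfiguration[StreamsConfig.COMMIT_INTERVAL_MS_CONFIG] = 30_000L
```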

For the first test we will run just a KTable that consumes the messages off a compacted topic after two messages with the same key have been placed on the topic. I would expect that this topology will process all messages on start up, including duplicate keys, so we see the full history following streaming semantics.

@@ -134,8 +141,8 @@ Topic: name PartitionCount: 3 ReplicationFactor: 3 Configs: cleanup.policy=compa

Put two messages on the `name` topic with the same key when the application is stopped.

```shell
-tom perks
-tom matthews
+tom:perks
+tom:matthews
```

@@ -240,6 +247,7 @@ If we were to rekey and join with a different key how are the semantics well let

Put these messages onto the compact topic `name` whilst the application is down.

```shell
sarah:mark1
mark:sarah1
```

@@ -419,6 +427,237 @@ steven holly

As expected from the documentation, the GlobalKTable will load up all the data first before starting the application. If this is the case then we will always join against the table's latest value.
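
The GlobalKTable example itself sits further up the README and is not part of this diff. A minimal sketch of that style of topology, assuming the same serdes and an illustrative `input-names` stream topic:

```kotlin
// Sketch only: a GlobalKTable is fully bootstrapped before processing starts,
// so the join below always sees the table's latest value for a key.
val nameGlobalTable: GlobalKTable<String, String> =
    streamsBuilder.globalTable("name", Consumed.with(Serdes.String(), Serdes.String()))

streamsBuilder.stream("input-names", Consumed.with(Serdes.String(), Serdes.String()))
    .join(
        nameGlobalTable,
        KeyValueMapper { key: String, _: String -> key },           // map each stream record to the table key
        ValueJoiner { _: String, tableValue: String -> tableValue } // always take the table's latest value
    )
    .to("name-formatted", Produced.with(Serdes.String(), Serdes.String()))
```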

#### Turning off cache tests

Back to the simple self join example, but with the cache turned off.

```kotlin
streamsConfiguration[StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG] = 0
```

KTable self join:

```kotlin
val nameKTable = streamsBuilder
    .table("name", Consumed.with(Serdes.String(), Serdes.String()))

nameKTable
    .toStream()
    .peek { key, value ->
        logger.info("Processing {}, {}", key, value)
    }
    .leftJoin(nameKTable, ValueJoiner { value1, value2 ->
        logger.info("Joining the Stream Name {} to the KTable Name {}", value1, value2)
        value2
    }, Joined.with(Serdes.String(), Serdes.String(), Serdes.String()))
    .to("name-formatted", Produced.with(Serdes.String(), Serdes.String()))
```

We send these values onto the `name` topic before starting the app; remember this topic is compacted.

```shell
tom:perks
tom:matthews
tom:stevens
sharon:news
sharon:car
tom:party
```

As expected, we now process all the values: the buffering and cache layer does not merge the records.

```shell
Processing tom, perks
Joining the Stream Name perks to the KTable Name perks
Processing tom, matthews
Joining the Stream Name matthews to the KTable Name matthews
Processing tom, stevens
Joining the Stream Name stevens to the KTable Name stevens
Processing sharon, news
Joining the Stream Name news to the KTable Name news
Processing sharon, car
Joining the Stream Name car to the KTable Name car
Processing tom, party
Joining the Stream Name party to the KTable Name party
```

All the values are output to the topic:

```shell
perks
matthews
stevens
news
car
party
```

Let's run the same example and turn the cache back on.
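
Turning the cache back on simply means dropping the zero override, or giving it a non-zero size; for example, something close to the default:

```kotlin
// Restore a non-zero record cache so repeated keys can be compacted in memory again.
streamsConfiguration[StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG] = 10 * 1024 * 1024L
```

With the cache re-enabled, the same data is put on the topic again.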

```shell
tom:perks
tom:matthews
tom:stevens
sharon:news
sharon:car
tom:party
```

This results in the data being merged, which is what we expected. There is no guarantee of compaction though: it depends on `StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG`, and `COMMIT_INTERVAL_MS_CONFIG` should also be considered.

```shell
Processing sharon, car
Joining the Stream Name car to the KTable Name car
Processing tom, party
Joining the Stream Name party to the KTable Name party
```

Now, as per this [JIRA](https://issues.apache.org/jira/browse/KAFKA-4113), you can set the timestamps of the messages to 0 and this will make the KTable behave like a GlobalKTable.

Let's follow this advice and use a custom timestamp extractor, putting the same kind of data onto the topic. This time we expect that, even with no cache, the data will only join with the latest timestamp record.

The data will still stream in order but the join will only ever be with the latest.

Place the data on the topic, using new data this time:

```shell
clark:perks
clark:matthews
clark:stevens
sarah:news
sarah:car
clark:party
```

Interestingly, with the cache disabled and this custom timestamp extractor returning zero, we still process all events and each one joins against the record with the same timestamp rather than the latest value.

```shell
Processing sarah, news
Joining the Stream Name news to the KTable Name news
Processing sarah, car
Joining the Stream Name car to the KTable Name car
Processing clark, perks
Joining the Stream Name perks to the KTable Name perks
Processing clark, matthews
Joining the Stream Name matthews to the KTable Name matthews
Processing clark, stevens
Joining the Stream Name stevens to the KTable Name stevens
Processing clark, party
Joining the Stream Name party to the KTable Name party
```

If you read further up the JIRA you can see why:

> What you could do it, to write a custom timestamp extractor, and return `0` for each table side record and wall-clock time for each stream side record. In `extract()` to get a `ConsumerRecord` and can inspect the topic name to distinguish between both. Because `0` is smaller than wall-clock time, you can "bootstrap" the table to the end of the topic before any stream-side record gets processed.
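
A sketch of the extractor described in that comment might look like the following, where records from the table-side topic are forced to timestamp `0` and everything else keeps its own record timestamp (the topic name check is an assumption for illustration):

```kotlin
class BootstrapAwareTimestampExtractor : TimestampExtractor {
    override fun extract(record: ConsumerRecord<Any, Any>?, partitionTime: Long): Long {
        // Table-side ("bootstrap") records get 0 so the table reads to the end of its topic
        // before any stream-side record, which keeps its real timestamp, is processed.
        return if (record?.topic() == "name") 0L else record?.timestamp() ?: partitionTime
    }
}
```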

We need to set zero only for the bootstrap, but here we are doing a self join.

Therefore we can implement a custom transformer that sets the timestamp back to the correct one on the stream flow, whilst setting it to zero on the KTable consume.

Here is the custom timestamp extractor where all records are given timestamp zero.

```kotlin
class IgnoreTimestampExtractor : TimestampExtractor {
    override fun extract(record: ConsumerRecord<Any, Any>?, partitionTime: Long): Long {
        // Ignore the record timestamp so the KTable bootstraps fully before any joins.
        return 0
    }
}
```

Now we set this on the `Consumed` for the KTable.

```kotlin
val nameKTable = streamsBuilder
    .table(
        "name",
        Consumed.with(Serdes.String(), Serdes.String()).withTimestampExtractor(IgnoreTimestampExtractor())
    )
```

Now we set up the custom transformer to change the stream-side timestamp.

```kotlin
override fun transform(key: String?, value: String?): KeyValue<String, String>? {
    // In reality use the timestamp on the event
    context.forward(
        key,
        value,
        To.all().withTimestamp(LocalDateTime.now().toInstant(ZoneOffset.UTC).toEpochMilli())
    )
    return null
}
```
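
For completeness, the transformer sits in the stream between two `peek` calls; this mirrors the topology change further down in this commit:

```kotlin
nameKTable
    .toStream()
    .peek { key, value -> logger.info("Stream Processing Key {}, Value {}", key, value) }
    .transform({ TimestampTransformer() })
    .peek { key, value -> logger.info("Changed Timestamp Key {}, Value {}", key, value) }
```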

Now we place these messages onto the compacted topic.

```shell
clark:perks
clark:matthews
clark:stevens
sarah:news
sarah:car
clark:party
```

We start up the application and we can see it working correctly. We only join on the latest value, so by comparing timestamps we can ensure we only use the latest value for the key when we join against the KTable, and we could filter older data out of the stream.

```shell
Stream Processing Key sarah, Value news
Stream Processing Key sarah, Value car
Stream Processing Key clark, Value perks
Stream Processing Key clark, Value matthews
Stream Processing Key clark, Value stevens
Stream Processing Key clark, Value party
Joining the Stream Name news to the KTable Name car
Changed Timestamp Key sarah, Value car
Joining the Stream Name car to the KTable Name car
Changed Timestamp Key clark, Value perks
Joining the Stream Name perks to the KTable Name party
Changed Timestamp Key clark, Value matthews
Joining the Stream Name matthews to the KTable Name party
Changed Timestamp Key clark, Value stevens
Joining the Stream Name stevens to the KTable Name party
Changed Timestamp Key clark, Value party
Joining the Stream Name party to the KTable Name party
```
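
As the last sentence above suggests, older stream-side data could also be filtered out around the join. That is not done in this example (the values are plain strings), but a rough sketch, assuming the event time were carried inside the value, could look like this:

```kotlin
// Hypothetical value type; a real topology would also need a serde for it.
data class TimedName(val name: String, val eventTimeMs: Long)

// Keep only stream records that are at least as new as the table's current value for the key.
fun joinLatestOnly(
    stream: KStream<String, TimedName>,
    table: KTable<String, TimedName>
): KStream<String, TimedName> =
    stream
        .leftJoin(table, ValueJoiner<TimedName, TimedName?, TimedName?> { streamValue, tableValue ->
            if (tableValue == null || streamValue.eventTimeMs >= tableValue.eventTimeMs) streamValue else null
        })
        .filterNot { _, value -> value == null }
        .mapValues { value -> value!! }
```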

Now if we add back in the rekey example and run this data we get the following.

```shell
mark:sarah1
sarah:mark2
sarah:mark3
mark:sarah2
```

```shell
Stream Processing Key sarah, Value mark1
Changed Timestamp Key sarah, Value mark1
Stream Processing Key mark, Value sarah1
Changed Timestamp Key mark, Value sarah1
Stream Processing Key sarah, Value mark2
Changed Timestamp Key sarah, Value mark2
Stream Processing Key sarah, Value mark3
Changed Timestamp Key sarah, Value mark3
Stream Processing Key mark, Value sarah2
Changed Timestamp Key mark, Value sarah2
Joining the Stream Name mark1 to the KTable Name sarah2
Joining the Stream Name sarah1 to the KTable Name mark3
Joining the Stream Name mark2 to the KTable Name sarah2
Joining the Stream Name mark3 to the KTable Name sarah2
Joining the Stream Name sarah2 to the KTable Name mark3
```

## Testcontainers Integration Tests

Requires Docker to be running.

kotlin-kafka-streams-examples/src/main/kotlin/com/perkss/kafka/reactive/AppConfig.kt

Lines changed: 6 additions & 1 deletion
@@ -36,8 +36,13 @@ class AppConfig {
         streamsConfiguration[KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG] = props.schemaRegistry
         streamsConfiguration[StreamsConfig.STATE_DIR_CONFIG] = props.stateDir
         streamsConfiguration[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
-        streamsConfiguration[StreamsConfig.TOPOLOGY_OPTIMIZATION] =
+        streamsConfiguration[StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG] =
             StreamsConfig.OPTIMIZE // do not create internal changelog, have to have source topic as compact https://stackoverflow.com/questions/57164133/kafka-stream-topology-optimization
+        // disable cache for testing
+        streamsConfiguration[StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG] = 0
+        streamsConfiguration[StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG] = Serdes.String()::class.java
+        streamsConfiguration[StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG] = Serdes.String()::class.java
+
         return streamsConfiguration
     }
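
For context, these properties feed straight into the `KafkaStreams` instance. A minimal sketch of the wiring, assuming the usual build-and-start flow (the actual Spring setup in this project is outside this diff):

```kotlin
// Sketch only: build the topology against a StreamsBuilder and start the app with the properties above.
val builder = StreamsBuilder()
val topology = BootstrapSemanticsSelfJoinTopology.build(builder, streamsConfiguration)
val streams = KafkaStreams(topology, streamsConfiguration)
streams.start()
```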

kotlin-kafka-streams-examples/src/main/kotlin/com/perkss/kafka/reactive/BootstrapSemanticsSelfJoinTopology.kt

Lines changed: 38 additions & 6 deletions
@@ -1,28 +1,60 @@
 package com.perkss.kafka.reactive

 import org.apache.kafka.common.serialization.Serdes
+import org.apache.kafka.streams.KeyValue
 import org.apache.kafka.streams.StreamsBuilder
 import org.apache.kafka.streams.Topology
-import org.apache.kafka.streams.kstream.Consumed
-import org.apache.kafka.streams.kstream.Joined
-import org.apache.kafka.streams.kstream.Produced
-import org.apache.kafka.streams.kstream.ValueJoiner
+import org.apache.kafka.streams.kstream.*
+import org.apache.kafka.streams.processor.ProcessorContext
+import org.apache.kafka.streams.processor.To
 import org.slf4j.LoggerFactory
+import java.time.LocalDateTime
+import java.time.ZoneOffset
 import java.util.*

+class TimestampTransformer : Transformer<String, String, KeyValue<String, String>?> {
+
+    private lateinit var context: ProcessorContext
+    override fun init(context: ProcessorContext) {
+        this.context = context
+    }
+
+    override fun close() {
+    }
+
+    override fun transform(key: String?, value: String?): KeyValue<String, String>? {
+        // In reality use the timestamp on the event
+        context.forward(
+            key,
+            value,
+            To.all().withTimestamp(LocalDateTime.now().toInstant(ZoneOffset.UTC).toEpochMilli())
+        )
+        return null
+    }
+}
+
+
 // Results in consuming only the latest message from the Topic
 object BootstrapSemanticsSelfJoinTopology {

     private val logger = LoggerFactory.getLogger(BootstrapSemanticsSelfJoinTopology::class.java)

     fun build(streamsBuilder: StreamsBuilder, properties: Properties): Topology {
         val nameKTable = streamsBuilder
-            .table("name", Consumed.with(Serdes.String(), Serdes.String()))
+            .table(
+                "name",
+                Consumed.with(Serdes.String(), Serdes.String()).withTimestampExtractor(IgnoreTimestampExtractor())
+            )

         nameKTable
             .toStream()
             .peek { key, value ->
-                logger.info("Processing {}, {}", key, value)
+                logger.info("Stream Processing Key {}, Value {}", key, value)
+            }
+            .transform(
+                { TimestampTransformer() })
+            .peek { key, value ->
+                logger.info("Changed Timestamp Key {}, Value {}", key, value)
             }
             .selectKey { key, value ->
                 val re = Regex("[^A-Za-z]")

kotlin-kafka-streams-examples/src/main/kotlin/com/perkss/kafka/reactive/IgnoreTimestampExtractor.kt

Lines changed: 12 additions & 0 deletions
@@ -0,0 +1,12 @@
+package com.perkss.kafka.reactive
+
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.streams.processor.TimestampExtractor
+
+class IgnoreTimestampExtractor : TimestampExtractor {
+    override fun extract(record: ConsumerRecord<Any, Any>?, partitionTime: Long): Long {
+        // Ignore the timestamp to enable a KTable to act like a global KTable and bootstrap fully
+        // before processing it against joins.
+        return 0
+    }
+}
