Commit a0a653e: Upgrade Kafka 3.0.0

1 parent cffce15

43 files changed: +1148 -739 lines


.github/workflows/build.yml

Lines changed: 12 additions & 0 deletions

@@ -0,0 +1,12 @@
+name: Java CI
+
+on: [ push ]
+
+jobs:
+  build:
+    runs-on: ubuntu-latest
+
+    steps:
+      - uses: actions/checkout@v2
+      - name: Build with Maven
+        run: mvn install -Dmaven.compiler.target=11 -Dmaven.compiler.source=11

.travis.yml

Lines changed: 0 additions & 10 deletions
This file was deleted.

README.md

Lines changed: 32 additions & 20 deletions

@@ -1,36 +1,48 @@
 # Reactive Kotlin Kafka and Kafka Streams with Kotlin
+
 Kafka and Kafka Stream Examples in Kotlin with Project Reactor Kafka
 
 Please check out the blog posts for more details.
-* [Kotlin Kafka Streams](https://perkss.github.io/#/DistributedSystems/Streaming#KafkaStreamsKotlin)
+
+* [Kotlin Kafka Streams](https://perkss.github.io/#/DistributedSystems/Streaming#KafkaStreamsKotlin)
 
 ## Docker Environment
-Please use the docker-compose file in the root of each module project to create the Kafka Brokers and Zookeeper and where appropriate
-the Schema Registry.
+
+Please use the docker-compose file in the root of each module project to create the Kafka Brokers and Zookeeper and,
+where appropriate, the Schema Registry.
 
 Please check the directory README for details how to run this example.
 
 ## Integration Tests
-Integration tests can be found for each module project and these require Docker to be running and use [Testcontainers](https://www.testcontainers.org/) these are powerful tests that fire up Kafka instances and our application and fully test the flow of messages through our streaming application.
+
+Integration tests can be found for each module project. These require Docker to be running and
+use [Testcontainers](https://www.testcontainers.org/). These are powerful tests that fire up Kafka instances and our
+application and fully test the flow of messages through our streaming application.
 
 ## Kafka Reactive Producer Consumer
-This example shows how you can use the reactive API to build a consumer from a `lowercase-topic` map the data and output it
-with the same key to a `uppercase-topic` with the data converted to uppercase. Please check the sub module README for
-how to execute this. Its a very interesting yet simple example, as you can see when the consume is lazily instantiated when
-it connects and then once a message is received it lazily instantiates the producer to send on.
+
+This example shows how you can use the reactive API to build a consumer from a `lowercase-topic`, map the data, and
+output it with the same key to an `uppercase-topic` with the data converted to uppercase. Please check the sub module
+README for how to execute this. It is a very interesting yet simple example: the consumer is lazily instantiated
+when it connects, and once a message is received it lazily instantiates the producer to send on.
 
 ## Kafka Reactive Secure Producer Consumer
-Shows how you can run a secured broker cluster using TLS and a application that will will consume and produce with this secure
-transport layer to the brokers. Details can be found in the sub folder README.
+
+Shows how you can run a secured broker cluster using TLS and an application that will consume and produce with this
+secure transport layer to the brokers. Details can be found in the sub folder README.
 
 ## Kafka Streams and Kotlin Examples
-This module is for examples of using Kafka Streams with Kotlin and Avro. Here we build a stock ordering system that has the concept
-of customers to place orders. We use Avro to define schemas for the main topics and use changelog tables to store down product and
-customer information which is joined to the OrderRequests. This module depends on the Avro code generation in the `avro-schemas` module
-so that needs building before compiling this module.
-
-### Kafka Streams Examples
-We also have a package for examples to show the following features [found here.](kotlin-kafka-streams-examples/src/main/kotlin/com/perkss/kafka/reactive/examples)
-* Windowing
-* Aggregates
-* Joins
+
+This module is for examples of using Kafka Streams with Kotlin and Avro. Here we build a stock ordering system that has
+the concept of customers who place orders. We use Avro to define schemas for the main topics and use changelog tables to
+store product and customer information, which is joined to the OrderRequests. This module depends on the Avro code
+generation in the `avro-schemas` module, so that needs building before compiling this module.
+
+### Kafka Streams Examples
+
+We also have a package of examples showing the following
+features, [found here](kotlin-kafka-streams-examples/src/main/kotlin/com/perkss/kafka/reactive/examples):
+
+* Windowing
+* Aggregates
+* Joins
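
For a flavour of the Streams examples listed above, here is a minimal editor's sketch of a windowed aggregate in Kotlin, counting events per key in five-minute windows. The `order-events` topic name and the String serdes are assumptions for illustration, not taken from the commit; `TimeWindows.ofSizeWithNoGrace` is one of the APIs introduced in the Kafka 3.0 line this commit upgrades to.

```kotlin
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.Consumed
import org.apache.kafka.streams.kstream.Grouped
import org.apache.kafka.streams.kstream.TimeWindows
import java.time.Duration

// Sketch only: count records per key in five-minute tumbling windows.
// "order-events" is a hypothetical topic name.
fun buildTopology(builder: StreamsBuilder) {
    builder.stream("order-events", Consumed.with(Serdes.String(), Serdes.String()))
        .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
        .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
        .count()
        .toStream()
        .foreach { windowedKey, count ->
            println("key=${windowedKey.key()} window=${windowedKey.window().startTime()} count=$count")
        }
}
```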

avro-schemas/README.md

Lines changed: 2 additions & 2 deletions

@@ -1,6 +1,6 @@
 # Avro Schemas
 
-This module is used to show the automatic Class creation using the `avro-maven-plugin` from a `avsc` file.
-Here we create the order request data type to be used by the `SpecificAvroSerde`.
+This module is used to show the automatic class creation using the `avro-maven-plugin` from an `avsc` file. Here we
+create the order request data type to be used by the `SpecificAvroSerde`.
 
 `kotlin-kafka-streams` depends on this module.
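
As a hedged illustration of how the generated class plugs in, the sketch below configures a `SpecificAvroSerde` for it. The `OrderRequest` class name follows this module's description, and the Schema Registry URL is an assumed local default.

```kotlin
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde

// Sketch: OrderRequest stands for the class the avro-maven-plugin
// generates from the avsc file (package omitted here).
// The serde must be given the Schema Registry URL before first use.
val orderRequestSerde = SpecificAvroSerde<OrderRequest>().apply {
    configure(
        mapOf("schema.registry.url" to "http://localhost:8081"), // assumed local registry
        false // isKey = false: this serde is for record values
    )
}
```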

kafka-reactive-producer-consumer/README.md

Lines changed: 12 additions & 8 deletions

@@ -1,9 +1,10 @@
 # Kafka Reactive Producer Consumer Example
 
 ## Check the TestContainers Test
-To quickly run the application without any setup you just need a running Docker process. This test will fire up a Kafka Container,
-and then the topology as a spring boot application, push a message on the `lowercase-topic`, process it
-in the topology to uppercase and output it to the `uppercase-topic`
+
+To quickly run the application without any setup you just need a running Docker process. This test will fire up a Kafka
+container, then the topology as a Spring Boot application, push a message onto the `lowercase-topic`, process it in
+the topology to uppercase and output it to the `uppercase-topic`.
 
 ## Running the Example
 
@@ -27,12 +28,13 @@ Check the topic was created:
 docker exec reactive-kafka-example-broker kafka-topics --zookeeper zookeeper:2181 --list
 ```
 
-Produce some example data to the topic and with the Spring Boot Kafka Reactive App Running it will log the consumption of these messages:
+Produce some example data to the topic and, with the Spring Boot Kafka Reactive App running, it will log the consumption
+of these messages:
 
 ```bash
 docker exec -it reactive-kafka-example-broker kafka-console-producer --broker-list broker:9092 --topic lowercase-topic --property "parse.key=true" --property "key.separator=:"
 ```
-
+
 For example
 
 ```
@@ -45,16 +47,18 @@ Check the topology output is uppercase with the console consumer
 docker exec reactive-kafka-example-broker kafka-console-consumer --bootstrap-server broker:9092 --topic uppercase-topic --property print.key=true --property key.separator="-" --from-beginning
 ```
 
-
 ## Bad message fixes just like prod support
+
 If you input a bad message onto a topic you can move the offset to the latest
+
 ```bash
 docker exec -it reactive-kafka-example-broker kafka-consumer-groups --bootstrap-server broker:9092 --group sample-group --reset-offsets --to-latest --topic lowercase-topic --execute
 ```
 
-
 ## Shell into the Container
-If you need to shell into containers you can do like this.
+
+If you need to shell into a container you can do so like this.
+
 ```bash
 sudo docker exec -i -t reactive-kafka-example-zookeeper /bin/bash
 ```
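
The CLI reset above is what the README documents; purely as an editor's sketch, the same "skip to latest" can be done programmatically with Kafka's `AdminClient`. The topic, group, and single partition 0 are assumed from the commands above, and as with the CLI the consumer group must be inactive when the offsets are altered.

```kotlin
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.admin.OffsetSpec
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition

fun main() {
    AdminClient.create(
        mapOf<String, Any>(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092") // assumed host address
    ).use { admin ->
        val partition = TopicPartition("lowercase-topic", 0)
        // Look up the current end offset of the partition...
        val latest = admin.listOffsets(mapOf(partition to OffsetSpec.latest()))
            .partitionResult(partition).get().offset()
        // ...then point the group at it, skipping any bad messages.
        admin.alterConsumerGroupOffsets(
            "sample-group",
            mapOf(partition to OffsetAndMetadata(latest))
        ).all().get()
    }
}
```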

kafka-reactive-producer-consumer/pom.xml

Lines changed: 2 additions & 2 deletions

@@ -63,6 +63,7 @@
         <dependency>
             <groupId>org.testcontainers</groupId>
             <artifactId>testcontainers</artifactId>
+            <version>${testcontainers.version}</version>
             <scope>test</scope>
         </dependency>
         <dependency>
@@ -73,7 +74,6 @@
         <dependency>
             <groupId>org.testcontainers</groupId>
             <artifactId>kafka</artifactId>
-            <version>1.12.1</version>
             <scope>test</scope>
         </dependency>
         <dependency>
@@ -85,7 +85,7 @@
         <dependency>
             <groupId>org.awaitility</groupId>
             <artifactId>awaitility-kotlin</artifactId>
-            <version>4.0.2</version>
+            <version>4.1.1</version>
             <scope>test</scope>
         </dependency>
     </dependencies>

kafka-reactive-producer-consumer/src/main/kotlin/com/perkss/kafka/reactive/KafkaReactiveApp.kt

Lines changed: 15 additions & 13 deletions

@@ -10,9 +10,11 @@ import reactor.core.publisher.Mono
 import reactor.kafka.sender.SenderRecord
 
 @SpringBootApplication
-class KafkaReactiveApp(private var consumer: KafkaReactiveConsumer<String, String>,
-                       private var producer: KafkaReactiveProducer<String, String>,
-                       private var reactiveKafkaAppProperties: ReactiveKafkaAppProperties) : CommandLineRunner {
+class KafkaReactiveApp(
+    private var consumer: KafkaReactiveConsumer<String, String>,
+    private var producer: KafkaReactiveProducer<String, String>,
+    private var reactiveKafkaAppProperties: ReactiveKafkaAppProperties
+) : CommandLineRunner {
 
     companion object {
         private val logger = LoggerFactory.getLogger(KafkaReactiveApp::class.java)
@@ -25,16 +27,16 @@ class KafkaReactiveApp(private var consumer: KafkaReactiveConsumer<String, Strin
         val outputTopic = reactiveKafkaAppProperties.outputTopic
 
         consumer.consume()
-                .map {
-                    val producerRecord = ProducerRecord(outputTopic, it.key(), it.value().toUpperCase())
-                    logger.info("Building uppercase message. Key: ${it.key()} Message: ${it.value().toUpperCase()}")
-                    SenderRecord.create(producerRecord, it.key())
-                }
-                .map { producer.send(Mono.just(it)).subscribe() }
-                .doOnError { logger.error("An error has occurred $it") }
-                .subscribe {
-                    logger.info("Subscribing to Consumer and Producer")
-                }
+            .map {
+                val producerRecord = ProducerRecord(outputTopic, it.key(), it.value().toUpperCase())
+                logger.info("Building uppercase message. Key: ${it.key()} Message: ${it.value().toUpperCase()}")
+                SenderRecord.create(producerRecord, it.key())
+            }
+            .map { producer.send(Mono.just(it)).subscribe() }
+            .doOnError { logger.error("An error has occurred $it") }
+            .subscribe {
+                logger.info("Subscribing to Consumer and Producer")
+            }
     }
 
 }
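
One observation on the pipeline above, offered as an editor's sketch rather than a change to the commit: the send is subscribed inside `map`, so send failures never reach the outer subscription. An equivalent composition flattens the send results into the same pipeline; the `relay` function name is the editor's, everything else uses the types shown in this commit.

```kotlin
import org.apache.kafka.clients.producer.ProducerRecord
import org.slf4j.LoggerFactory
import reactor.core.publisher.Mono
import reactor.kafka.sender.SenderRecord

private val log = LoggerFactory.getLogger("relay")

// Sketch: the same lowercase-to-uppercase relay, with the send flattened
// into the pipeline so send results and failures reach this one subscription.
fun relay(
    consumer: KafkaReactiveConsumer<String, String>,
    producer: KafkaReactiveProducer<String, String>,
    outputTopic: String
) {
    consumer.consume()
        .map { SenderRecord.create(ProducerRecord(outputTopic, it.key(), it.value().toUpperCase()), it.key()) }
        .flatMap { producer.send(Mono.just(it)) }
        .subscribe(
            { result -> log.info("Sent to ${result.recordMetadata().topic()}") },
            { e -> log.error("Relay failed", e) }
        )
}
```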

kafka-reactive-producer-consumer/src/main/kotlin/com/perkss/kafka/reactive/KafkaReactiveConsumer.kt

Lines changed: 12 additions & 10 deletions

@@ -9,10 +9,12 @@ import reactor.kafka.receiver.ReceiverOptions
 import reactor.kafka.receiver.ReceiverRecord
 import java.util.*
 
-class KafkaReactiveConsumer<K, V>(bootstrapServers: String,
-                                  topic: String,
-                                  consumerGroupId: String,
-                                  autoOffsetReset: String = "earliest") {
+class KafkaReactiveConsumer<K, V>(
+    bootstrapServers: String,
+    topic: String,
+    consumerGroupId: String,
+    autoOffsetReset: String = "earliest"
+) {
 
     companion object {
         private val logger = LoggerFactory.getLogger(KafkaReactiveConsumer::class.java)
@@ -30,12 +32,12 @@ class KafkaReactiveConsumer<K, V>(bootstrapServers: String,
         val consumerOptions = ReceiverOptions.create<K, V>(consumerProps).subscription(Collections.singleton(topic))
 
         receiver = KafkaReceiver.create(consumerOptions)
-                .receive()
-                .map {
-                    logger.info("Received message: $it")
-                    it.receiverOffset().commit()
-                    it
-                }
+            .receive()
+            .map {
+                logger.info("Received message: $it")
+                it.receiverOffset().commit()
+                it
+            }
     }
 
     fun consume() = receiver
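
For completeness, a minimal usage sketch of this consumer; the bootstrap address, topic, and group are assumptions matching the module's README, and the key/value types are fixed to String as in the application code.

```kotlin
fun main() {
    val consumer = KafkaReactiveConsumer<String, String>(
        bootstrapServers = "localhost:9092", // assumed host address
        topic = "lowercase-topic",
        consumerGroupId = "sample-group"
        // autoOffsetReset defaults to "earliest", so a fresh group replays the topic
    )
    consumer.consume().subscribe { record ->
        println("${record.key()} -> ${record.value()}")
    }
}
```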

kafka-reactive-producer-consumer/src/main/kotlin/com/perkss/kafka/reactive/KafkaReactiveProducer.kt

Lines changed: 8 additions & 2 deletions

@@ -31,7 +31,13 @@ class KafkaReactiveProducer<K, V>(bootstrapServers: String) {
 
     fun send(outboundFlux: Mono<SenderRecord<K, V, String>>): Flux<SenderResult<String>> {
         return sender.send(outboundFlux)
-                .doOnError { e -> logger.error("Send failed", e) }
-                .doOnNext { r -> logger.info("Message Key ${r.correlationMetadata()} send response: ${r.recordMetadata().topic()}") }
+            .doOnError { e -> logger.error("Send failed", e) }
+            .doOnNext { r ->
+                logger.info(
+                    "Message Key ${r.correlationMetadata()} send response: ${
+                        r.recordMetadata().topic()
+                    }"
+                )
+            }
     }
 }

kafka-reactive-producer-consumer/src/main/kotlin/com/perkss/kafka/reactive/config/AppConfig.kt

Lines changed: 6 additions & 5 deletions

@@ -13,14 +13,15 @@ class AppConfig {
 
     @Bean
     fun reactiveProducer(propertiesReactiveKafka: ReactiveKafkaAppProperties) =
-            KafkaReactiveProducer<String, String>(propertiesReactiveKafka.bootstrapServers)
+        KafkaReactiveProducer<String, String>(propertiesReactiveKafka.bootstrapServers)
 
     @Bean
     fun reactiveConsumer(propertiesReactiveKafka: ReactiveKafkaAppProperties) =
-            KafkaReactiveConsumer<String, String>(
-                    propertiesReactiveKafka.bootstrapServers,
-                    propertiesReactiveKafka.inputTopic,
-                    propertiesReactiveKafka.consumerGroupId)
+        KafkaReactiveConsumer<String, String>(
+            propertiesReactiveKafka.bootstrapServers,
+            propertiesReactiveKafka.inputTopic,
+            propertiesReactiveKafka.consumerGroupId
+        )
 
 
 }
