Skip to content

Commit cffce15

Browse files
committed
Add in rekey with processor api into test case
1 parent 2f35861 commit cffce15

File tree

4 files changed

+50
-40
lines changed

4 files changed

+50
-40
lines changed

kotlin-kafka-streams-examples/src/main/kotlin/com/perkss/kafka/reactive/AppConfig.kt

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -119,17 +119,5 @@ class AppConfig {
119119
orderProcessingTopology: Topology,
120120
streamConfig: Properties
121121
) = KafkaStreams(orderProcessingTopology, streamConfig)
122-
//
123-
// @Bean
124-
// fun bootstrapSemantics(
125-
// streamsBuilder: StreamsBuilder,
126-
// streamConfig: Properties
127-
// ) = KafkaStreams(BootstrapSemanticsSelfJoinTopology.build(streamsBuilder, streamConfig), streamConfig)
128-
129-
// @Bean
130-
// fun bootstrapSemantics(
131-
// streamsBuilder: StreamsBuilder,
132-
// streamConfig: Properties
133-
// ) = KafkaStreams(BootstrapSemanticsJoinOtherTableTopology.build(streamsBuilder, streamConfig), streamConfig)
134122

135123
}

kotlin-kafka-streams-examples/src/main/kotlin/com/perkss/kafka/reactive/KafkaStreamsApp.kt

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import org.springframework.boot.runApplication
99
@SpringBootApplication
1010
class KafkaStreamsApp(
1111
private val orderProcessingApp: KafkaStreams,
12-
//private val bootstrapSemantics: KafkaStreams
1312
) : CommandLineRunner {
1413

1514
companion object {
@@ -19,7 +18,6 @@ class KafkaStreamsApp(
1918
override fun run(vararg args: String) {
2019
logger.info("Running Kotlin Kakfa Streams")
2120
orderProcessingApp.start()
22-
//bootstrapSemantics.start()
2321
}
2422
}
2523

kotlin-kafka-streams-examples/src/main/kotlin/com/perkss/kafka/reactive/OrderProcessingTopology.kt

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.perkss.kafka.reactive
22

3+
import com.perkss.kafka.reactive.processor.RekeyStream
34
import com.perkss.order.model.OrderConfirmed
45
import com.perkss.order.model.OrderRejected
56
import com.perkss.order.model.OrderRequested
@@ -22,7 +23,6 @@ object OrderProcessingTopology {
2223
stockSerde: SpecificAvroSerde<Stock>,
2324
props: AppProperties): KTable<String, Stock> {
2425
return streamsBuilder.table(props.stockInventory, Consumed.with(Serdes.String(), stockSerde))
25-
//, Materialized.`as`(props.stockInventory)
2626
}
2727

2828

@@ -44,13 +44,18 @@ object OrderProcessingTopology {
4444
orderRejectedSerde: SpecificAvroSerde<OrderRejected>,
4545
orderConfirmedSerde: SpecificAvroSerde<OrderConfirmed>,
4646
stockSerde: SpecificAvroSerde<Stock>): Topology {
47+
48+
val rekeyStream = { RekeyStream<String, OrderRequested> { it.productId } }
49+
4750
val split = streamsBuilder
48-
.stream(props.orderRequest, Consumed.with(keySerde, orderRequestedSerde))
49-
.peek { key, value -> logger.info("Consumed {} {}", key, value) }
50-
// Rekey to the Product ID so we can join with Product Table
51-
.selectKey { _, value -> value.productId }
52-
.peek { key, value -> logger.info("Rekeyed to Product Id {} {}", key, value) }
53-
.leftJoin(stockTable, { orderRequest: OrderRequested, stock: Stock? ->
51+
.stream(props.orderRequest, Consumed.with(keySerde, orderRequestedSerde))
52+
.peek { key, value -> logger.info("Consumed {} {}", key, value) }
53+
// Rekey to the Product ID so we can join with Product Table
54+
//.selectKey { _, value -> value.productId }
55+
// Use the Processor API to rekey so we do dont create another topic
56+
.transform(rekeyStream)
57+
.peek { key, value -> logger.info("Rekeyed to Product Id {} {}", key, value) }
58+
.leftJoin(stockTable, { orderRequest: OrderRequested, stock: Stock? ->
5459
if (stock != null && stock.quantityAvailable > 0) {
5560
orderRequest
5661
} else {

kotlin-kafka-streams-examples/src/test/kotlin/com/perkss/kafka/reactive/integration/StreamIntegrationTest.kt

Lines changed: 38 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -80,9 +80,11 @@ internal class StreamIntegrationTest @Autowired constructor(private var appPrope
8080
val kafkaAdminClient: AdminClient = KafkaAdminClient.create(props)
8181
val result: CreateTopicsResult = kafkaAdminClient
8282
.createTopics(
83-
listOf("order-request", "order-processed",
84-
"stock", "customer", "order-rejected")
85-
.map { name -> NewTopic(name, 3, 1.toShort()) }
83+
listOf(
84+
"order-request", "order-processed",
85+
"stock", "customer", "order-rejected"
86+
)
87+
.map { name -> NewTopic(name, 12, 1.toShort()) }
8688
.toList())
8789
logger.info("Topics created {}", result.all().get())
8890

@@ -102,45 +104,62 @@ internal class StreamIntegrationTest @Autowired constructor(private var appPrope
102104
put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer")
103105
put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
104106
put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.bootstrapServers)
105-
put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer::class.java)
106-
put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer::class.java)
107+
put(
108+
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
109+
org.apache.kafka.common.serialization.StringDeserializer::class.java
110+
)
111+
put(
112+
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
113+
io.confluent.kafka.serializers.KafkaAvroDeserializer::class.java
114+
)
107115
put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true)
108-
put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://${schemaRegistry.containerIpAddress}:${schemaRegistry.getMappedPort(8081)}")
116+
put(
117+
KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG,
118+
"http://${schemaRegistry.containerIpAddress}:${schemaRegistry.getMappedPort(8081)}"
119+
)
109120
}
110121

111122

112123
val testProducer = KafkaProducer<String, Any>(producerProps)
113124
val testConsumer = KafkaConsumer<String, OrderConfirmed>(consumerProps)
114125

115-
val orderId = UUID.randomUUID().toString()
116-
val productId = UUID.randomUUID().toString()
117-
val customerId = UUID.randomUUID().toString()
126+
val orderId = "e125125125gasfaseegakjgsaka"
127+
val productId = "e124121ffgaavasraseveas"
128+
val customerId = "f124f25125212156"
118129
val value = OrderRequested(orderId, productId, customerId)
119130

120131
// Populate the stock table and the customer table
121132
testProducer.send(
122-
ProducerRecord(appProperties.customerInformation, customerId,
123-
Customer(customerId, "perkss", "london").toGenericRecord(SchemaLoader.loadSchema())))
133+
ProducerRecord(
134+
appProperties.customerInformation, customerId,
135+
Customer(customerId, "perkss", "london").toGenericRecord(SchemaLoader.loadSchema())
136+
)
137+
)
124138

125139
testProducer.send(
126-
ProducerRecord(appProperties.stockInventory, productId, Stock(productId, "Party Poppers", 5)))
140+
ProducerRecord(appProperties.stockInventory, productId, Stock(productId, "Party Poppers", 5))
141+
)
127142

128143
// send Order Request message to topology
129144
testProducer.send(
130-
ProducerRecord(appProperties.orderRequest, orderId, value))
145+
ProducerRecord(appProperties.orderRequest, orderId, value)
146+
)
131147

132148
testConsumer.subscribe(listOf(appProperties.orderProcessedTopic))
133149

134-
val actual = mutableMapOf<String, OrderConfirmed>()
135-
val expected = mapOf(orderId to OrderConfirmed(orderId, productId, customerId, true))
150+
val actual = mutableMapOf<String, Pair<Int, OrderConfirmed>>()
151+
val expectedPartition = 3
152+
val expected = mapOf(orderId to (expectedPartition to OrderConfirmed(orderId, productId, customerId, true)))
136153

137-
val timeout = System.currentTimeMillis() + 60000L;
138-
while (actual != expected && System.currentTimeMillis() < timeout) {
139-
val records: ConsumerRecords<String, OrderConfirmed> = testConsumer.poll(Duration.ofSeconds(1))
140-
records.forEach { record -> actual[record.key()] = record.value() }
154+
val timeout = System.currentTimeMillis() + 60000L
155+
while (actual.isEmpty() && System.currentTimeMillis() < timeout) {
156+
val records: ConsumerRecords<String, OrderConfirmed> = testConsumer.poll(Duration.ofSeconds(5))
157+
assertEquals(1, records.count())
158+
records.forEach { record -> actual[record.key()] = record.partition() to record.value() }
141159
}
142160

143161
assertEquals(expected[orderId], actual[orderId])
162+
assertEquals(expectedPartition, actual[orderId]?.first)
144163
orderProcessingApp.close()
145164
}
146165

0 commit comments

Comments
 (0)