Skip to content

Commit f62e2ba

Browse files
authored
Simplify UserState (#18)
* Simplify UserState * Improve README
1 parent 1f9848a commit f62e2ba

File tree

3 files changed

+14
-17
lines changed

3 files changed

+14
-17
lines changed

README.md

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ It offers an abstraction (the **binding**) that works the same whatever undernea
1717

1818
You can also check out [Spring Cloud Stream Kafka Streams first steps](https://github.com/rogervinas/spring-cloud-stream-kafka-streams-first-steps) where I got working a simple example using **Kafka Streams binder**.
1919

20-
In this one the goal is to use the **Kafka Streams binder** and the [Kafka Streams Processor API](https://kafka.apache.org/documentation/streams/developer-guide/processor-api.html) to implement the following scenario:
20+
In this one the goal is to use the **Kafka Streams binder** and the [Kafka Streams Processor API](https://kafka.apache.org/documentation/streams/developer-guide/processor-api.html) to implement the following scenario:
2121

2222
![Diagram](doc/diagram.png)
2323

@@ -97,7 +97,7 @@ fun `should publish expired event for one user`() {
9797

9898
We start first with our **UserStateStream** implementation as a **Function**:
9999
* Which input is a **KStream<String, UserTokenEvent>**, as we want a **String** as the Kafka message's key and a **UserTokenEvent** as the Kafka message's value
100-
* Which output is a **KStream<String, UserStateEvent>**, same here, **String** as the key and **UserStateEvent** as the value
100+
* Which output is a **KStream<String, UserStateEvent>**, same here, **String** as the key and **UserStateEvent** as the value
101101

102102
```kotlin
103103
class UserStateStream(
@@ -156,7 +156,7 @@ We can generate **completed** **UserStateEvents** straightaway once we receive t
156156
```kotlin
157157
data class UserState(val userId: String = "", val tokens: List<Int> = emptyList()) {
158158
// ...
159-
fun isCompleted() = tokens.containsAll(listOf(1, 2, 3, 4, 5))
159+
val completed = tokens.containsAll(listOf(1, 2, 3, 4, 5))
160160
}
161161

162162
class UserStateStream(
@@ -170,7 +170,7 @@ class UserStateStream(
170170
.mapValues { state ->
171171
logger.info("State $state")
172172
when {
173-
state.isCompleted() -> UserStateEvent(state.userId, COMPLETED)
173+
state.completed -> UserStateEvent(state.userId, COMPLETED)
174174
else -> null
175175
}
176176
}
@@ -211,9 +211,8 @@ class UserStateProcessor(
211211
Just apply the expiration logic this way:
212212

213213
```kotlin
214-
data class UserState(val userId: String = "", val tokens: List<Int> = emptyList(), private val expired: Boolean = false) {
214+
data class UserState(val userId: String = "", val tokens: List<Int> = emptyList(), val expired: Boolean = false) {
215215
// ...
216-
fun isExpired() = expired
217216
fun expire() = UserState(userId, tokens, true)
218217
}
219218

@@ -228,7 +227,7 @@ class UserStateProcessor(
228227
stateStore.all().forEachRemaining {
229228
val age = Duration.ofMillis(time - it.value.timestamp())
230229
if (age > expiration) {
231-
if (it.value.value().isExpired()) {
230+
if (it.value.value().expired) {
232231
// if it is already expired from a previous execution, we delete it
233232
logger.info("Delete ${it.key}")
234233
stateStore.delete(it.key)
@@ -264,9 +263,9 @@ class UserStateStream(
264263
// null states are sent downstream by UserStateProcessor when deleting entries from the store
265264
state == null -> null // "null" value generated by UserStateProcessor deleting values from the store
266265
// completed states are sent downstream from upstream
267-
state.isCompleted() -> UserStateEvent(state.userId, COMPLETED)
266+
state.completed -> UserStateEvent(state.userId, COMPLETED)
268267
// expired states are sent downstream by UserStateProcessor when updating entries from the store
269-
state.isExpired() -> UserStateEvent(state.userId, EXPIRED)
268+
state.expired -> UserStateEvent(state.userId, EXPIRED)
270269
else -> null
271270
}
272271
}
@@ -469,7 +468,7 @@ docker-compose up -d
469468
docker-compose down
470469
```
471470

472-
Then you can use [kcat](https://github.com/edenhill/kcat) (formerly know as **kafkacat**) to produce/consume to/from **Kafka**:
471+
Then you can use [kcat](https://github.com/edenhill/kcat) to produce/consume to/from **Kafka**:
473472
```shell
474473
# consume
475474
kcat -b localhost:9094 -C -t pub.user.token -f '%k %s\n'

src/main/kotlin/com/rogervinas/kafkastreams/stream/UserState.kt

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,8 @@
11
package com.rogervinas.kafkastreams.stream
22

3-
data class UserState(val userId: String = "", val tokens: List<Int> = emptyList(), private val expired: Boolean = false) {
3+
data class UserState(val userId: String = "", val tokens: List<Int> = emptyList(), val expired: Boolean = false) {
44

5-
fun isCompleted() = tokens.containsAll(listOf(1, 2, 3, 4, 5))
6-
7-
fun isExpired() = expired
5+
val completed = tokens.containsAll(listOf(1, 2, 3, 4, 5))
86

97
fun expire() = UserState(userId, tokens, true)
108

src/main/kotlin/com/rogervinas/kafkastreams/stream/UserStateStream.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,8 @@ class UserStateStream(
4747
logger.info("State $state")
4848
when {
4949
state == null -> null
50-
state.isCompleted() -> UserStateEvent(state.userId, COMPLETED)
51-
state.isExpired() -> UserStateEvent(state.userId, EXPIRED)
50+
state.completed -> UserStateEvent(state.userId, COMPLETED)
51+
state.expired -> UserStateEvent(state.userId, EXPIRED)
5252
else -> null
5353
}
5454
}
@@ -73,7 +73,7 @@ class UserStateProcessor(
7373
stateStore.all().forEachRemaining {
7474
val age = Duration.ofMillis(time - it.value.timestamp())
7575
if (age > expiration) {
76-
if (it.value.value().isExpired()) {
76+
if (it.value.value().expired) {
7777
logger.info("Delete ${it.key}")
7878
stateStore.delete(it.key)
7979
} else {

0 commit comments

Comments
 (0)