From aa07e229c5fccc9740fa8384416faf3b24a10b60 Mon Sep 17 00:00:00 2001
From: Faiz Akram <156657523+faizorg@users.noreply.github.com>
Date: Thu, 16 Jan 2025 21:03:40 +0530
Subject: [PATCH] Add Support for Kafka Integration
---
pom.xml | 4 +++
.../controller/KafkaProducerController.java | 22 ++++++++++++++++
.../com/app/service/KafkaConsumerService.java | 5 ++++
.../app/service/KafkaConsumerServiceImpl.java | 14 ++++++++++
.../com/app/service/KafkaProducerService.java | 5 ++++
.../app/service/KafkaProducerServiceImpl.java | 18 +++++++++++++
src/main/resources/application-local.yml | 26 +++++++++++++++++++
7 files changed, 94 insertions(+)
create mode 100644 src/main/java/com/app/controller/KafkaProducerController.java
create mode 100644 src/main/java/com/app/service/KafkaConsumerService.java
create mode 100644 src/main/java/com/app/service/KafkaConsumerServiceImpl.java
create mode 100644 src/main/java/com/app/service/KafkaProducerService.java
create mode 100644 src/main/java/com/app/service/KafkaProducerServiceImpl.java
diff --git a/pom.xml b/pom.xml
index 2d8e109..7996e08 100644
--- a/pom.xml
+++ b/pom.xml
@@ -27,6 +27,10 @@
+
+ org.springframework.kafka
+ spring-kafka
+
org.springframework.boot
spring-boot-starter-log4j2
diff --git a/src/main/java/com/app/controller/KafkaProducerController.java b/src/main/java/com/app/controller/KafkaProducerController.java
new file mode 100644
index 0000000..03dffc0
--- /dev/null
+++ b/src/main/java/com/app/controller/KafkaProducerController.java
@@ -0,0 +1,22 @@
+package com.app.controller;
+
+import com.app.service.KafkaProducerService;
+import lombok.RequiredArgsConstructor;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
+
+@RestController
+@RequestMapping("/kafka")
+@RequiredArgsConstructor
+public class KafkaProducerController {
+
+ private final KafkaProducerService producerService;
+
+ @PostMapping("/publish")
+ public String publishMessage(@RequestParam("message") String message) {
+ producerService.sendMessage(message);
+ return "Message published successfully!";
+ }
+}
diff --git a/src/main/java/com/app/service/KafkaConsumerService.java b/src/main/java/com/app/service/KafkaConsumerService.java
new file mode 100644
index 0000000..8ec5a3c
--- /dev/null
+++ b/src/main/java/com/app/service/KafkaConsumerService.java
@@ -0,0 +1,5 @@
+package com.app.service;
+
+public interface KafkaConsumerService {
+ void listen(String message);
+}
diff --git a/src/main/java/com/app/service/KafkaConsumerServiceImpl.java b/src/main/java/com/app/service/KafkaConsumerServiceImpl.java
new file mode 100644
index 0000000..fbc865f
--- /dev/null
+++ b/src/main/java/com/app/service/KafkaConsumerServiceImpl.java
@@ -0,0 +1,14 @@
+package com.app.service;
+
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.stereotype.Service;
+
+@Service
+public class KafkaConsumerServiceImpl implements KafkaConsumerService {
+
+ @KafkaListener(topics = "test-topic", groupId = "test-group")
+ @Override
+ public void listen(String message) {
+ System.out.println("Received message: " + message);
+ }
+}
diff --git a/src/main/java/com/app/service/KafkaProducerService.java b/src/main/java/com/app/service/KafkaProducerService.java
new file mode 100644
index 0000000..ec6dc31
--- /dev/null
+++ b/src/main/java/com/app/service/KafkaProducerService.java
@@ -0,0 +1,5 @@
+package com.app.service;
+
+public interface KafkaProducerService {
+ void sendMessage(String message);
+}
diff --git a/src/main/java/com/app/service/KafkaProducerServiceImpl.java b/src/main/java/com/app/service/KafkaProducerServiceImpl.java
new file mode 100644
index 0000000..0b68997
--- /dev/null
+++ b/src/main/java/com/app/service/KafkaProducerServiceImpl.java
@@ -0,0 +1,18 @@
+package com.app.service;
+
+import lombok.RequiredArgsConstructor;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.stereotype.Service;
+
+@Service
+@RequiredArgsConstructor
+public class KafkaProducerServiceImpl implements KafkaProducerService{
+
+ private final KafkaTemplate kafkaTemplate;
+
+ private static final String TOPIC = "test-topic";
+
+ public void sendMessage(String message) {
+ kafkaTemplate.send(TOPIC, message);
+ }
+}
diff --git a/src/main/resources/application-local.yml b/src/main/resources/application-local.yml
index d2f8b2d..cc7a908 100644
--- a/src/main/resources/application-local.yml
+++ b/src/main/resources/application-local.yml
@@ -1,4 +1,30 @@
app:
logs:
path: C:/${spring.application.name}/logs
+spring:
+ kafka:
+ bootstrap-servers: localhost:29092
+
+ # Producer Configuration
+ producer:
+ key-serializer: org.apache.kafka.common.serialization.StringSerializer
+ value-serializer: org.apache.kafka.common.serialization.StringSerializer
+ acks: 1 # Acknowledgment level: 0, 1, or all
+ retries: 3 # Retry count for sending messages
+ batch-size: 16384 # Batch size for producer messages
+ linger-ms: 1 # Delay in milliseconds before sending a batch
+ buffer-memory: 33554432 # Total buffer memory for producer
+
+ # Consumer Configuration
+ consumer:
+ group-id: test-group # Consumer group ID
+ auto-offset-reset: earliest # Where to start reading messages (earliest/latest/none)
+ key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+ value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+ enable-auto-commit: true # Automatically commit offsets
+ auto-commit-interval: 1000 # Interval in milliseconds to commit offsets
+
+ # Template Default Topic
+ template:
+ default-topic: test-topic # Default topic for KafkaTemplate