From aa07e229c5fccc9740fa8384416faf3b24a10b60 Mon Sep 17 00:00:00 2001 From: Faiz Akram <156657523+faizorg@users.noreply.github.com> Date: Thu, 16 Jan 2025 21:03:40 +0530 Subject: [PATCH] Add Support for Kafka Integration --- pom.xml | 4 +++ .../controller/KafkaProducerController.java | 22 ++++++++++++++++ .../com/app/service/KafkaConsumerService.java | 5 ++++ .../app/service/KafkaConsumerServiceImpl.java | 14 ++++++++++ .../com/app/service/KafkaProducerService.java | 5 ++++ .../app/service/KafkaProducerServiceImpl.java | 18 +++++++++++++ src/main/resources/application-local.yml | 26 +++++++++++++++++++ 7 files changed, 94 insertions(+) create mode 100644 src/main/java/com/app/controller/KafkaProducerController.java create mode 100644 src/main/java/com/app/service/KafkaConsumerService.java create mode 100644 src/main/java/com/app/service/KafkaConsumerServiceImpl.java create mode 100644 src/main/java/com/app/service/KafkaProducerService.java create mode 100644 src/main/java/com/app/service/KafkaProducerServiceImpl.java diff --git a/pom.xml b/pom.xml index 2d8e109..7996e08 100644 --- a/pom.xml +++ b/pom.xml @@ -27,6 +27,10 @@ + + org.springframework.kafka + spring-kafka + org.springframework.boot spring-boot-starter-log4j2 diff --git a/src/main/java/com/app/controller/KafkaProducerController.java b/src/main/java/com/app/controller/KafkaProducerController.java new file mode 100644 index 0000000..03dffc0 --- /dev/null +++ b/src/main/java/com/app/controller/KafkaProducerController.java @@ -0,0 +1,22 @@ +package com.app.controller; + +import com.app.service.KafkaProducerService; +import lombok.RequiredArgsConstructor; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +@RestController +@RequestMapping("/kafka") +@RequiredArgsConstructor +public class KafkaProducerController { + + private final KafkaProducerService producerService; + + @PostMapping("/publish") + public String publishMessage(@RequestParam("message") String message) { + producerService.sendMessage(message); + return "Message published successfully!"; + } +} diff --git a/src/main/java/com/app/service/KafkaConsumerService.java b/src/main/java/com/app/service/KafkaConsumerService.java new file mode 100644 index 0000000..8ec5a3c --- /dev/null +++ b/src/main/java/com/app/service/KafkaConsumerService.java @@ -0,0 +1,5 @@ +package com.app.service; + +public interface KafkaConsumerService { + void listen(String message); +} diff --git a/src/main/java/com/app/service/KafkaConsumerServiceImpl.java b/src/main/java/com/app/service/KafkaConsumerServiceImpl.java new file mode 100644 index 0000000..fbc865f --- /dev/null +++ b/src/main/java/com/app/service/KafkaConsumerServiceImpl.java @@ -0,0 +1,14 @@ +package com.app.service; + +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Service; + +@Service +public class KafkaConsumerServiceImpl implements KafkaConsumerService { + + @KafkaListener(topics = "test-topic", groupId = "test-group") + @Override + public void listen(String message) { + System.out.println("Received message: " + message); + } +} diff --git a/src/main/java/com/app/service/KafkaProducerService.java b/src/main/java/com/app/service/KafkaProducerService.java new file mode 100644 index 0000000..ec6dc31 --- /dev/null +++ b/src/main/java/com/app/service/KafkaProducerService.java @@ -0,0 +1,5 @@ +package com.app.service; + +public interface KafkaProducerService { + void sendMessage(String message); +} diff --git a/src/main/java/com/app/service/KafkaProducerServiceImpl.java b/src/main/java/com/app/service/KafkaProducerServiceImpl.java new file mode 100644 index 0000000..0b68997 --- /dev/null +++ b/src/main/java/com/app/service/KafkaProducerServiceImpl.java @@ -0,0 +1,18 @@ +package com.app.service; + +import lombok.RequiredArgsConstructor; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Service; + +@Service +@RequiredArgsConstructor +public class KafkaProducerServiceImpl implements KafkaProducerService{ + + private final KafkaTemplate kafkaTemplate; + + private static final String TOPIC = "test-topic"; + + public void sendMessage(String message) { + kafkaTemplate.send(TOPIC, message); + } +} diff --git a/src/main/resources/application-local.yml b/src/main/resources/application-local.yml index d2f8b2d..cc7a908 100644 --- a/src/main/resources/application-local.yml +++ b/src/main/resources/application-local.yml @@ -1,4 +1,30 @@ app: logs: path: C:/${spring.application.name}/logs +spring: + kafka: + bootstrap-servers: localhost:29092 + + # Producer Configuration + producer: + key-serializer: org.apache.kafka.common.serialization.StringSerializer + value-serializer: org.apache.kafka.common.serialization.StringSerializer + acks: 1 # Acknowledgment level: 0, 1, or all + retries: 3 # Retry count for sending messages + batch-size: 16384 # Batch size for producer messages + linger-ms: 1 # Delay in milliseconds before sending a batch + buffer-memory: 33554432 # Total buffer memory for producer + + # Consumer Configuration + consumer: + group-id: test-group # Consumer group ID + auto-offset-reset: earliest # Where to start reading messages (earliest/latest/none) + key-deserializer: org.apache.kafka.common.serialization.StringDeserializer + value-deserializer: org.apache.kafka.common.serialization.StringDeserializer + enable-auto-commit: true # Automatically commit offsets + auto-commit-interval: 1000 # Interval in milliseconds to commit offsets + + # Template Default Topic + template: + default-topic: test-topic # Default topic for KafkaTemplate