diff --git a/pom.xml b/pom.xml index 2d8e109..7996e08 100644 --- a/pom.xml +++ b/pom.xml @@ -27,6 +27,10 @@ + + org.springframework.kafka + spring-kafka + org.springframework.boot spring-boot-starter-log4j2 diff --git a/src/main/java/com/app/controller/KafkaProducerController.java b/src/main/java/com/app/controller/KafkaProducerController.java new file mode 100644 index 0000000..03dffc0 --- /dev/null +++ b/src/main/java/com/app/controller/KafkaProducerController.java @@ -0,0 +1,22 @@ +package com.app.controller; + +import com.app.service.KafkaProducerService; +import lombok.RequiredArgsConstructor; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +@RestController +@RequestMapping("/kafka") +@RequiredArgsConstructor +public class KafkaProducerController { + + private final KafkaProducerService producerService; + + @PostMapping("/publish") + public String publishMessage(@RequestParam("message") String message) { + producerService.sendMessage(message); + return "Message published successfully!"; + } +} diff --git a/src/main/java/com/app/service/KafkaConsumerService.java b/src/main/java/com/app/service/KafkaConsumerService.java new file mode 100644 index 0000000..8ec5a3c --- /dev/null +++ b/src/main/java/com/app/service/KafkaConsumerService.java @@ -0,0 +1,5 @@ +package com.app.service; + +public interface KafkaConsumerService { + void listen(String message); +} diff --git a/src/main/java/com/app/service/KafkaConsumerServiceImpl.java b/src/main/java/com/app/service/KafkaConsumerServiceImpl.java new file mode 100644 index 0000000..fbc865f --- /dev/null +++ b/src/main/java/com/app/service/KafkaConsumerServiceImpl.java @@ -0,0 +1,14 @@ +package com.app.service; + +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Service; + +@Service +public class KafkaConsumerServiceImpl implements KafkaConsumerService { + + @KafkaListener(topics = "test-topic", groupId = "test-group") + @Override + public void listen(String message) { + System.out.println("Received message: " + message); + } +} diff --git a/src/main/java/com/app/service/KafkaProducerService.java b/src/main/java/com/app/service/KafkaProducerService.java new file mode 100644 index 0000000..ec6dc31 --- /dev/null +++ b/src/main/java/com/app/service/KafkaProducerService.java @@ -0,0 +1,5 @@ +package com.app.service; + +public interface KafkaProducerService { + void sendMessage(String message); +} diff --git a/src/main/java/com/app/service/KafkaProducerServiceImpl.java b/src/main/java/com/app/service/KafkaProducerServiceImpl.java new file mode 100644 index 0000000..0b68997 --- /dev/null +++ b/src/main/java/com/app/service/KafkaProducerServiceImpl.java @@ -0,0 +1,18 @@ +package com.app.service; + +import lombok.RequiredArgsConstructor; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Service; + +@Service +@RequiredArgsConstructor +public class KafkaProducerServiceImpl implements KafkaProducerService{ + + private final KafkaTemplate kafkaTemplate; + + private static final String TOPIC = "test-topic"; + + public void sendMessage(String message) { + kafkaTemplate.send(TOPIC, message); + } +} diff --git a/src/main/resources/application-local.yml b/src/main/resources/application-local.yml index d2f8b2d..cc7a908 100644 --- a/src/main/resources/application-local.yml +++ b/src/main/resources/application-local.yml @@ -1,4 +1,30 @@ app: logs: path: C:/${spring.application.name}/logs +spring: + kafka: + bootstrap-servers: localhost:29092 + + # Producer Configuration + producer: + key-serializer: org.apache.kafka.common.serialization.StringSerializer + value-serializer: org.apache.kafka.common.serialization.StringSerializer + acks: 1 # Acknowledgment level: 0, 1, or all + retries: 3 # Retry count for sending messages + batch-size: 16384 # Batch size for producer messages + linger-ms: 1 # Delay in milliseconds before sending a batch + buffer-memory: 33554432 # Total buffer memory for producer + + # Consumer Configuration + consumer: + group-id: test-group # Consumer group ID + auto-offset-reset: earliest # Where to start reading messages (earliest/latest/none) + key-deserializer: org.apache.kafka.common.serialization.StringDeserializer + value-deserializer: org.apache.kafka.common.serialization.StringDeserializer + enable-auto-commit: true # Automatically commit offsets + auto-commit-interval: 1000 # Interval in milliseconds to commit offsets + + # Template Default Topic + template: + default-topic: test-topic # Default topic for KafkaTemplate