Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j2</artifactId>
Expand Down
22 changes: 22 additions & 0 deletions src/main/java/com/app/controller/KafkaProducerController.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.app.controller;

import com.app.service.KafkaProducerService;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/kafka")
@RequiredArgsConstructor
public class KafkaProducerController {

private final KafkaProducerService producerService;

@PostMapping("/publish")
public String publishMessage(@RequestParam("message") String message) {
producerService.sendMessage(message);
return "Message published successfully!";
}
}
5 changes: 5 additions & 0 deletions src/main/java/com/app/service/KafkaConsumerService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package com.app.service;

public interface KafkaConsumerService {
void listen(String message);
}
14 changes: 14 additions & 0 deletions src/main/java/com/app/service/KafkaConsumerServiceImpl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.app.service;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerServiceImpl implements KafkaConsumerService {

@KafkaListener(topics = "test-topic", groupId = "test-group")
@Override
public void listen(String message) {
System.out.println("Received message: " + message);
}
}
5 changes: 5 additions & 0 deletions src/main/java/com/app/service/KafkaProducerService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package com.app.service;

public interface KafkaProducerService {
void sendMessage(String message);
}
18 changes: 18 additions & 0 deletions src/main/java/com/app/service/KafkaProducerServiceImpl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.app.service;

import lombok.RequiredArgsConstructor;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
@RequiredArgsConstructor
public class KafkaProducerServiceImpl implements KafkaProducerService{

private final KafkaTemplate<String, String> kafkaTemplate;

private static final String TOPIC = "test-topic";

public void sendMessage(String message) {
kafkaTemplate.send(TOPIC, message);
}
}
26 changes: 26 additions & 0 deletions src/main/resources/application-local.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,30 @@
app:
logs:
path: C:/${spring.application.name}/logs
spring:
kafka:
bootstrap-servers: localhost:29092

# Producer Configuration
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
acks: 1 # Acknowledgment level: 0, 1, or all
retries: 3 # Retry count for sending messages
batch-size: 16384 # Batch size for producer messages
linger-ms: 1 # Delay in milliseconds before sending a batch
buffer-memory: 33554432 # Total buffer memory for producer

# Consumer Configuration
consumer:
group-id: test-group # Consumer group ID
auto-offset-reset: earliest # Where to start reading messages (earliest/latest/none)
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
enable-auto-commit: true # Automatically commit offsets
auto-commit-interval: 1000 # Interval in milliseconds to commit offsets

# Template Default Topic
template:
default-topic: test-topic # Default topic for KafkaTemplate