Skip to content

Commit ea40d0d

Browse files
author
Aleksandar Osenov
committed
Renamed two classes, changed ping server to be the same for broth Rpc clients, moved kafka version back to 3.2.0
1 parent b596dcc commit ea40d0d

File tree

6 files changed

+22
-22
lines changed

6 files changed

+22
-22
lines changed

devicehive-auth/src/main/java/com/devicehive/application/AuthProxyClientConfig.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
*/
2222

2323
import com.devicehive.api.RequestResponseMatcher;
24-
import com.devicehive.proxy.AuthProxyClient;
24+
import com.devicehive.proxy.AuthRpcClient;
2525
import com.devicehive.proxy.ProxyResponseHandler;
2626
import com.devicehive.proxy.api.NotificationHandler;
2727
import com.devicehive.proxy.client.WebSocketKafkaProxyClient;
@@ -59,10 +59,10 @@ public NotificationHandler notificationHandler(Gson gson, RequestResponseMatcher
5959
}
6060

6161
@Bean
62-
public AuthProxyClient rpcClient(NotificationHandler notificationHandler, WebSocketKafkaProxyConfig proxyConfig, RequestResponseMatcher requestResponseMatcher, Gson gson) {
62+
public AuthRpcClient rpcClient(NotificationHandler notificationHandler, WebSocketKafkaProxyConfig proxyConfig, RequestResponseMatcher requestResponseMatcher, Gson gson) {
6363
WebSocketKafkaProxyClient proxyClient = new WebSocketKafkaProxyClient(notificationHandler);
6464
proxyClient.setWebSocketKafkaProxyConfig(proxyConfig);
65-
AuthProxyClient client = new AuthProxyClient(REQUEST_TOPIC, responseTopic, proxyClient, requestResponseMatcher, gson);
65+
AuthRpcClient client = new AuthRpcClient(REQUEST_TOPIC, responseTopic, proxyClient, requestResponseMatcher, gson);
6666
client.start();
6767
return client;
6868
}

devicehive-auth/src/main/java/com/devicehive/proxy/AuthProxyClient.java renamed to devicehive-auth/src/main/java/com/devicehive/proxy/AuthRpcClient.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,16 +44,16 @@
4444
import java.util.function.Consumer;
4545

4646
@Profile("ws-kafka-proxy")
47-
public class AuthProxyClient implements RpcClient {
48-
private static final Logger logger = LoggerFactory.getLogger(AuthProxyClient.class);
47+
public class AuthRpcClient implements RpcClient {
48+
private static final Logger logger = LoggerFactory.getLogger(AuthRpcClient.class);
4949

5050
private final String requestTopic;
5151
private final String replyToTopic;
5252
private final ProxyClient client;
5353
private final RequestResponseMatcher requestResponseMatcher;
5454
private final Gson gson;
5555

56-
public AuthProxyClient(String requestTopic, String replyToTopic, ProxyClient client, RequestResponseMatcher requestResponseMatcher, Gson gson) {
56+
public AuthRpcClient(String requestTopic, String replyToTopic, ProxyClient client, RequestResponseMatcher requestResponseMatcher, Gson gson) {
5757
this.requestTopic = requestTopic;
5858
this.replyToTopic = replyToTopic;
5959
this.client = client;
@@ -82,9 +82,9 @@ public void push(Request request) {
8282
@Override
8383
public void start() {
8484
client.start();
85-
pingServer();
8685
createTopic(Arrays.asList(requestTopic, replyToTopic));
8786
subscribeToTopic(replyToTopic);
87+
pingServer();
8888

8989
}
9090

@@ -110,7 +110,8 @@ private void pingServer() {
110110
requestResponseMatcher.addRequestCallback(request.getCorrelationId(), pingFuture::complete);
111111
logger.debug("Request callback added for request: {}, correlationId: {}", request.getBody(), request.getCorrelationId());
112112

113-
client.push(ProxyMessageBuilder.notification(new NotificationCreatePayload(requestTopic, gson.toJson(request)))); // toDo: use request partition key
113+
client.push(ProxyMessageBuilder.notification(
114+
new NotificationCreatePayload(requestTopic, gson.toJson(request), request.getPartitionKey())));
114115

115116
Response response = null;
116117
try {
Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import com.devicehive.api.RequestResponseMatcher;
2424
import com.devicehive.model.ServerEvent;
2525
import com.devicehive.proxy.api.NotificationHandler;
26-
import com.devicehive.proxy.api.ProxyClient;
2726
import com.devicehive.proxy.api.ProxyMessageBuilder;
2827
import com.devicehive.proxy.api.payload.NotificationCreatePayload;
2928
import com.devicehive.proxy.api.payload.SubscribePayload;
@@ -44,8 +43,8 @@
4443
import java.util.concurrent.*;
4544
import java.util.function.Consumer;
4645

47-
public class FrontendProxyClient implements RpcClient {
48-
private static final Logger logger = LoggerFactory.getLogger(FrontendProxyClient.class);
46+
public class FrontendRpcClient implements RpcClient {
47+
private static final Logger logger = LoggerFactory.getLogger(FrontendRpcClient.class);
4948

5049
private final String requestTopic;
5150
private final String replyToTopic;
@@ -56,7 +55,7 @@ public class FrontendProxyClient implements RpcClient {
5655
private final Gson gson;
5756
private final RingBuffer<ServerEvent> ringBuffer;
5857

59-
public FrontendProxyClient(String requestTopic, String replyToTopic, WebSocketKafkaProxyConfig proxyConfig, NotificationHandler notificationHandler, RequestResponseMatcher requestResponseMatcher, Gson gson, RingBuffer<ServerEvent> ringBuffer) {
58+
public FrontendRpcClient(String requestTopic, String replyToTopic, WebSocketKafkaProxyConfig proxyConfig, NotificationHandler notificationHandler, RequestResponseMatcher requestResponseMatcher, Gson gson, RingBuffer<ServerEvent> ringBuffer) {
6059
this.requestTopic = requestTopic;
6160
this.replyToTopic = replyToTopic;
6261
this.proxyConfig = proxyConfig;
@@ -118,7 +117,7 @@ private void pingServer() {
118117
boolean connected = false;
119118
int attempts = 10;
120119
for (int i = 0; i < attempts; i++) {
121-
logger.info("Ping WebSocket Proxy attempt {}", i);
120+
logger.info("Ping WebSocket Proxy Server attempt {}", i);
122121

123122
CompletableFuture<Response> pingFuture = new CompletableFuture<>();
124123

@@ -132,9 +131,9 @@ private void pingServer() {
132131
try {
133132
response = pingFuture.get(3000, TimeUnit.MILLISECONDS);
134133
} catch (InterruptedException | ExecutionException e) {
135-
logger.error("Exception occured while trying to ping Backend Server ", e);
134+
logger.error("Exception occured while trying to ping WebSocket Proxy Server ", e);
136135
} catch (TimeoutException e) {
137-
logger.warn("Backend Server didn't respond to ping request");
136+
logger.warn("WebSocket Proxy Server didn't respond to ping request");
138137
continue;
139138
} finally {
140139
requestResponseMatcher.removeRequestCallback(request.getCorrelationId());
@@ -147,10 +146,10 @@ private void pingServer() {
147146
}
148147
}
149148
if (connected) {
150-
logger.info("Successfully connected to Backend Server");
149+
logger.info("Successfully connected to WebSocket Proxy Server");
151150
} else {
152-
logger.error("Unable to reach out Backend Server in {} attempts", attempts);
153-
throw new RuntimeException("Backend Server is not reachable");
151+
logger.error("Unable to reach out WebSocket Proxy Server in {} attempts", attempts);
152+
throw new RuntimeException("WebSocket Proxy Server is not reachable");
154153
}
155154
}
156155
}

devicehive-proxy-ws-kafka-impl/src/main/java/com/devicehive/proxy/config/FrontendProxyClientConfig.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222

2323
import com.devicehive.api.RequestResponseMatcher;
2424
import com.devicehive.model.ServerEvent;
25-
import com.devicehive.proxy.FrontendProxyClient;
25+
import com.devicehive.proxy.FrontendRpcClient;
2626
import com.devicehive.proxy.ProxyResponseHandler;
2727
import com.devicehive.proxy.api.NotificationHandler;
2828
import com.devicehive.shim.api.client.RpcClient;
@@ -97,7 +97,7 @@ public WorkerPool<ServerEvent> workerPool(Gson gson, RequestResponseMatcher requ
9797
public RpcClient rpcClient(NotificationHandler notificationHandler, WebSocketKafkaProxyConfig proxyConfig, RequestResponseMatcher requestResponseMatcher, Gson gson, WorkerPool<ServerEvent> workerPool) {
9898
final ExecutorService execService = Executors.newFixedThreadPool(proxyConfig.getWorkerThreads());
9999
RingBuffer<ServerEvent> ringBuffer = workerPool.start(execService);
100-
RpcClient client = new FrontendProxyClient(REQUEST_TOPIC, RESPONSE_TOPIC, proxyConfig, notificationHandler, requestResponseMatcher, gson, ringBuffer);
100+
RpcClient client = new FrontendRpcClient(REQUEST_TOPIC, RESPONSE_TOPIC, proxyConfig, notificationHandler, requestResponseMatcher, gson, ringBuffer);
101101
client.start();
102102
return client;
103103
}

dockerfiles/devicehive-kafka.Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
FROM docker.io/bitnami/kafka:3.5.1
1+
FROM docker.io/bitnami/kafka:3.2.0
22

33
MAINTAINER devicehive
44

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
<javax.ws.rs-api.version>2.1.1</javax.ws.rs-api.version>
4242
<javax.websocket.version>1.1</javax.websocket.version>
4343
<javax.el.version>2.2.4</javax.el.version>
44-
<kafka.version>3.5.1</kafka.version>
44+
<kafka.version>3.2.0</kafka.version>
4545
<scala-binaries.version>2.13</scala-binaries.version>
4646
<slf4j.version>1.7.5</slf4j.version>
4747
<logback.version>1.1.3</logback.version>

0 commit comments

Comments
 (0)