diff --git a/.gitignore b/.gitignore
index 7a161bc50..3e46546f4 100644
--- a/.gitignore
+++ b/.gitignore
@@ -6,3 +6,4 @@ target/
 .project
 *.DS_Store
 *.log
+devicehive-common/dependency-reduced-pom.xml
diff --git a/devicehive-auth/src/main/java/com/devicehive/application/AppContextEventListener.java b/devicehive-auth/src/main/java/com/devicehive/application/AppContextEventListener.java
new file mode 100644
index 000000000..05735eb85
--- /dev/null
+++ b/devicehive-auth/src/main/java/com/devicehive/application/AppContextEventListener.java
@@ -0,0 +1,70 @@
+package com.devicehive.application;
+
+/*
+ * #%L
+ * DeviceHive Frontend Logic
+ * %%
+ * Copyright (C) 2016 DataArt
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.springframework.boot.context.event.ApplicationPreparedEvent;
+import org.springframework.boot.context.event.SpringApplicationEvent;
+import org.springframework.context.ApplicationEvent;
+import org.springframework.context.ApplicationListener;
+import org.springframework.context.event.ApplicationContextEvent;
+import org.springframework.core.env.ConfigurableEnvironment;
+import org.springframework.core.env.MapPropertySource;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+public class AppContextEventListener implements ApplicationListener<ApplicationEvent> {
+
+    private void printActiveProperties(ConfigurableEnvironment env) {
+
+        System.out.println("************************* ACTIVE APP PROPERTIES ******************************");
+
+        List<MapPropertySource> propertySources = new ArrayList<>();
+
+        env.getPropertySources().forEach(it -> {
+            if (it instanceof MapPropertySource) {
+                propertySources.add((MapPropertySource) it);
+            }
+        });
+
+        propertySources.stream()
+                .map(propertySource -> propertySource.getSource().keySet())
+                .flatMap(Collection::stream)
+                .distinct()
+                .sorted()
+                .forEach(key -> {
+                    try {
+                        System.out.println(key + "=" + env.getProperty(key));
+                    } catch (Exception e) {
+                        System.out.println(e);
+                    }
+                });
+        System.out.println("******************************************************************************");
+    }
+
+    @Override
+    public void onApplicationEvent(ApplicationEvent event) {
+        if (event instanceof ApplicationPreparedEvent) {
+            printActiveProperties((ConfigurableEnvironment) ((ApplicationPreparedEvent) event).getApplicationContext().getEnvironment());
+        }
+    }
+}
\ No newline at end of file
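Note on wiring: `ApplicationPreparedEvent` is published before the application context is refreshed, so this listener cannot be contributed as a regular `@Component` bean — it has to be registered on the `SpringApplicationBuilder` itself, which is exactly what the `DeviceHiveAuthApplication` change below does. A minimal sketch of that registration (class names taken from this diff):

```java
new SpringApplicationBuilder()
        .listeners(new AppContextEventListener()) // registered early, before context refresh
        .sources(DeviceHiveAuthApplication.class)
        .run(args);
```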
diff --git a/devicehive-auth/src/main/java/com/devicehive/application/AuthProxyClientConfig.java b/devicehive-auth/src/main/java/com/devicehive/application/AuthProxyClientConfig.java
index 121909c2a..e02da4e00 100644
--- a/devicehive-auth/src/main/java/com/devicehive/application/AuthProxyClientConfig.java
+++ b/devicehive-auth/src/main/java/com/devicehive/application/AuthProxyClientConfig.java
@@ -21,7 +21,7 @@
  */
 
 import com.devicehive.api.RequestResponseMatcher;
-import com.devicehive.proxy.AuthProxyClient;
+import com.devicehive.proxy.AuthRpcClient;
 import com.devicehive.proxy.ProxyResponseHandler;
 import com.devicehive.proxy.api.NotificationHandler;
 import com.devicehive.proxy.client.WebSocketKafkaProxyClient;
@@ -59,10 +59,10 @@ public NotificationHandler notificationHandler(Gson gson, RequestResponseMatcher
     }
 
     @Bean
-    public AuthProxyClient rpcClient(NotificationHandler notificationHandler, WebSocketKafkaProxyConfig proxyConfig, RequestResponseMatcher requestResponseMatcher, Gson gson) {
+    public AuthRpcClient rpcClient(NotificationHandler notificationHandler, WebSocketKafkaProxyConfig proxyConfig, RequestResponseMatcher requestResponseMatcher, Gson gson) {
         WebSocketKafkaProxyClient proxyClient = new WebSocketKafkaProxyClient(notificationHandler);
         proxyClient.setWebSocketKafkaProxyConfig(proxyConfig);
-        AuthProxyClient client = new AuthProxyClient(REQUEST_TOPIC, responseTopic, proxyClient, requestResponseMatcher, gson);
+        AuthRpcClient client = new AuthRpcClient(REQUEST_TOPIC, responseTopic, proxyClient, requestResponseMatcher, gson);
         client.start();
         return client;
     }
diff --git a/devicehive-auth/src/main/java/com/devicehive/application/DeviceHiveAuthApplication.java b/devicehive-auth/src/main/java/com/devicehive/application/DeviceHiveAuthApplication.java
index 640770db4..497d38ad5 100644
--- a/devicehive-auth/src/main/java/com/devicehive/application/DeviceHiveAuthApplication.java
+++ b/devicehive-auth/src/main/java/com/devicehive/application/DeviceHiveAuthApplication.java
@@ -57,6 +57,7 @@ public class DeviceHiveAuthApplication extends SpringBootServletInitializer {
 
     public static void main(String... args) {
         ConfigurableApplicationContext context = new SpringApplicationBuilder()
+                .listeners(new AppContextEventListener())
                 .sources(DeviceHiveAuthApplication.class)
                 .web(WebApplicationType.SERVLET)
                 .run(args);
diff --git a/devicehive-auth/src/main/java/com/devicehive/proxy/AuthProxyClient.java b/devicehive-auth/src/main/java/com/devicehive/proxy/AuthRpcClient.java
similarity index 92%
rename from devicehive-auth/src/main/java/com/devicehive/proxy/AuthProxyClient.java
rename to devicehive-auth/src/main/java/com/devicehive/proxy/AuthRpcClient.java
index c211c2a6d..0864b5b59 100644
--- a/devicehive-auth/src/main/java/com/devicehive/proxy/AuthProxyClient.java
+++ b/devicehive-auth/src/main/java/com/devicehive/proxy/AuthRpcClient.java
@@ -44,8 +44,8 @@
 import java.util.function.Consumer;
 
 @Profile("ws-kafka-proxy")
-public class AuthProxyClient implements RpcClient {
-    private static final Logger logger = LoggerFactory.getLogger(AuthProxyClient.class);
+public class AuthRpcClient implements RpcClient {
+    private static final Logger logger = LoggerFactory.getLogger(AuthRpcClient.class);
 
     private final String requestTopic;
     private final String replyToTopic;
@@ -53,7 +53,7 @@ public class AuthProxyClient implements RpcClient {
     private final RequestResponseMatcher requestResponseMatcher;
     private final Gson gson;
 
-    public AuthProxyClient(String requestTopic, String replyToTopic, ProxyClient client, RequestResponseMatcher requestResponseMatcher, Gson gson) {
+    public AuthRpcClient(String requestTopic, String replyToTopic, ProxyClient client, RequestResponseMatcher requestResponseMatcher, Gson gson) {
         this.requestTopic = requestTopic;
         this.replyToTopic = replyToTopic;
         this.client = client;
@@ -84,8 +84,8 @@ public void start() {
         client.start();
         createTopic(Arrays.asList(requestTopic, replyToTopic));
         subscribeToTopic(replyToTopic);
-        pingServer();
+
     }
 
     public void createTopic(List<String> topics) {
@@ -110,7 +110,8 @@ private void pingServer() {
         requestResponseMatcher.addRequestCallback(request.getCorrelationId(), pingFuture::complete);
         logger.debug("Request callback added for request: {}, correlationId: {}", request.getBody(), request.getCorrelationId());
 
-        client.push(ProxyMessageBuilder.notification(new NotificationCreatePayload(requestTopic, gson.toJson(request)))); // toDo: use request partition key
+        client.push(ProxyMessageBuilder.notification(
+                new NotificationCreatePayload(requestTopic, gson.toJson(request), request.getPartitionKey())));
 
         Response response = null;
         try {
diff --git a/devicehive-auth/src/main/resources/application.properties b/devicehive-auth/src/main/resources/application.properties
index 94b59cb2a..348106635 100644
--- a/devicehive-auth/src/main/resources/application.properties
+++ b/devicehive-auth/src/main/resources/application.properties
@@ -52,6 +52,6 @@ jwt.refresh-token-max-age=15724800000
 jwt.access-token-max-age=1800000
 
-spring.flyway.baselineOnMigrate=false
+spring.flyway.baselineOnMigrate=true
 spring.flyway.table=schema_version
 #spring.flyway.baseline-version=3.4.12
\ No newline at end of file
diff --git a/devicehive-backend/src/test/java/com/devicehive/base/AbstractSpringTest.java b/devicehive-backend/src/test/java/com/devicehive/base/AbstractSpringTest.java
index fb8cc4da6..79f3321b2 100644
--- a/devicehive-backend/src/test/java/com/devicehive/base/AbstractSpringTest.java
+++ b/devicehive-backend/src/test/java/com/devicehive/base/AbstractSpringTest.java
@@ -75,7 +75,7 @@ public abstract class AbstractSpringTest {
     }
 
     @ClassRule
-    public static KafkaEmbeddedRule kafkaRule = new KafkaEmbeddedRule(true, 1, REQUEST_TOPIC, RESPONSE_TOPIC);
+    public static KafkaEmbeddedRule kafkaRule = new KafkaEmbeddedRule();
 
     @Rule
     public Timeout testTimeout = new Timeout(60000, TimeUnit.MILLISECONDS); // 60k ms = 1 minute
diff --git a/devicehive-frontend/src/test/java/com/devicehive/base/AbstractSpringKafkaTest.java b/devicehive-frontend/src/test/java/com/devicehive/base/AbstractSpringKafkaTest.java
index d445451e0..d6cef60e0 100644
--- a/devicehive-frontend/src/test/java/com/devicehive/base/AbstractSpringKafkaTest.java
+++ b/devicehive-frontend/src/test/java/com/devicehive/base/AbstractSpringKafkaTest.java
@@ -50,7 +50,7 @@ public abstract class AbstractSpringKafkaTest {
     }
 
     @ClassRule
-    public static KafkaEmbeddedRule kafkaRule = new KafkaEmbeddedRule(true, 5, REQUEST_TOPIC, RESPONSE_TOPIC);
+    public static KafkaEmbeddedRule kafkaRule = new KafkaEmbeddedRule();
 
     @Rule
     public Timeout testTimeout = new Timeout(180000, TimeUnit.MILLISECONDS); // 180k ms = 3 minutes
diff --git a/devicehive-proxy-ws-kafka-impl/src/main/java/com/devicehive/proxy/FrontendProxyClient.java b/devicehive-proxy-ws-kafka-impl/src/main/java/com/devicehive/proxy/FrontendRpcClient.java
similarity index 87%
rename from devicehive-proxy-ws-kafka-impl/src/main/java/com/devicehive/proxy/FrontendProxyClient.java
rename to devicehive-proxy-ws-kafka-impl/src/main/java/com/devicehive/proxy/FrontendRpcClient.java
index 2c7701f94..b80658fef 100644
--- a/devicehive-proxy-ws-kafka-impl/src/main/java/com/devicehive/proxy/FrontendProxyClient.java
+++ b/devicehive-proxy-ws-kafka-impl/src/main/java/com/devicehive/proxy/FrontendRpcClient.java
@@ -23,7 +23,6 @@
 import com.devicehive.api.RequestResponseMatcher;
 import com.devicehive.model.ServerEvent;
 import com.devicehive.proxy.api.NotificationHandler;
-import com.devicehive.proxy.api.ProxyClient;
 import com.devicehive.proxy.api.ProxyMessageBuilder;
 import com.devicehive.proxy.api.payload.NotificationCreatePayload;
 import com.devicehive.proxy.api.payload.SubscribePayload;
@@ -44,8 +43,8 @@
 import java.util.concurrent.*;
 import java.util.function.Consumer;
 
-public class FrontendProxyClient implements RpcClient {
-    private static final Logger logger = LoggerFactory.getLogger(FrontendProxyClient.class);
+public class FrontendRpcClient implements RpcClient {
+    private static final Logger logger = LoggerFactory.getLogger(FrontendRpcClient.class);
 
     private final String requestTopic;
     private final String replyToTopic;
@@ -56,7 +55,7 @@ public class FrontendProxyClient implements RpcClient {
     private final Gson gson;
     private final RingBuffer<ServerEvent> ringBuffer;
 
-    public FrontendProxyClient(String requestTopic, String replyToTopic, WebSocketKafkaProxyConfig proxyConfig, NotificationHandler notificationHandler, RequestResponseMatcher requestResponseMatcher, Gson gson, RingBuffer<ServerEvent> ringBuffer) {
+    public FrontendRpcClient(String requestTopic, String replyToTopic, WebSocketKafkaProxyConfig proxyConfig, NotificationHandler notificationHandler, RequestResponseMatcher requestResponseMatcher, Gson gson, RingBuffer<ServerEvent> ringBuffer) {
         this.requestTopic = requestTopic;
         this.replyToTopic = replyToTopic;
         this.proxyConfig = proxyConfig;
@@ -118,7 +117,7 @@ private void pingServer() {
         boolean connected = false;
         int attempts = 10;
         for (int i = 0; i < attempts; i++) {
-            logger.info("Ping WebSocket Proxy attempt {}", i);
+            logger.info("Ping WebSocket Proxy Server attempt {}", i);
 
             CompletableFuture<Response> pingFuture = new CompletableFuture<>();
 
@@ -132,9 +131,9 @@ private void pingServer() {
             try {
                 response = pingFuture.get(3000, TimeUnit.MILLISECONDS);
             } catch (InterruptedException | ExecutionException e) {
-                logger.error("Exception occured while trying to ping Backend Server ", e);
+                logger.error("Exception occurred while trying to ping WebSocket Proxy Server ", e);
             } catch (TimeoutException e) {
-                logger.warn("Backend Server didn't respond to ping request");
+                logger.warn("WebSocket Proxy Server didn't respond to ping request");
                 continue;
             } finally {
                 requestResponseMatcher.removeRequestCallback(request.getCorrelationId());
@@ -147,10 +146,10 @@ private void pingServer() {
         }
         if (connected) {
-            logger.info("Successfully connected to Backend Server");
+            logger.info("Successfully connected to WebSocket Proxy Server");
         } else {
-            logger.error("Unable to reach out Backend Server in {} attempts", attempts);
-            throw new RuntimeException("Backend Server is not reachable");
+            logger.error("Unable to reach WebSocket Proxy Server in {} attempts", attempts);
+            throw new RuntimeException("WebSocket Proxy Server is not reachable");
         }
     }
 }
diff --git a/devicehive-proxy-ws-kafka-impl/src/main/java/com/devicehive/proxy/config/FrontendProxyClientConfig.java b/devicehive-proxy-ws-kafka-impl/src/main/java/com/devicehive/proxy/config/FrontendProxyClientConfig.java
index 2b6276f9f..b8bc12536 100644
--- a/devicehive-proxy-ws-kafka-impl/src/main/java/com/devicehive/proxy/config/FrontendProxyClientConfig.java
+++ b/devicehive-proxy-ws-kafka-impl/src/main/java/com/devicehive/proxy/config/FrontendProxyClientConfig.java
@@ -22,7 +22,7 @@
 
 import com.devicehive.api.RequestResponseMatcher;
 import com.devicehive.model.ServerEvent;
-import com.devicehive.proxy.FrontendProxyClient;
+import com.devicehive.proxy.FrontendRpcClient;
 import com.devicehive.proxy.ProxyResponseHandler;
 import com.devicehive.proxy.api.NotificationHandler;
 import com.devicehive.shim.api.client.RpcClient;
@@ -97,7 +97,7 @@ public WorkerPool workerPool(Gson gson, RequestResponseMatcher requ
     public RpcClient rpcClient(NotificationHandler notificationHandler, WebSocketKafkaProxyConfig proxyConfig, RequestResponseMatcher requestResponseMatcher, Gson gson, WorkerPool<ServerEvent> workerPool) {
         final ExecutorService execService = Executors.newFixedThreadPool(proxyConfig.getWorkerThreads());
         RingBuffer<ServerEvent> ringBuffer = workerPool.start(execService);
-        RpcClient client = new FrontendProxyClient(REQUEST_TOPIC, RESPONSE_TOPIC, proxyConfig, notificationHandler, requestResponseMatcher, gson, ringBuffer);
+        RpcClient client = new FrontendRpcClient(REQUEST_TOPIC, RESPONSE_TOPIC, proxyConfig, notificationHandler, requestResponseMatcher, gson, ringBuffer);
         client.start();
         return client;
     }
diff --git a/devicehive-rdbms-dao/pom.xml b/devicehive-rdbms-dao/pom.xml
index 655468a98..16461949a 100644
--- a/devicehive-rdbms-dao/pom.xml
+++ b/devicehive-rdbms-dao/pom.xml
@@ -16,7 +16,7 @@
         ${project.parent.basedir}
-        3.17.7
+        3.23.5
@@ -56,16 +56,16 @@
         org.hibernate.validator
         hibernate-validator
-
-        javax.el
-        javax.el-api
-        ${javax.el.version}
-
-
-        org.glassfish.web
-        javax.el
-        ${javax.el.version}
-
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/devicehive-rdbms-dao/src/main/resources/db/migration/V3_0_10__drop_network_auto_creation.sql b/devicehive-rdbms-dao/src/main/resources/db/migration/V3_0_10__drop_network_auto_creation.sql
index b5043c8f7..c48478024 100644
--- a/devicehive-rdbms-dao/src/main/resources/db/migration/V3_0_10__drop_network_auto_creation.sql
+++ b/devicehive-rdbms-dao/src/main/resources/db/migration/V3_0_10__drop_network_auto_creation.sql
@@ -2,7 +2,7 @@
 -- #%L
 -- DeviceHive Dao RDBMS Implementation
 -- %%
--- Copyright (C) 2016-2017 DataArt
+-- Copyright (C) 2016 DataArt
 -- %%
 -- Licensed under the Apache License, Version 2.0 (the "License");
 -- you may not use this file except in compliance with the License.
diff --git a/devicehive-rdbms-dao/src/main/resources/redisson.yaml b/devicehive-rdbms-dao/src/main/resources/redisson.yaml
index b86ccb0d1..9e18e020f 100644
--- a/devicehive-rdbms-dao/src/main/resources/redisson.yaml
+++ b/devicehive-rdbms-dao/src/main/resources/redisson.yaml
@@ -2,6 +2,7 @@ codec:
   class: org.redisson.codec.SerializationCodec
 singleServerConfig:
   address: "redis://${REDIS_MASTER_HOST}:${REDIS_MASTER_PORT}"
+  password: "${REDIS_MASTER_PASSWORD}"
   idleConnectionTimeout: 10000
   connectTimeout: 10000
   timeout: 3000
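For context, `singleServerConfig.password` is Redisson's standard option for authenticating against a password-protected Redis master. A hedged programmatic equivalent of the YAML above (the environment-variable handling is illustrative only):

```java
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;

public class RedissonClientFactory {
    public static RedissonClient create() {
        Config config = new Config();
        config.useSingleServer()
                // same values the YAML resolves from the environment
                .setAddress("redis://" + System.getenv("REDIS_MASTER_HOST") + ":" + System.getenv("REDIS_MASTER_PORT"))
                .setPassword(System.getenv("REDIS_MASTER_PASSWORD"));
        return Redisson.create(config);
    }
}
```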
 * You may obtain a copy of the License at
- * 
+ *
 * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -21,40 +21,50 @@
 */
 
 import com.devicehive.shim.config.KafkaRpcConfig;
-import kafka.admin.AdminUtils;
-import kafka.admin.RackAwareMode;
-import kafka.utils.ZKStringSerializer$;
-import kafka.utils.ZkUtils;
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.ZkConnection;
-import org.springframework.beans.factory.annotation.Autowired;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
 import org.springframework.context.annotation.Profile;
 import org.springframework.stereotype.Component;
 
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.Properties;
+import java.util.stream.Collectors;
 
 @Component
 @Profile("!ws-kafka-proxy")
 public class KafkaRpcTopicService implements KafkaTopicService {
 
-    @Autowired
-    private KafkaRpcConfig kafkaRpcConfig;
-
-    public void createTopic(String topic) {
-        ZkClient zkClient = new ZkClient(
-                kafkaRpcConfig.getZookeeperConnect(),
-                kafkaRpcConfig.getSessionTimeout(),
-                kafkaRpcConfig.getConnectionTimeout(),
-                ZKStringSerializer$.MODULE$);
-        try {
-            ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(kafkaRpcConfig.getZookeeperConnect()), false);
-            Properties topicConfig = kafkaRpcConfig.topicProps();
-            if (!AdminUtils.topicExists(zkUtils, topic)) {
-                AdminUtils.createTopic(zkUtils, topic, kafkaRpcConfig.getNumPartitions(),
-                        kafkaRpcConfig.getReplicationFactor(), topicConfig, RackAwareMode.Enforced$.MODULE$);
-            }
-        } finally {
-            zkClient.close();
+    private final KafkaRpcConfig kafkaRpcConfig;
+
+    public KafkaRpcTopicService(KafkaRpcConfig kafkaRpcConfig) {
+        this.kafkaRpcConfig = kafkaRpcConfig;
+    }
+
+    public void createTopic(String topicName) {
+        Properties properties = new Properties();
+        properties.put(
+                AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaRpcConfig.getBootstrapServers()
+        );
+
+        try (Admin admin = Admin.create(properties)) {
+            NewTopic newTopic = new NewTopic(topicName, kafkaRpcConfig.getNumPartitions(), (short) kafkaRpcConfig.getReplicationFactor());
+            HashMap<String, String> newTopicConfigs = kafkaRpcConfig.topicProps().entrySet().stream().collect(
+                    Collectors.toMap(
+                            property -> String.valueOf(property.getKey()),
+                            property -> String.valueOf(property.getValue()),
+                            (prev, next) -> next,
+                            HashMap::new
+                    )
+            );
+            newTopic.configs(newTopicConfigs);
+
+            // result saved for future use if needed
+            CreateTopicsResult result = admin.createTopics(
+                    Collections.singleton(newTopic)
+            );
         }
     }
 }
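One behavioural difference worth noting: the old `AdminUtils.topicExists` guard is gone, and `Admin#createTopics` reports an already-existing topic as a `TopicExistsException` delivered through the returned futures — since the `CreateTopicsResult` above is ignored, such failures pass silently. A hedged sketch of how the result could be surfaced (not part of this patch):

```java
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.TopicExistsException;

import java.util.Collections;
import java.util.concurrent.ExecutionException;

final class TopicCreation {
    // Creates the topic and tolerates "already exists", mirroring the removed AdminUtils.topicExists() check.
    static void createTopicIfAbsent(Admin admin, NewTopic newTopic) throws InterruptedException {
        try {
            admin.createTopics(Collections.singleton(newTopic)).all().get();
        } catch (ExecutionException e) {
            if (!(e.getCause() instanceof TopicExistsException)) {
                throw new IllegalStateException("Topic creation failed: " + newTopic.name(), e.getCause());
            }
            // topic already exists - nothing to do
        }
    }
}
```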
diff --git a/devicehive-shim-kafka-impl/src/test/java/com/devicehive/shim/kafka/test/KafkaRpcClientServerCommunicationTest.java b/devicehive-shim-kafka-impl/src/test/java/com/devicehive/shim/kafka/test/KafkaRpcClientServerCommunicationTest.java
index 1d7ce270f..2be4ba675 100644
--- a/devicehive-shim-kafka-impl/src/test/java/com/devicehive/shim/kafka/test/KafkaRpcClientServerCommunicationTest.java
+++ b/devicehive-shim-kafka-impl/src/test/java/com/devicehive/shim/kafka/test/KafkaRpcClientServerCommunicationTest.java
@@ -70,7 +70,7 @@ public class KafkaRpcClientServerCommunicationTest {
     private static final String RESPONSE_TOPIC = "response_topic";
 
     @ClassRule
-    public static KafkaEmbeddedRule kafkaRule = new KafkaEmbeddedRule(true, 1, REQUEST_TOPIC, RESPONSE_TOPIC);
+    public static KafkaEmbeddedRule kafkaRule = new KafkaEmbeddedRule();
 
     @Rule
     public Timeout testTimeout = new Timeout(180000, TimeUnit.MILLISECONDS); // 180k ms = 3 minutes
@@ -93,8 +93,8 @@ public static void setUp() throws Exception {
                 .create();
 
         server = new ServerBuilder()
-                .withConsumerProps(kafkaRule.getConsumerProperties())
-                .withProducerProps(kafkaRule.getProducerProperties())
+//                .withConsumerProps(kafkaRule.getConsumerProperties())
+//                .withProducerProps(kafkaRule.getProducerProperties())
                 .withConsumerValueDeserializer(new RequestSerializer(gson))
                 .withProducerValueSerializer(new ResponseSerializer(gson))
                 .withConsumerThreads(1)
@@ -104,8 +104,8 @@ public static void setUp() throws Exception {
         server.start();
 
         client = new ClientBuilder()
-                .withProducerProps(kafkaRule.getProducerProperties())
-                .withConsumerProps(kafkaRule.getConsumerProperties())
+//                .withProducerProps(kafkaRule.getProducerProperties())
+//                .withConsumerProps(kafkaRule.getConsumerProperties())
                 .withProducerValueSerializer(new RequestSerializer(gson))
                 .withConsumerValueDeserializer(new ResponseSerializer(gson))
                 .withReplyTopic(RESPONSE_TOPIC)
diff --git a/devicehive-test-utils/src/main/java/com/devicehive/test/rule/KafkaEmbeddedRule.java b/devicehive-test-utils/src/main/java/com/devicehive/test/rule/KafkaEmbeddedRule.java
index 685d0c7f5..d4d8ea56e 100644
--- a/devicehive-test-utils/src/main/java/com/devicehive/test/rule/KafkaEmbeddedRule.java
+++ b/devicehive-test-utils/src/main/java/com/devicehive/test/rule/KafkaEmbeddedRule.java
@@ -20,168 +20,145 @@
  * #L%
  */
 
-import kafka.admin.AdminUtils;
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServer;
-import kafka.server.NotRunning;
-import kafka.utils.CoreUtils;
-import kafka.utils.SystemTime$;
-import kafka.utils.TestUtils;
-import kafka.utils.ZKStringSerializer$;
-import kafka.utils.ZkUtils;
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.exception.ZkInterruptedException;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.protocol.SecurityProtocol;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.zookeeper.server.NIOServerCnxnFactory;
-import org.apache.zookeeper.server.ZooKeeperServer;
 import org.junit.rules.ExternalResource;
 
-import java.io.File;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Optional;
-import java.util.Properties;
-
 //TODO: migrate from JUnit4's ExternalResource to JUnit 5
 public class KafkaEmbeddedRule extends ExternalResource {
-
-    private static final int KAFKA_DEFAULT_PORT = 9092;
-    private static final int ZOOKEEPER_DEFAULT_PORT = 2181;
-
-    private boolean controlledShutdown;
-    private int partitions;
-    private String[] topics;
-
-    private String zkConnect;
-    private EmbeddedZookeeperInternal zookeeper;
-    private ZkClient zookeeperClient;
-
-    private KafkaServer kafkaServer;
-
-    public KafkaEmbeddedRule(boolean controlledShutdown, int partitions, String... topics) {
-        this.controlledShutdown = controlledShutdown;
-        this.partitions = partitions;
-        this.topics = topics;
-    }
-
-    static class EmbeddedZookeeperInternal {
-
-        private final ZooKeeperServer zooKeeperServer;
-        private final NIOServerCnxnFactory nioServerCnxnFactory;
-        private final java.io.File logDir;
-        private final java.io.File snapshotDir;
-
-        private final int port;
-
-        public EmbeddedZookeeperInternal(int port) throws IOException, InterruptedException {
-            this.port = port;
-            logDir = TestUtils.tempDir();
-            snapshotDir = TestUtils.tempDir();
-            zooKeeperServer = new ZooKeeperServer(snapshotDir, logDir, 500);
-            nioServerCnxnFactory = new NIOServerCnxnFactory();
-            InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", port);
-            nioServerCnxnFactory.configure(inetSocketAddress, 0);
-            nioServerCnxnFactory.startup(zooKeeperServer);
-        }
-
-        public void shutdown() {
-            zooKeeperServer.shutdown();
-            nioServerCnxnFactory.shutdown();
-            Utils.delete(logDir);
-            Utils.delete(snapshotDir);
-        }
-
-        public int getPort() {
-            return port;
-        }
-
-    }
-
-    @Override
-    protected void before() throws Throwable {
-
-
-        int zkConnectionTimeout = 6000;
-        int zkSessionTimeout = 6000;
-
-        int zookeeperPort = Optional.ofNullable(System.getProperty("zookeeper.port"))
-                .filter(s -> !s.isEmpty())
-                .map(Integer::parseInt)
-                .orElse(ZOOKEEPER_DEFAULT_PORT);
-        this.zookeeper = new EmbeddedZookeeperInternal(zookeeperPort);
-        this.zkConnect = "127.0.0.1:" + this.zookeeper.getPort();
-        this.zookeeperClient = new ZkClient(this.zkConnect, zkSessionTimeout, zkConnectionTimeout,
-                ZKStringSerializer$.MODULE$);
-
-        int kafkaPort = Optional.ofNullable(System.getProperty("kafka.port"))
-                .filter(s -> !s.isEmpty())
-                .map(Integer::parseInt)
-                .orElse(KAFKA_DEFAULT_PORT);
-        Properties brokerConfigProperties = TestUtils.createBrokerConfig(0, this.zkConnect, this.controlledShutdown,
-                true, kafkaPort,
-                scala.Option.apply(null),
-                scala.Option.apply(null),
-                scala.Option.apply(null),
-                true, false, 0, false, 0, false, 0, scala.Option.apply(null));
-        brokerConfigProperties.setProperty("replica.socket.timeout.ms", "1000");
-        brokerConfigProperties.setProperty("controller.socket.timeout.ms", "1000");
-        brokerConfigProperties.setProperty("offsets.topic.replication.factor", "1");
-        this.kafkaServer = TestUtils.createServer(new KafkaConfig(brokerConfigProperties), SystemTime$.MODULE$);
-
-        ZkUtils zkUtils = new ZkUtils(this.zookeeperClient, null, false);
-        Properties properties = new Properties();
-        for (String topic : this.topics) {
-            if (!AdminUtils.topicExists(zkUtils, topic)) {
-                AdminUtils.createTopic(zkUtils, topic, partitions, 1, properties, null);
-            }
-        }
-    }
-
-    @Override
-    protected void after() {
-        try {
-            if (this.kafkaServer.brokerState().currentState() != (NotRunning.state())) {
-                this.kafkaServer.shutdown();
-                this.kafkaServer.awaitShutdown();
-            }
-        } catch (Exception e) { }
-        try {
-            CoreUtils.delete(this.kafkaServer.config().logDirs());
-        } catch (Exception e) { }
-
-        try {
-            this.zookeeperClient.close();
-        } catch (ZkInterruptedException e) { }
-
-        try {
-            this.zookeeper.shutdown();
-        } catch (Exception e) { }
-    }
-
-    public String getZkConnect() {
-        return this.zkConnect;
-    }
-
-    public String getBrokerAddress() {
-        return "localhost:" + kafkaServer.config().port();
-    }
-
-    public Properties getProducerProperties() {
-        Properties producerProps = new Properties();
-        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerAddress());
-        producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
-        return producerProps;
-    }
-
-    public Properties getConsumerProperties() {
-        Properties consumerProps = new Properties();
-        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerAddress());
-        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "request-group");
-        consumerProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
-        return consumerProps;
-    }
+//
+//    private static final int KAFKA_DEFAULT_PORT = 9092;
+//    private static final int ZOOKEEPER_DEFAULT_PORT = 2181;
+//
+//    private boolean controlledShutdown;
+//    private int partitions;
+//    private String[] topics;
+//
+//    private String zkConnect;
+//    private EmbeddedZookeeperInternal zookeeper;
+//    private ZkClient zookeeperClient;
+//
+//    private KafkaServer kafkaServer;
+//
+//    public KafkaEmbeddedRule(boolean controlledShutdown, int partitions, String... topics) {
+//        this.controlledShutdown = controlledShutdown;
+//        this.partitions = partitions;
+//        this.topics = topics;
+//    }
+//
+//    static class EmbeddedZookeeperInternal {
+//
+//        private final ZooKeeperServer zooKeeperServer;
+//        private final NIOServerCnxnFactory nioServerCnxnFactory;
+//        private final java.io.File logDir;
+//        private final java.io.File snapshotDir;
+//
+//        private final int port;
+//
+//        public EmbeddedZookeeperInternal(int port) throws IOException, InterruptedException {
+//            this.port = port;
+//            logDir = TestUtils.tempDir();
+//            snapshotDir = TestUtils.tempDir();
+//            zooKeeperServer = new ZooKeeperServer(snapshotDir, logDir, 500);
+//            nioServerCnxnFactory = new NIOServerCnxnFactory();
+//            InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", port);
+//            nioServerCnxnFactory.configure(inetSocketAddress, 0);
+//            nioServerCnxnFactory.startup(zooKeeperServer);
+//        }
+//
+//        public void shutdown() {
+//            zooKeeperServer.shutdown();
+//            nioServerCnxnFactory.shutdown();
+//            Utils.delete(logDir);
+//            Utils.delete(snapshotDir);
+//        }
+//
+//        public int getPort() {
+//            return port;
+//        }
+//
+//    }
+//
+//    @Override
+//    protected void before() throws Throwable {
+//
+//
+//        int zkConnectionTimeout = 6000;
+//        int zkSessionTimeout = 6000;
+//
+//        int zookeeperPort = Optional.ofNullable(System.getProperty("zookeeper.port"))
+//                .filter(s -> !s.isEmpty())
+//                .map(Integer::parseInt)
+//                .orElse(ZOOKEEPER_DEFAULT_PORT);
+//        this.zookeeper = new EmbeddedZookeeperInternal(zookeeperPort);
+//        this.zkConnect = "127.0.0.1:" + this.zookeeper.getPort();
+//        this.zookeeperClient = new ZkClient(this.zkConnect, zkSessionTimeout, zkConnectionTimeout,
+//                ZKStringSerializer$.MODULE$);
+//
+//        int kafkaPort = Optional.ofNullable(System.getProperty("kafka.port"))
+//                .filter(s -> !s.isEmpty())
+//                .map(Integer::parseInt)
+//                .orElse(KAFKA_DEFAULT_PORT);
+//        Properties brokerConfigProperties = TestUtils.createBrokerConfig(0, this.zkConnect, this.controlledShutdown,
+//                true, kafkaPort,
+//                scala.Option.apply(null),
+//                scala.Option.apply(null),
+//                scala.Option.apply(null),
+//                true, false, 0, false, 0, false, 0, scala.Option.apply(null));
+//        brokerConfigProperties.setProperty("replica.socket.timeout.ms", "1000");
+//        brokerConfigProperties.setProperty("controller.socket.timeout.ms", "1000");
+//        brokerConfigProperties.setProperty("offsets.topic.replication.factor", "1");
+//        this.kafkaServer = TestUtils.createServer(new KafkaConfig(brokerConfigProperties), SystemTime$.MODULE$);
+//
+//        ZkUtils zkUtils = new ZkUtils(this.zookeeperClient, null, false);
+//        Properties properties = new Properties();
+//        for (String topic : this.topics) {
+//            if (!AdminUtils.topicExists(zkUtils, topic)) {
+//                AdminUtils.createTopic(zkUtils, topic, partitions, 1, properties, null);
+//            }
+//        }
+//    }
+//
+//    @Override
+//    protected void after() {
+//        try {
+//            if (this.kafkaServer.brokerState().currentState() != (NotRunning.state())) {
+//                this.kafkaServer.shutdown();
+//                this.kafkaServer.awaitShutdown();
+//            }
+//        } catch (Exception e) { }
+//        try {
+//            CoreUtils.delete(this.kafkaServer.config().logDirs());
+//        } catch (Exception e) { }
+//
+//        try {
+//            this.zookeeperClient.close();
+//        } catch (ZkInterruptedException e) { }
+//
+//        try {
+//            this.zookeeper.shutdown();
+//        } catch (Exception e) { }
+//    }
+//
+//    public String getZkConnect() {
+//        return this.zkConnect;
+//    }
+//
+//    public String getBrokerAddress() {
+//        return "localhost:" + kafkaServer.config().port();
+//    }
+//
+//    public Properties getProducerProperties() {
+//        Properties producerProps = new Properties();
+//        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerAddress());
+//        producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
+//        return producerProps;
+//    }
+//
+//    public Properties getConsumerProperties() {
+//        Properties consumerProps = new Properties();
+//        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerAddress());
+//        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "request-group");
+//        consumerProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
+//        return consumerProps;
+//    }
 }
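With the embedded broker and ZooKeeper fully commented out, `KafkaEmbeddedRule` is now an empty `ExternalResource`, so the `@ClassRule` usages above no longer start anything: the tests presumably rely on an externally provisioned Kafka reachable through the shim's normal bootstrap-servers configuration, and the old `kafka.port` / `zookeeper.port` system properties have no effect. If an in-process broker is wanted again, one hedged option (not part of this patch) is Testcontainers:

```java
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

// Hypothetical replacement for the gutted rule: spin up a disposable broker per test class.
public class KafkaContainerExample {
    public static void main(String[] args) {
        try (KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"))) {
            kafka.start();
            // Pass this to producer/consumer properties instead of the old kafkaRule.getBrokerAddress().
            String bootstrapServers = kafka.getBootstrapServers();
            System.out.println("bootstrap.servers=" + bootstrapServers);
        }
    }
}
```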
echo "Please check following vars are passed to container:" @@ -26,6 +27,7 @@ then echo "- DH_POSTGRES_DB" echo "- REDIS_MASTER_HOST" echo "- REDIS_MASTER_PORT" + echo "- REDIS_MASTER_PASSWORD" exit 1 fi diff --git a/dockerfiles/devicehive-frontend/devicehive-start.sh b/dockerfiles/devicehive-frontend/devicehive-start.sh index 76dff1571..e35bdbc1b 100644 --- a/dockerfiles/devicehive-frontend/devicehive-start.sh +++ b/dockerfiles/devicehive-frontend/devicehive-start.sh @@ -16,8 +16,11 @@ if [ -z "$DH_POSTGRES_ADDRESS" ] \ || [ -z "$DH_POSTGRES_PASSWORD" ] \ || [ -z "$DH_POSTGRES_DB" ] \ || [ -z "$DH_ZK_ADDRESS" ] \ + || [ -z "$DH_BACKEND_ADDRESS" ] \ + || [ -z "$DH_BACKEND_PORT" ] \ || [ -z "$REDIS_MASTER_HOST" ] \ || [ -z "$REDIS_MASTER_PORT" ] \ + || [ -z "$REDIS_MASTER_PASSWORD" ] \ || [ -z "$DH_AUTH_URL" ] \ || ( [ -z "$DH_KAFKA_BOOTSTRAP_SERVERS" ] && [ -z "$DH_KAFKA_ADDRESS" ] ) then @@ -28,8 +31,11 @@ then echo "- DH_POSTGRES_PASSWORD" echo "- DH_POSTGRES_DB" echo "- DH_ZK_ADDRESS" + echo "- DH_BACKEND_ADDRESS" + echo "- DH_BACKEND_PORT" echo "- REDIS_MASTER_HOST" echo "- REDIS_MASTER_PORT" + echo "- REDIS_MASTER_PASSWORD" echo "- DH_AUTH_URL" echo "And one of variants of Kafka bootstrap parameters:" echo "- DH_KAFKA_BOOTSTRAP_SERVERS for multiple servers" @@ -43,7 +49,7 @@ then DH_KAFKA_BOOTSTRAP_SERVERS="${DH_KAFKA_ADDRESS}:${DH_KAFKA_PORT:-9092}" fi -# Check if Zookeper, Kafka and Postgres are ready +# Check if Zookeper, Kafka, Backend and Postgres are ready while true; do nc -v -z -w1 "$DH_ZK_ADDRESS" "${DH_ZK_PORT:=2181}" result_zk=$? @@ -52,8 +58,9 @@ while true; do result_kafka=$? nc -v -z -w1 "$DH_POSTGRES_ADDRESS" "${DH_POSTGRES_PORT:=5432}" result_postgres=$? - - if [ "$result_kafka" -eq 0 ] && [ "$result_postgres" -eq 0 ] && [ "$result_zk" -eq 0 ]; then + nc -v -z -w1 "$DH_BACKEND_ADDRESS" "${DH_BACKEND_PORT:=8000}" + result_backend=$? + if [ "$result_kafka" -eq 0 ] && [ "$result_postgres" -eq 0 ] && [ "$result_zk" -eq 0 ] && [ "$result_backend" -eq 0 ]; then break fi sleep 3 diff --git a/dockerfiles/devicehive-kafka.Dockerfile b/dockerfiles/devicehive-kafka.Dockerfile index eed87a8c5..d1dc73076 100644 --- a/dockerfiles/devicehive-kafka.Dockerfile +++ b/dockerfiles/devicehive-kafka.Dockerfile @@ -1,8 +1,8 @@ -FROM docker.io/bitnami/kafka:3.1 +FROM docker.io/bitnami/kafka:3.2.0 MAINTAINER devicehive -ENV DH_VERSION="4.1.0" +ENV DH_VERSION="5.0.1" LABEL org.label-schema.url="https://devicehive.com" \ org.label-schema.vendor="DeviceHive" \ diff --git a/pom.xml b/pom.xml index b145d6ff9..01ef304c4 100644 --- a/pom.xml +++ b/pom.xml @@ -34,21 +34,21 @@ UTF-8 - 2.7.5 + 2.7.18 3.8.0 1.6.6 2.1.1 1.1 2.2.4 - 0.10.0.1 - 2.10 + 3.5.1 + 2.13 1.7.5 - 1.1.3 + 1.2.9 4.11 2.12.0 2.3.2 - 9.4-1201-jdbc4 + 42.5.1 1.2.3.RELEASE 4.5.14 0.8 @@ -65,7 +65,7 @@ 3.10.1 3.0.0-M2 - 3.0.0-M7 + 3.2.1 3.2.2 3.3.0 3.1.0 @@ -503,68 +503,6 @@ - - jacoco.coverage - - - checkCoverage - - - - 0.80 - - - - - org.jacoco - jacoco-maven-plugin - ${jacoco-maven-plugin.version} - - ${basedir}/target/coverage-reports/jacoco-unit.exec - ${basedir}/target/coverage-reports/jacoco-unit.exec - - **/model/**/*.* - **/mapper/*.* - - - - - jacoco-initialize - - prepare-agent - - - - jacoco-site - verify - - report - - - - jacoco-check - - check - - - - - BUNDLE - - - INSTRUCTION - COVEREDRATIO - ${bundle-coverage-ratio} - - - - - - - - - - - +