From b2d72a14f607c9ee86c6bca70494c4a75b9183bd Mon Sep 17 00:00:00 2001 From: UserwithnoUsername Date: Fri, 11 Apr 2025 21:27:03 +0200 Subject: [PATCH 1/3] Refactor: Migrate to RxJava 3 across the codebase Updated all references from RxJava 2 to RxJava 3, including core classes, operators, and utility imports. Adjusted affected code to ensure compatibility with the new version. This enhances maintainability and leverages the latest features and improvements. --- build.gradle.kts | 28 +++++---- gradle.properties | 19 +++--- gradle/wrapper/gradle-wrapper.properties | 3 +- .../mqtt3/reactor/Mqtt3ReactorClientView.java | 17 ++--- .../mqtt/reactor/MqttReactorClient.java | 20 +++--- .../restrictions/Mqtt3SendMaximumIT.java | 62 ++++++++++++++++--- .../client/internal/mqtt/MqttAsyncClient.java | 4 +- .../internal/mqtt/MqttBlockingClient.java | 6 +- .../mqtt/MqttClientExecutorConfigImpl.java | 2 +- .../MqttClientExecutorConfigImplBuilder.java | 2 +- .../client/internal/mqtt/MqttRxClient.java | 22 ++++--- .../mqtt3/Mqtt3ExceptionFactory.java | 3 +- .../handler/auth/MqttReAuthCompletable.java | 6 +- .../mqtt/handler/connect/MqttConnAckFlow.java | 5 +- .../handler/connect/MqttConnAckSingle.java | 7 ++- .../disconnect/MqttDisconnectCompletable.java | 6 +- .../MqttGlobalIncomingPublishFlowable.java | 3 +- .../incoming/MqttIncomingPublishFlow.java | 7 ++- .../publish/outgoing/MqttAckFlowable.java | 4 +- .../publish/outgoing/MqttAckFlowableFlow.java | 4 +- .../publish/outgoing/MqttAckSingle.java | 10 +-- .../outgoing/MqttAckSingleFlowable.java | 4 +- .../outgoing/MqttOutgoingQosHandler.java | 4 +- .../outgoing/MqttPublishFlowableAckLink.java | 7 ++- .../outgoing/MqttPublishFlowables.java | 4 +- .../handler/subscribe/MqttSubAckSingle.java | 5 +- .../subscribe/MqttSubOrUnsubAckFlow.java | 4 +- .../handler/subscribe/MqttUnsubAckSingle.java | 4 +- .../connack/mqtt3/Mqtt3ConnAckView.java | 3 +- .../publish/mqtt3/Mqtt3PublishResultView.java | 3 +- .../publish/mqtt3/Mqtt3PublishView.java | 3 +- .../suback/mqtt3/Mqtt3SubAckView.java | 2 +- .../mqtt/mqtt3/Mqtt3RxClientView.java | 9 +-- .../client/internal/rx/CompletableFlow.java | 5 +- .../client/internal/rx/RxFutureConverter.java | 6 +- .../rx/WithSingleConditionalSubscriber.java | 2 +- .../rx/WithSingleStrictSubscriber.java | 5 +- .../operators/FlowableWithSingleCombine.java | 7 ++- .../rx/operators/FlowableWithSingleMap.java | 6 +- .../operators/FlowableWithSingleMapError.java | 11 ++-- .../FlowableWithSingleObserveOn.java | 3 +- .../client/mqtt/MqttClientExecutorConfig.java | 4 +- .../MqttClientExecutorConfigBuilderBase.java | 3 +- .../client/mqtt/mqtt3/Mqtt3RxClient.java | 7 ++- .../client/mqtt/mqtt5/Mqtt5RxClient.java | 6 +- .../hivemq/client/rx/FlowableWithSingle.java | 16 ++--- .../rx/FlowableWithSingleSubscriber.java | 2 +- .../client/example/Mqtt3ClientExample.java | 7 ++- .../Mqtt3RxClientViewExceptionsTest.java | 9 +-- .../internal/rx/RxFutureConverterTest.java | 7 ++- .../client/rx/FlowableWithSingleItem.java | 5 +- .../client/rx/FlowableWithSingleSplit.java | 3 +- .../client/rx/FlowableWithSingleTest.java | 15 ++--- 53 files changed, 254 insertions(+), 167 deletions(-) diff --git a/build.gradle.kts b/build.gradle.kts index 5fe82547c..b1519e985 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -1,11 +1,13 @@ +import java.net.URI + plugins { id("java-library") - id("com.github.johnrengelman.shadow") - id("biz.aQute.bnd.builder") + id("com.github.johnrengelman.shadow") version "8.1.1" + id("biz.aQute.bnd.builder") version "6.4.0" id("maven-publish") id("io.github.gradle-nexus.publish-plugin") id("signing") - id("com.github.hierynomus.license") + id("com.github.hierynomus.license") version "0.16.1" id("pmd") id("com.github.sgtsilvio.gradle.utf8") id("com.github.sgtsilvio.gradle.metadata") @@ -54,8 +56,8 @@ allprojects { allprojects { plugins.withId("java") { java { - sourceCompatibility = JavaVersion.VERSION_1_8 - targetCompatibility = JavaVersion.VERSION_1_8 + sourceCompatibility = JavaVersion.VERSION_11 + targetCompatibility = JavaVersion.VERSION_11 } plugins.apply("com.github.sgtsilvio.gradle.utf8") @@ -66,7 +68,7 @@ allprojects { /* ******************** dependencies ******************** */ dependencies { - api("io.reactivex.rxjava2:rxjava:${property("rxjava.version")}") + api("io.reactivex.rxjava3:rxjava:${property("rxjava.version")}") api("org.reactivestreams:reactive-streams:${property("reactive-streams.version")}") implementation("io.netty:netty-buffer:${property("netty.version")}") @@ -77,6 +79,7 @@ dependencies { implementation("org.jctools:jctools-core:${property("jctools.version")}") implementation("org.jetbrains:annotations:${property("annotations.version")}") implementation("com.google.dagger:dagger:${property("dagger.version")}") + implementation("io.projectreactor:reactor-core:${property("reactor-core.version")}")//3.5.7 compileOnly("org.slf4j:slf4j-api:${property("slf4j.version")}") @@ -122,8 +125,8 @@ dependencies { testImplementation("nl.jqno.equalsverifier:equalsverifier:${property("equalsverifier.version")}") testImplementation("org.mockito:mockito-core:${property("mockito.version")}") testImplementation("com.google.guava:guava:${property("guava.version")}") - testImplementation("org.bouncycastle:bcprov-jdk15on:${property("bouncycastle.version")}") - testImplementation("org.bouncycastle:bcpkix-jdk15on:${property("bouncycastle.version")}") + testImplementation("org.bouncycastle:bcprov-jdk18on:${property("bouncycastle.version")}") + testImplementation("org.bouncycastle:bcpkix-jdk18on:${property("bouncycastle.version")}") testImplementation("org.eclipse.paho:org.eclipse.paho.client.mqttv3:${property("paho.version")}") testRuntimeOnly("org.slf4j:slf4j-simple:${property("slf4j.version")}") } @@ -143,7 +146,9 @@ val integrationTestRuntimeOnly: Configuration by configurations.getting { } dependencies { - integrationTestImplementation("com.hivemq:hivemq-testcontainer-junit5:${property("hivemq-testcontainer.version")}") + integrationTestImplementation("org.testcontainers:testcontainers:${property("hivemq-testcontainer.version")}") + integrationTestImplementation("org.testcontainers:hivemq:${property("hivemq-testcontainer.version")}") + integrationTestImplementation("com.hivemq:hivemq-extension-sdk:${property("hivemq-extension-sdk.version")}") integrationTestImplementation("org.awaitility:awaitility:${property("awaitility.version")}") } @@ -324,7 +329,8 @@ allprojects { license { header = rootDir.resolve("HEADER") mapping("java", "SLASHSTAR_STYLE") - } + headerURI = URI("https://raw.githubusercontent.com/hivemq/hivemq-mqtt-client/refs/heads/master/HEADER") + } } allprojects { @@ -339,7 +345,7 @@ allprojects { } } -apply("$rootDir/gradle/japicc.gradle.kts") + /* ******************** build cache ******************** */ diff --git a/gradle.properties b/gradle.properties index 5f19da623..75cca4bb1 100644 --- a/gradle.properties +++ b/gradle.properties @@ -3,35 +3,36 @@ prevVersion=1.3.4 # # main dependencies # -rxjava.version=2.2.21 +rxjava.version=3.1.10 reactive-streams.version=1.0.4 netty.version=4.1.119.Final -jctools.version=2.1.2 +jctools.version=4.0.5 annotations.version=26.0.2 -dagger.version=2.27 +dagger.version=2.56.1 slf4j.version=1.7.36 reactor.version=3.3.4.RELEASE reactor-adapter.version=3.3.3.RELEASE +reactor-core.version=3.5.7 # # test dependencies # -junit-jupiter.version=5.5.2 +junit-jupiter.version=5.8.1 equalsverifier.version=3.17.5 mockito.version=2.28.2 -guava.version=24.1.1-jre -bouncycastle.version=1.59 +guava.version=33.3.1-jre +bouncycastle.version=1.80 paho.version=1.2.5 # # integration test dependencies # -hivemq-testcontainer.version=2.0.0 -hivemq-extension-sdk.version=4.7.2 +hivemq-testcontainer.version=1.20.6 +hivemq-extension-sdk.version=4.38.0 awaitility.version=4.2.2 # # plugins # plugin.shadow.version=6.1.0 -plugin.bnd.version=5.3.0 +plugin.bnd.version=6.4.0 plugin.nexus-publish.version=1.3.0 plugin.license.version=0.15.0 plugin.utf8.version=0.1.0 diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 28ff446a2..409536223 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,6 @@ +#Fri Apr 11 18:00:08 CEST 2025 distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-6.8.1-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-8.5-bin.zip zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists diff --git a/reactor/src/main/java/com/hivemq/client/internal/mqtt/mqtt3/reactor/Mqtt3ReactorClientView.java b/reactor/src/main/java/com/hivemq/client/internal/mqtt/mqtt3/reactor/Mqtt3ReactorClientView.java index 5cfb253bd..683bf6e60 100644 --- a/reactor/src/main/java/com/hivemq/client/internal/mqtt/mqtt3/reactor/Mqtt3ReactorClientView.java +++ b/reactor/src/main/java/com/hivemq/client/internal/mqtt/mqtt3/reactor/Mqtt3ReactorClientView.java @@ -34,10 +34,11 @@ import com.hivemq.client.mqtt.mqtt3.message.unsubscribe.Mqtt3Unsubscribe; import com.hivemq.client.mqtt.mqtt3.reactor.Mqtt3ReactorClient; import com.hivemq.client.rx.reactor.FluxWithSingle; -import io.reactivex.Flowable; +import io.reactivex.rxjava3.core.Flowable; import org.jetbrains.annotations.NotNull; import org.reactivestreams.Publisher; -import reactor.adapter.rxjava.RxJava2Adapter; + +import reactor.adapter.rxjava.RxJava3Adapter; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -59,7 +60,7 @@ public Mqtt3ReactorClientView(final @NotNull Mqtt3RxClient delegate) { @Override public @NotNull Mono connect(final @NotNull Mqtt3Connect connect) { - return RxJava2Adapter.singleToMono(delegate.connect(connect)); + return RxJava3Adapter.singleToMono(delegate.connect(connect)); } @Override @@ -69,7 +70,7 @@ public Mqtt3ReactorClientView(final @NotNull Mqtt3RxClient delegate) { @Override public @NotNull Mono subscribe(final @NotNull Mqtt3Subscribe subscribe) { - return RxJava2Adapter.singleToMono(delegate.subscribe(subscribe)); + return RxJava3Adapter.singleToMono(delegate.subscribe(subscribe)); } @Override @@ -105,12 +106,12 @@ public Mqtt3ReactorClientView(final @NotNull Mqtt3RxClient delegate) { public @NotNull Flux publishes( final @NotNull MqttGlobalPublishFilter filter, final boolean manualAcknowledgement) { - return RxJava2Adapter.flowableToFlux(delegate.publishes(filter, manualAcknowledgement)); + return RxJava3Adapter.flowableToFlux(delegate.publishes(filter, manualAcknowledgement)); } @Override public @NotNull Mono unsubscribe(final @NotNull Mqtt3Unsubscribe unsubscribe) { - return RxJava2Adapter.completableToMono(delegate.unsubscribe(unsubscribe)); + return RxJava3Adapter.completableToMono(delegate.unsubscribe(unsubscribe)); } @Override @@ -120,12 +121,12 @@ public Mqtt3ReactorClientView(final @NotNull Mqtt3RxClient delegate) { @Override public @NotNull Flux publish(final @NotNull Publisher publisher) { - return RxJava2Adapter.flowableToFlux(delegate.publish(Flowable.fromPublisher(publisher))); + return RxJava3Adapter.flowableToFlux(delegate.publish(Flowable.fromPublisher(publisher))); } @Override public @NotNull Mono disconnect() { - return RxJava2Adapter.completableToMono(delegate.disconnect()); + return RxJava3Adapter.completableToMono(delegate.disconnect()); } @Override diff --git a/reactor/src/main/java/com/hivemq/client/internal/mqtt/reactor/MqttReactorClient.java b/reactor/src/main/java/com/hivemq/client/internal/mqtt/reactor/MqttReactorClient.java index 351ed3ebe..574ce1cb3 100644 --- a/reactor/src/main/java/com/hivemq/client/internal/mqtt/reactor/MqttReactorClient.java +++ b/reactor/src/main/java/com/hivemq/client/internal/mqtt/reactor/MqttReactorClient.java @@ -38,10 +38,12 @@ import com.hivemq.client.mqtt.mqtt5.message.unsubscribe.unsuback.Mqtt5UnsubAck; import com.hivemq.client.mqtt.mqtt5.reactor.Mqtt5ReactorClient; import com.hivemq.client.rx.reactor.FluxWithSingle; -import io.reactivex.Flowable; + +import io.reactivex.rxjava3.core.Flowable; import org.jetbrains.annotations.NotNull; import org.reactivestreams.Publisher; -import reactor.adapter.rxjava.RxJava2Adapter; + +import reactor.adapter.rxjava.RxJava3Adapter; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -63,7 +65,7 @@ public MqttReactorClient(final @NotNull Mqtt5RxClient delegate) { @Override public @NotNull Mono connect(final @NotNull Mqtt5Connect connect) { - return RxJava2Adapter.singleToMono(delegate.connect(connect)); + return RxJava3Adapter.singleToMono(delegate.connect(connect)); } @Override @@ -73,7 +75,7 @@ public MqttReactorClient(final @NotNull Mqtt5RxClient delegate) { @Override public @NotNull Mono subscribe(final @NotNull Mqtt5Subscribe subscribe) { - return RxJava2Adapter.singleToMono(delegate.subscribe(subscribe)); + return RxJava3Adapter.singleToMono(delegate.subscribe(subscribe)); } @Override @@ -109,12 +111,12 @@ public MqttReactorClient(final @NotNull Mqtt5RxClient delegate) { public @NotNull Flux publishes( final @NotNull MqttGlobalPublishFilter filter, final boolean manualAcknowledgement) { - return RxJava2Adapter.flowableToFlux(delegate.publishes(filter, manualAcknowledgement)); + return RxJava3Adapter.flowableToFlux(delegate.publishes(filter, manualAcknowledgement)); } @Override public @NotNull Mono unsubscribe(final @NotNull Mqtt5Unsubscribe unsubscribe) { - return RxJava2Adapter.singleToMono(delegate.unsubscribe(unsubscribe)); + return RxJava3Adapter.singleToMono(delegate.unsubscribe(unsubscribe)); } @Override @@ -124,12 +126,12 @@ public MqttReactorClient(final @NotNull Mqtt5RxClient delegate) { @Override public @NotNull Flux publish(final @NotNull Publisher publisher) { - return RxJava2Adapter.flowableToFlux(delegate.publish(Flowable.fromPublisher(publisher))); + return RxJava3Adapter.flowableToFlux(delegate.publish(Flowable.fromPublisher(publisher))); } @Override public @NotNull Mono reauth() { - return RxJava2Adapter.completableToMono(delegate.reauth()); + return RxJava3Adapter.completableToMono(delegate.reauth()); } @Override @@ -139,7 +141,7 @@ public MqttReactorClient(final @NotNull Mqtt5RxClient delegate) { @Override public @NotNull Mono disconnect(final @NotNull Mqtt5Disconnect disconnect) { - return RxJava2Adapter.completableToMono(delegate.disconnect(disconnect)); + return RxJava3Adapter.completableToMono(delegate.disconnect(disconnect)); } @Override diff --git a/src/integrationTest/java/com/hivemq/client/restrictions/Mqtt3SendMaximumIT.java b/src/integrationTest/java/com/hivemq/client/restrictions/Mqtt3SendMaximumIT.java index 7c587315f..cf9097aec 100644 --- a/src/integrationTest/java/com/hivemq/client/restrictions/Mqtt3SendMaximumIT.java +++ b/src/integrationTest/java/com/hivemq/client/restrictions/Mqtt3SendMaximumIT.java @@ -33,24 +33,29 @@ import com.hivemq.extension.sdk.api.parameter.ExtensionStopOutput; import com.hivemq.extension.sdk.api.services.Services; import com.hivemq.extension.sdk.api.services.intializer.ClientInitializer; -import com.hivemq.testcontainer.core.HiveMQExtension; -import com.hivemq.testcontainer.junit5.HiveMQTestContainerExtension; + import org.jetbrains.annotations.NotNull; +import org.junit.Rule; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.RegisterExtension; +import org.testcontainers.hivemq.HiveMQContainer; +import org.testcontainers.hivemq.HiveMQExtension; +import org.testcontainers.utility.DockerImageName; import org.testcontainers.utility.MountableFile; import java.time.Duration; +import java.util.Collections; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; /** * @author Yannick Weber */ -public class Mqtt3SendMaximumIT { + +class Mqtt3SendMaximumIT { public static final int RECEIVE_MAXIMUM = 10; public static final @NotNull HiveMQExtension NO_PUBACK_EXTENSION = HiveMQExtension.builder() @@ -61,14 +66,53 @@ public class Mqtt3SendMaximumIT { .mainClass(NoPubackExtension.class) .build(); - @RegisterExtension - public final @NotNull HiveMQTestContainerExtension hivemq = - new HiveMQTestContainerExtension().withExtension(NO_PUBACK_EXTENSION) - .withHiveMQConfig(MountableFile.forClasspathResource("/config.xml")); + @Rule + public HiveMQContainer hivemq = new HiveMQContainer(DockerImageName.parse("hivemq/hivemq-ce") + .withTag("2021.3")) + .withExposedPorts(1883) + .withExtension(NO_PUBACK_EXTENSION) + .withHiveMQConfig(MountableFile.forClasspathResource("/config.xml")); + + + public void StartContainer(){ + hivemq.start(); + hivemq.setExposedPorts(Collections.singletonList(hivemq.getMqttPort())); + } @Test - void mqtt3_sendMaximum_applied() throws InterruptedException { + public void test_mqtt() throws Exception { + StartContainer(); + var publisher = Mqtt5Client.builder() + .serverPort(hivemq.getMqttPort()) // 3 + .serverHost(hivemq.getHost()) + .identifier("publisher") + .buildBlocking(); + + publisher.connect(); + + var subscriber = Mqtt5Client.builder() + .serverPort(hivemq.getMqttPort()) // 3 + .serverHost(hivemq.getHost()) + .identifier("subscriber") + .buildBlocking(); + + var publishes = subscriber.publishes(MqttGlobalPublishFilter.ALL); + subscriber.connect(); + subscriber.subscribeWith().topicFilter("topic/test").send(); + publisher.publishWith() + .topic("topic/test") + .payload("Hello World!".getBytes()).send(); + + var receive = publishes.receive(); + + assertNotNull(receive); // 4 + assertEquals("Hello World!", new String(receive.getPayloadAsBytes())); // 4 + } + + @Test + void mqtt3_sendMaximum_applied() throws InterruptedException { + StartContainer(); final Mqtt3Client publisher = Mqtt3Client.builder().serverPort(hivemq.getMqttPort()).build(); publisher.toBlocking().connectWith().restrictions().sendMaximum(RECEIVE_MAXIMUM).applyRestrictions().send(); diff --git a/src/main/java/com/hivemq/client/internal/mqtt/MqttAsyncClient.java b/src/main/java/com/hivemq/client/internal/mqtt/MqttAsyncClient.java index e9c768431..8992ae9f2 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/MqttAsyncClient.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/MqttAsyncClient.java @@ -40,8 +40,8 @@ import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAck; import com.hivemq.client.mqtt.mqtt5.message.unsubscribe.Mqtt5Unsubscribe; import com.hivemq.client.mqtt.mqtt5.message.unsubscribe.unsuback.Mqtt5UnsubAck; -import io.reactivex.FlowableSubscriber; -import io.reactivex.schedulers.Schedulers; +import io.reactivex.rxjava3.core.FlowableSubscriber; +import io.reactivex.rxjava3.schedulers.Schedulers; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.reactivestreams.Subscription; diff --git a/src/main/java/com/hivemq/client/internal/mqtt/MqttBlockingClient.java b/src/main/java/com/hivemq/client/internal/mqtt/MqttBlockingClient.java index a5c86b83e..98f3e47ce 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/MqttBlockingClient.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/MqttBlockingClient.java @@ -45,9 +45,9 @@ import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAck; import com.hivemq.client.mqtt.mqtt5.message.unsubscribe.Mqtt5Unsubscribe; import com.hivemq.client.mqtt.mqtt5.message.unsubscribe.unsuback.Mqtt5UnsubAck; -import io.reactivex.Flowable; -import io.reactivex.FlowableSubscriber; -import io.reactivex.internal.subscriptions.SubscriptionHelper; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.FlowableSubscriber; +import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.reactivestreams.Subscription; diff --git a/src/main/java/com/hivemq/client/internal/mqtt/MqttClientExecutorConfigImpl.java b/src/main/java/com/hivemq/client/internal/mqtt/MqttClientExecutorConfigImpl.java index 4f1b290f9..b899f68f4 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/MqttClientExecutorConfigImpl.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/MqttClientExecutorConfigImpl.java @@ -17,7 +17,7 @@ package com.hivemq.client.internal.mqtt; import com.hivemq.client.mqtt.MqttClientExecutorConfig; -import io.reactivex.Scheduler; +import io.reactivex.rxjava3.core.Scheduler; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; diff --git a/src/main/java/com/hivemq/client/internal/mqtt/MqttClientExecutorConfigImplBuilder.java b/src/main/java/com/hivemq/client/internal/mqtt/MqttClientExecutorConfigImplBuilder.java index 6c68726af..4bf1fb8c2 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/MqttClientExecutorConfigImplBuilder.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/MqttClientExecutorConfigImplBuilder.java @@ -18,7 +18,7 @@ import com.hivemq.client.internal.util.Checks; import com.hivemq.client.mqtt.MqttClientExecutorConfigBuilder; -import io.reactivex.Scheduler; +import io.reactivex.rxjava3.core.Scheduler; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; diff --git a/src/main/java/com/hivemq/client/internal/mqtt/MqttRxClient.java b/src/main/java/com/hivemq/client/internal/mqtt/MqttRxClient.java index c9fa963ea..41e1fd6c0 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/MqttRxClient.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/MqttRxClient.java @@ -49,14 +49,15 @@ import com.hivemq.client.mqtt.mqtt5.message.unsubscribe.Mqtt5Unsubscribe; import com.hivemq.client.mqtt.mqtt5.message.unsubscribe.unsuback.Mqtt5UnsubAck; import com.hivemq.client.rx.FlowableWithSingle; -import io.reactivex.Completable; -import io.reactivex.Flowable; -import io.reactivex.Scheduler; -import io.reactivex.Single; -import io.reactivex.functions.Function; -import io.reactivex.internal.fuseable.ScalarCallable; +import io.reactivex.rxjava3.core.Completable; +import io.reactivex.rxjava3.core.Flowable; + +import io.reactivex.rxjava3.core.Scheduler; +import io.reactivex.rxjava3.core.Single; +import io.reactivex.rxjava3.functions.Function; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import reactor.core.Fuseable; /** * @author Silvio Giebl @@ -214,9 +215,14 @@ public MqttRxClient(final @NotNull MqttClientConfig clientConfig) { final @NotNull Flowable

publishFlowable, final @NotNull Function publishMapper) { final Scheduler applicationScheduler = clientConfig.getExecutorConfig().getApplicationScheduler(); - if (publishFlowable instanceof ScalarCallable) { + if (publishFlowable instanceof Fuseable.ScalarCallable) { //noinspection unchecked - final P publish = ((ScalarCallable

) publishFlowable).call(); + final P publish; + try { + publish = ((Fuseable.ScalarCallable

) publishFlowable).call(); + } catch (Exception e) { + return Flowable.error(e); + } if (publish == null) { return Flowable.empty(); } diff --git a/src/main/java/com/hivemq/client/internal/mqtt/exceptions/mqtt3/Mqtt3ExceptionFactory.java b/src/main/java/com/hivemq/client/internal/mqtt/exceptions/mqtt3/Mqtt3ExceptionFactory.java index 6c3083380..30cbb5b33 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/exceptions/mqtt3/Mqtt3ExceptionFactory.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/exceptions/mqtt3/Mqtt3ExceptionFactory.java @@ -24,7 +24,8 @@ import com.hivemq.client.mqtt.mqtt3.exceptions.*; import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5MessageException; import com.hivemq.client.mqtt.mqtt5.message.Mqtt5Message; -import io.reactivex.functions.Function; + +import io.reactivex.rxjava3.functions.Function; import org.jetbrains.annotations.NotNull; /** diff --git a/src/main/java/com/hivemq/client/internal/mqtt/handler/auth/MqttReAuthCompletable.java b/src/main/java/com/hivemq/client/internal/mqtt/handler/auth/MqttReAuthCompletable.java index f8335c314..5d2ac3008 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/handler/auth/MqttReAuthCompletable.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/handler/auth/MqttReAuthCompletable.java @@ -22,9 +22,9 @@ import com.hivemq.client.internal.rx.CompletableFlow; import io.netty.channel.Channel; import io.netty.channel.ChannelHandler; -import io.reactivex.Completable; -import io.reactivex.CompletableObserver; -import io.reactivex.internal.disposables.EmptyDisposable; +import io.reactivex.rxjava3.core.Completable; +import io.reactivex.rxjava3.core.CompletableObserver; +import io.reactivex.rxjava3.internal.disposables.EmptyDisposable; import org.jetbrains.annotations.NotNull; /** diff --git a/src/main/java/com/hivemq/client/internal/mqtt/handler/connect/MqttConnAckFlow.java b/src/main/java/com/hivemq/client/internal/mqtt/handler/connect/MqttConnAckFlow.java index b6d2215ca..e0f98167f 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/handler/connect/MqttConnAckFlow.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/handler/connect/MqttConnAckFlow.java @@ -17,8 +17,9 @@ package com.hivemq.client.internal.mqtt.handler.connect; import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAck; -import io.reactivex.SingleObserver; -import io.reactivex.disposables.Disposable; + +import io.reactivex.rxjava3.core.SingleObserver; +import io.reactivex.rxjava3.disposables.Disposable; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; diff --git a/src/main/java/com/hivemq/client/internal/mqtt/handler/connect/MqttConnAckSingle.java b/src/main/java/com/hivemq/client/internal/mqtt/handler/connect/MqttConnAckSingle.java index 31e8f13c0..af6c408d8 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/handler/connect/MqttConnAckSingle.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/handler/connect/MqttConnAckSingle.java @@ -31,9 +31,10 @@ import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAck; import io.netty.bootstrap.Bootstrap; import io.netty.channel.EventLoop; -import io.reactivex.Single; -import io.reactivex.SingleObserver; -import io.reactivex.internal.disposables.EmptyDisposable; + +import io.reactivex.rxjava3.core.Single; +import io.reactivex.rxjava3.core.SingleObserver; +import io.reactivex.rxjava3.internal.disposables.EmptyDisposable; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; diff --git a/src/main/java/com/hivemq/client/internal/mqtt/handler/disconnect/MqttDisconnectCompletable.java b/src/main/java/com/hivemq/client/internal/mqtt/handler/disconnect/MqttDisconnectCompletable.java index 6b70bac2e..5dbdd048d 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/handler/disconnect/MqttDisconnectCompletable.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/handler/disconnect/MqttDisconnectCompletable.java @@ -22,9 +22,9 @@ import com.hivemq.client.internal.mqtt.message.disconnect.MqttDisconnect; import com.hivemq.client.internal.rx.CompletableFlow; import io.netty.channel.Channel; -import io.reactivex.Completable; -import io.reactivex.CompletableObserver; -import io.reactivex.internal.disposables.EmptyDisposable; +import io.reactivex.rxjava3.core.Completable; +import io.reactivex.rxjava3.core.CompletableObserver; +import io.reactivex.rxjava3.internal.disposables.EmptyDisposable; import org.jetbrains.annotations.NotNull; /** diff --git a/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttGlobalIncomingPublishFlowable.java b/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttGlobalIncomingPublishFlowable.java index 05a71dc17..c7d6e993c 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttGlobalIncomingPublishFlowable.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttGlobalIncomingPublishFlowable.java @@ -21,7 +21,8 @@ import com.hivemq.client.internal.mqtt.ioc.ClientComponent; import com.hivemq.client.mqtt.MqttGlobalPublishFilter; import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; -import io.reactivex.Flowable; + +import io.reactivex.rxjava3.core.Flowable; import org.jetbrains.annotations.NotNull; import org.reactivestreams.Subscriber; diff --git a/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttIncomingPublishFlow.java b/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttIncomingPublishFlow.java index a1ed75d3a..7af0d8ed4 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttIncomingPublishFlow.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttIncomingPublishFlow.java @@ -20,9 +20,10 @@ import com.hivemq.client.internal.mqtt.MqttClientConfig; import com.hivemq.client.internal.mqtt.handler.util.FlowWithEventLoop; import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; -import io.reactivex.Emitter; -import io.reactivex.internal.util.BackpressureHelper; -import io.reactivex.plugins.RxJavaPlugins; + +import io.reactivex.rxjava3.core.Emitter; +import io.reactivex.rxjava3.internal.util.BackpressureHelper; +import io.reactivex.rxjava3.plugins.RxJavaPlugins; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.reactivestreams.Subscriber; diff --git a/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttAckFlowable.java b/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttAckFlowable.java index 6c0f4a274..fda8cfea2 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttAckFlowable.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttAckFlowable.java @@ -21,8 +21,8 @@ import com.hivemq.client.internal.mqtt.ioc.ClientComponent; import com.hivemq.client.internal.mqtt.message.publish.MqttPublish; import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishResult; -import io.reactivex.Flowable; -import io.reactivex.internal.subscriptions.EmptySubscription; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.internal.subscriptions.EmptySubscription; import org.jetbrains.annotations.NotNull; import org.reactivestreams.Subscriber; diff --git a/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttAckFlowableFlow.java b/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttAckFlowableFlow.java index 44d3ef4fe..40c843fa5 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttAckFlowableFlow.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttAckFlowableFlow.java @@ -21,8 +21,8 @@ import com.hivemq.client.internal.mqtt.handler.publish.outgoing.MqttPublishFlowableAckLink.LinkedFlow; import com.hivemq.client.internal.mqtt.message.publish.MqttPublishResult; import com.hivemq.client.internal.util.collections.ChunkedArrayQueue; -import io.reactivex.internal.util.BackpressureHelper; -import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.rxjava3.internal.util.BackpressureHelper; +import io.reactivex.rxjava3.plugins.RxJavaPlugins; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.reactivestreams.Subscriber; diff --git a/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttAckSingle.java b/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttAckSingle.java index 160d34afa..98c7db0a4 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttAckSingle.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttAckSingle.java @@ -23,11 +23,11 @@ import com.hivemq.client.internal.mqtt.message.publish.MqttPublish; import com.hivemq.client.internal.mqtt.message.publish.MqttPublishResult; import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishResult; -import io.reactivex.Flowable; -import io.reactivex.Single; -import io.reactivex.SingleObserver; -import io.reactivex.disposables.Disposable; -import io.reactivex.internal.disposables.EmptyDisposable; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.Single; +import io.reactivex.rxjava3.core.SingleObserver; +import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.internal.disposables.EmptyDisposable; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; diff --git a/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttAckSingleFlowable.java b/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttAckSingleFlowable.java index 0592e148e..7e3432a84 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttAckSingleFlowable.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttAckSingleFlowable.java @@ -23,8 +23,8 @@ import com.hivemq.client.internal.mqtt.message.publish.MqttPublish; import com.hivemq.client.internal.mqtt.message.publish.MqttPublishResult; import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishResult; -import io.reactivex.Flowable; -import io.reactivex.internal.subscriptions.EmptySubscription; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.internal.subscriptions.EmptySubscription; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.reactivestreams.Subscriber; diff --git a/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttOutgoingQosHandler.java b/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttOutgoingQosHandler.java index 7eb3e534e..614c9e9fe 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttOutgoingQosHandler.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttOutgoingQosHandler.java @@ -58,8 +58,8 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.EventLoop; -import io.reactivex.Flowable; -import io.reactivex.FlowableSubscriber; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.FlowableSubscriber; import org.jctools.queues.SpscUnboundedArrayQueue; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; diff --git a/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttPublishFlowableAckLink.java b/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttPublishFlowableAckLink.java index b72298d2c..cfd5e11d8 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttPublishFlowableAckLink.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttPublishFlowableAckLink.java @@ -19,9 +19,10 @@ import com.hivemq.client.internal.logging.InternalLogger; import com.hivemq.client.internal.logging.InternalLoggerFactory; import com.hivemq.client.internal.mqtt.message.publish.MqttPublish; -import io.reactivex.Flowable; -import io.reactivex.FlowableSubscriber; -import io.reactivex.plugins.RxJavaPlugins; + +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.FlowableSubscriber; +import io.reactivex.rxjava3.plugins.RxJavaPlugins; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.reactivestreams.Subscriber; diff --git a/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttPublishFlowables.java b/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttPublishFlowables.java index f7586fdc3..da0d1e11e 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttPublishFlowables.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/outgoing/MqttPublishFlowables.java @@ -19,8 +19,8 @@ import com.hivemq.client.internal.logging.InternalLogger; import com.hivemq.client.internal.logging.InternalLoggerFactory; import com.hivemq.client.internal.mqtt.ioc.ClientScope; -import io.reactivex.Flowable; -import io.reactivex.internal.util.BackpressureHelper; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.internal.util.BackpressureHelper; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.reactivestreams.Subscriber; diff --git a/src/main/java/com/hivemq/client/internal/mqtt/handler/subscribe/MqttSubAckSingle.java b/src/main/java/com/hivemq/client/internal/mqtt/handler/subscribe/MqttSubAckSingle.java index ab914a897..b8f3240b7 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/handler/subscribe/MqttSubAckSingle.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/handler/subscribe/MqttSubAckSingle.java @@ -21,8 +21,9 @@ import com.hivemq.client.internal.mqtt.message.subscribe.MqttSubscribe; import com.hivemq.client.internal.mqtt.message.subscribe.suback.MqttSubAck; import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAck; -import io.reactivex.Single; -import io.reactivex.SingleObserver; + +import io.reactivex.rxjava3.core.Single; +import io.reactivex.rxjava3.core.SingleObserver; import org.jetbrains.annotations.NotNull; /** diff --git a/src/main/java/com/hivemq/client/internal/mqtt/handler/subscribe/MqttSubOrUnsubAckFlow.java b/src/main/java/com/hivemq/client/internal/mqtt/handler/subscribe/MqttSubOrUnsubAckFlow.java index 0dad079a3..61e716b2b 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/handler/subscribe/MqttSubOrUnsubAckFlow.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/handler/subscribe/MqttSubOrUnsubAckFlow.java @@ -18,8 +18,8 @@ import com.hivemq.client.internal.mqtt.MqttClientConfig; import com.hivemq.client.internal.mqtt.handler.util.FlowWithEventLoop; -import io.reactivex.SingleObserver; -import io.reactivex.disposables.Disposable; +import io.reactivex.rxjava3.core.SingleObserver; +import io.reactivex.rxjava3.disposables.Disposable; import org.jetbrains.annotations.NotNull; /** diff --git a/src/main/java/com/hivemq/client/internal/mqtt/handler/subscribe/MqttUnsubAckSingle.java b/src/main/java/com/hivemq/client/internal/mqtt/handler/subscribe/MqttUnsubAckSingle.java index 197a87716..47836cdd5 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/handler/subscribe/MqttUnsubAckSingle.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/handler/subscribe/MqttUnsubAckSingle.java @@ -21,8 +21,8 @@ import com.hivemq.client.internal.mqtt.message.unsubscribe.MqttUnsubscribe; import com.hivemq.client.internal.mqtt.message.unsubscribe.unsuback.MqttUnsubAck; import com.hivemq.client.mqtt.mqtt5.message.unsubscribe.unsuback.Mqtt5UnsubAck; -import io.reactivex.Single; -import io.reactivex.SingleObserver; +import io.reactivex.rxjava3.core.Single; +import io.reactivex.rxjava3.core.SingleObserver; import org.jetbrains.annotations.NotNull; /** diff --git a/src/main/java/com/hivemq/client/internal/mqtt/message/connect/connack/mqtt3/Mqtt3ConnAckView.java b/src/main/java/com/hivemq/client/internal/mqtt/message/connect/connack/mqtt3/Mqtt3ConnAckView.java index 161315ec9..37b60fcaf 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/message/connect/connack/mqtt3/Mqtt3ConnAckView.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/message/connect/connack/mqtt3/Mqtt3ConnAckView.java @@ -24,7 +24,8 @@ import com.hivemq.client.mqtt.mqtt3.message.connect.connack.Mqtt3ConnAckReturnCode; import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAck; import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAckReasonCode; -import io.reactivex.functions.Function; + +import io.reactivex.rxjava3.functions.Function; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; diff --git a/src/main/java/com/hivemq/client/internal/mqtt/message/publish/mqtt3/Mqtt3PublishResultView.java b/src/main/java/com/hivemq/client/internal/mqtt/message/publish/mqtt3/Mqtt3PublishResultView.java index 938ccaa22..3041f2dbf 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/message/publish/mqtt3/Mqtt3PublishResultView.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/message/publish/mqtt3/Mqtt3PublishResultView.java @@ -21,7 +21,8 @@ import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish; import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3PublishResult; import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishResult; -import io.reactivex.functions.Function; + +import io.reactivex.rxjava3.functions.Function; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; diff --git a/src/main/java/com/hivemq/client/internal/mqtt/message/publish/mqtt3/Mqtt3PublishView.java b/src/main/java/com/hivemq/client/internal/mqtt/message/publish/mqtt3/Mqtt3PublishView.java index e5861a92a..14568404e 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/message/publish/mqtt3/Mqtt3PublishView.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/message/publish/mqtt3/Mqtt3PublishView.java @@ -27,7 +27,8 @@ import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish; import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5WillPublish; -import io.reactivex.functions.Function; + +import io.reactivex.rxjava3.functions.Function; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; diff --git a/src/main/java/com/hivemq/client/internal/mqtt/message/subscribe/suback/mqtt3/Mqtt3SubAckView.java b/src/main/java/com/hivemq/client/internal/mqtt/message/subscribe/suback/mqtt3/Mqtt3SubAckView.java index 940507d6c..2c98752ed 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/message/subscribe/suback/mqtt3/Mqtt3SubAckView.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/message/subscribe/suback/mqtt3/Mqtt3SubAckView.java @@ -24,7 +24,7 @@ import com.hivemq.client.mqtt.mqtt3.message.subscribe.suback.Mqtt3SubAckReturnCode; import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAck; import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAckReasonCode; -import io.reactivex.functions.Function; +import io.reactivex.rxjava3.functions.Function; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; diff --git a/src/main/java/com/hivemq/client/internal/mqtt/mqtt3/Mqtt3RxClientView.java b/src/main/java/com/hivemq/client/internal/mqtt/mqtt3/Mqtt3RxClientView.java index b85701b6f..7e9bddf70 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/mqtt3/Mqtt3RxClientView.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/mqtt3/Mqtt3RxClientView.java @@ -47,10 +47,11 @@ import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishResult; import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAck; import com.hivemq.client.rx.FlowableWithSingle; -import io.reactivex.Completable; -import io.reactivex.Flowable; -import io.reactivex.Single; -import io.reactivex.functions.Function; + +import io.reactivex.rxjava3.core.Completable; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.Single; +import io.reactivex.rxjava3.functions.Function; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; diff --git a/src/main/java/com/hivemq/client/internal/rx/CompletableFlow.java b/src/main/java/com/hivemq/client/internal/rx/CompletableFlow.java index c6acc42ee..d42a8fb3a 100644 --- a/src/main/java/com/hivemq/client/internal/rx/CompletableFlow.java +++ b/src/main/java/com/hivemq/client/internal/rx/CompletableFlow.java @@ -16,8 +16,9 @@ package com.hivemq.client.internal.rx; -import io.reactivex.CompletableObserver; -import io.reactivex.disposables.Disposable; + +import io.reactivex.rxjava3.core.CompletableObserver; +import io.reactivex.rxjava3.disposables.Disposable; import org.jetbrains.annotations.NotNull; /** diff --git a/src/main/java/com/hivemq/client/internal/rx/RxFutureConverter.java b/src/main/java/com/hivemq/client/internal/rx/RxFutureConverter.java index fa15aff6d..34dd20091 100644 --- a/src/main/java/com/hivemq/client/internal/rx/RxFutureConverter.java +++ b/src/main/java/com/hivemq/client/internal/rx/RxFutureConverter.java @@ -16,8 +16,10 @@ package com.hivemq.client.internal.rx; -import io.reactivex.*; -import io.reactivex.disposables.Disposable; + + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.disposables.Disposable; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; diff --git a/src/main/java/com/hivemq/client/internal/rx/WithSingleConditionalSubscriber.java b/src/main/java/com/hivemq/client/internal/rx/WithSingleConditionalSubscriber.java index a874ad7c2..07c6b997e 100644 --- a/src/main/java/com/hivemq/client/internal/rx/WithSingleConditionalSubscriber.java +++ b/src/main/java/com/hivemq/client/internal/rx/WithSingleConditionalSubscriber.java @@ -17,7 +17,7 @@ package com.hivemq.client.internal.rx; import com.hivemq.client.rx.FlowableWithSingleSubscriber; -import io.reactivex.internal.fuseable.ConditionalSubscriber; +import io.reactivex.rxjava3.operators.ConditionalSubscriber; /** * @author Silvio Giebl diff --git a/src/main/java/com/hivemq/client/internal/rx/WithSingleStrictSubscriber.java b/src/main/java/com/hivemq/client/internal/rx/WithSingleStrictSubscriber.java index 587b7a840..f08fe9857 100644 --- a/src/main/java/com/hivemq/client/internal/rx/WithSingleStrictSubscriber.java +++ b/src/main/java/com/hivemq/client/internal/rx/WithSingleStrictSubscriber.java @@ -18,8 +18,9 @@ import com.hivemq.client.rx.FlowableWithSingleSubscriber; import com.hivemq.client.rx.reactivestreams.WithSingleSubscriber; -import io.reactivex.internal.subscriptions.SubscriptionHelper; -import io.reactivex.internal.util.BackpressureHelper; + +import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper; +import io.reactivex.rxjava3.internal.util.BackpressureHelper; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.reactivestreams.Subscription; diff --git a/src/main/java/com/hivemq/client/internal/rx/operators/FlowableWithSingleCombine.java b/src/main/java/com/hivemq/client/internal/rx/operators/FlowableWithSingleCombine.java index 2a6485a4f..5cb4c9048 100644 --- a/src/main/java/com/hivemq/client/internal/rx/operators/FlowableWithSingleCombine.java +++ b/src/main/java/com/hivemq/client/internal/rx/operators/FlowableWithSingleCombine.java @@ -20,9 +20,10 @@ import com.hivemq.client.rx.FlowableWithSingle; import com.hivemq.client.rx.FlowableWithSingleSubscriber; import com.hivemq.client.rx.reactivestreams.WithSingleSubscriber; -import io.reactivex.Flowable; -import io.reactivex.internal.fuseable.ConditionalSubscriber; -import io.reactivex.internal.util.BackpressureHelper; +import io.reactivex.rxjava3.core.Flowable; + +import io.reactivex.rxjava3.internal.util.BackpressureHelper; +import io.reactivex.rxjava3.operators.ConditionalSubscriber; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.reactivestreams.Subscriber; diff --git a/src/main/java/com/hivemq/client/internal/rx/operators/FlowableWithSingleMap.java b/src/main/java/com/hivemq/client/internal/rx/operators/FlowableWithSingleMap.java index 1c26f0646..fa84faeff 100644 --- a/src/main/java/com/hivemq/client/internal/rx/operators/FlowableWithSingleMap.java +++ b/src/main/java/com/hivemq/client/internal/rx/operators/FlowableWithSingleMap.java @@ -21,9 +21,9 @@ import com.hivemq.client.rx.FlowableWithSingle; import com.hivemq.client.rx.FlowableWithSingleSubscriber; import com.hivemq.client.rx.reactivestreams.WithSingleSubscriber; -import io.reactivex.exceptions.Exceptions; -import io.reactivex.functions.Function; -import io.reactivex.internal.fuseable.ConditionalSubscriber; +import io.reactivex.rxjava3.exceptions.Exceptions; +import io.reactivex.rxjava3.functions.Function; +import io.reactivex.rxjava3.operators.ConditionalSubscriber; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.reactivestreams.Subscriber; diff --git a/src/main/java/com/hivemq/client/internal/rx/operators/FlowableWithSingleMapError.java b/src/main/java/com/hivemq/client/internal/rx/operators/FlowableWithSingleMapError.java index f27ba8cbb..4bf170051 100644 --- a/src/main/java/com/hivemq/client/internal/rx/operators/FlowableWithSingleMapError.java +++ b/src/main/java/com/hivemq/client/internal/rx/operators/FlowableWithSingleMapError.java @@ -21,11 +21,12 @@ import com.hivemq.client.rx.FlowableWithSingle; import com.hivemq.client.rx.FlowableWithSingleSubscriber; import com.hivemq.client.rx.reactivestreams.WithSingleSubscriber; -import io.reactivex.FlowableSubscriber; -import io.reactivex.exceptions.CompositeException; -import io.reactivex.exceptions.Exceptions; -import io.reactivex.functions.Function; -import io.reactivex.internal.fuseable.ConditionalSubscriber; +import io.reactivex.rxjava3.core.FlowableSubscriber; +import io.reactivex.rxjava3.exceptions.CompositeException; +import io.reactivex.rxjava3.exceptions.Exceptions; +import io.reactivex.rxjava3.functions.Function; + +import io.reactivex.rxjava3.operators.ConditionalSubscriber; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.reactivestreams.Subscriber; diff --git a/src/main/java/com/hivemq/client/internal/rx/operators/FlowableWithSingleObserveOn.java b/src/main/java/com/hivemq/client/internal/rx/operators/FlowableWithSingleObserveOn.java index 0394f62da..c2302d610 100644 --- a/src/main/java/com/hivemq/client/internal/rx/operators/FlowableWithSingleObserveOn.java +++ b/src/main/java/com/hivemq/client/internal/rx/operators/FlowableWithSingleObserveOn.java @@ -18,7 +18,8 @@ import com.hivemq.client.rx.FlowableWithSingle; import com.hivemq.client.rx.reactivestreams.WithSingleSubscriber; -import io.reactivex.Scheduler; + +import io.reactivex.rxjava3.core.Scheduler; import org.jetbrains.annotations.NotNull; import org.reactivestreams.Subscriber; diff --git a/src/main/java/com/hivemq/client/mqtt/MqttClientExecutorConfig.java b/src/main/java/com/hivemq/client/mqtt/MqttClientExecutorConfig.java index 5b98e764f..dfcaa24c5 100644 --- a/src/main/java/com/hivemq/client/mqtt/MqttClientExecutorConfig.java +++ b/src/main/java/com/hivemq/client/mqtt/MqttClientExecutorConfig.java @@ -18,8 +18,8 @@ import com.hivemq.client.annotations.DoNotImplement; import com.hivemq.client.internal.mqtt.MqttClientExecutorConfigImplBuilder; -import io.reactivex.Scheduler; -import io.reactivex.schedulers.Schedulers; +import io.reactivex.rxjava3.core.Scheduler; +import io.reactivex.rxjava3.schedulers.Schedulers; import org.jetbrains.annotations.NotNull; import java.util.Optional; diff --git a/src/main/java/com/hivemq/client/mqtt/MqttClientExecutorConfigBuilderBase.java b/src/main/java/com/hivemq/client/mqtt/MqttClientExecutorConfigBuilderBase.java index 1ac5696e0..fc502920b 100644 --- a/src/main/java/com/hivemq/client/mqtt/MqttClientExecutorConfigBuilderBase.java +++ b/src/main/java/com/hivemq/client/mqtt/MqttClientExecutorConfigBuilderBase.java @@ -18,7 +18,8 @@ import com.hivemq.client.annotations.CheckReturnValue; import com.hivemq.client.annotations.DoNotImplement; -import io.reactivex.Scheduler; + +import io.reactivex.rxjava3.core.Scheduler; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; diff --git a/src/main/java/com/hivemq/client/mqtt/mqtt3/Mqtt3RxClient.java b/src/main/java/com/hivemq/client/mqtt/mqtt3/Mqtt3RxClient.java index 380bbd564..04d5ed35e 100644 --- a/src/main/java/com/hivemq/client/mqtt/mqtt3/Mqtt3RxClient.java +++ b/src/main/java/com/hivemq/client/mqtt/mqtt3/Mqtt3RxClient.java @@ -30,9 +30,10 @@ import com.hivemq.client.mqtt.mqtt3.message.unsubscribe.Mqtt3Unsubscribe; import com.hivemq.client.mqtt.mqtt3.message.unsubscribe.Mqtt3UnsubscribeBuilder; import com.hivemq.client.rx.FlowableWithSingle; -import io.reactivex.Completable; -import io.reactivex.Flowable; -import io.reactivex.Single; + +import io.reactivex.rxjava3.core.Completable; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.Single; import org.jetbrains.annotations.NotNull; /** diff --git a/src/main/java/com/hivemq/client/mqtt/mqtt5/Mqtt5RxClient.java b/src/main/java/com/hivemq/client/mqtt/mqtt5/Mqtt5RxClient.java index 9fae6f419..dd30a0a41 100644 --- a/src/main/java/com/hivemq/client/mqtt/mqtt5/Mqtt5RxClient.java +++ b/src/main/java/com/hivemq/client/mqtt/mqtt5/Mqtt5RxClient.java @@ -33,9 +33,9 @@ import com.hivemq.client.mqtt.mqtt5.message.unsubscribe.Mqtt5UnsubscribeBuilder; import com.hivemq.client.mqtt.mqtt5.message.unsubscribe.unsuback.Mqtt5UnsubAck; import com.hivemq.client.rx.FlowableWithSingle; -import io.reactivex.Completable; -import io.reactivex.Flowable; -import io.reactivex.Single; +import io.reactivex.rxjava3.core.Completable; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.Single; import org.jetbrains.annotations.NotNull; /** diff --git a/src/main/java/com/hivemq/client/rx/FlowableWithSingle.java b/src/main/java/com/hivemq/client/rx/FlowableWithSingle.java index 5eb5de2be..03f94a6ef 100644 --- a/src/main/java/com/hivemq/client/rx/FlowableWithSingle.java +++ b/src/main/java/com/hivemq/client/rx/FlowableWithSingle.java @@ -24,14 +24,14 @@ import com.hivemq.client.internal.util.Checks; import com.hivemq.client.rx.reactivestreams.PublisherWithSingle; import com.hivemq.client.rx.reactivestreams.WithSingleSubscriber; -import io.reactivex.Flowable; -import io.reactivex.Scheduler; -import io.reactivex.annotations.BackpressureKind; -import io.reactivex.annotations.BackpressureSupport; -import io.reactivex.annotations.SchedulerSupport; -import io.reactivex.functions.Action; -import io.reactivex.functions.Consumer; -import io.reactivex.functions.Function; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.Scheduler; +import io.reactivex.rxjava3.annotations.BackpressureKind; +import io.reactivex.rxjava3.annotations.BackpressureSupport; +import io.reactivex.rxjava3.annotations.SchedulerSupport; +import io.reactivex.rxjava3.functions.Action; +import io.reactivex.rxjava3.functions.Consumer; +import io.reactivex.rxjava3.functions.Function; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.reactivestreams.Subscriber; diff --git a/src/main/java/com/hivemq/client/rx/FlowableWithSingleSubscriber.java b/src/main/java/com/hivemq/client/rx/FlowableWithSingleSubscriber.java index 2f54963b6..42a7e4e96 100644 --- a/src/main/java/com/hivemq/client/rx/FlowableWithSingleSubscriber.java +++ b/src/main/java/com/hivemq/client/rx/FlowableWithSingleSubscriber.java @@ -17,7 +17,7 @@ package com.hivemq.client.rx; import com.hivemq.client.rx.reactivestreams.WithSingleSubscriber; -import io.reactivex.FlowableSubscriber; +import io.reactivex.rxjava3.core.FlowableSubscriber; /** * Represents a Reactive-Streams inspired {@link WithSingleSubscriber} that is RxJava 2 only and weakens rules for diff --git a/src/test/java/com/hivemq/client/example/Mqtt3ClientExample.java b/src/test/java/com/hivemq/client/example/Mqtt3ClientExample.java index 1517ab22d..4f811acdd 100644 --- a/src/test/java/com/hivemq/client/example/Mqtt3ClientExample.java +++ b/src/test/java/com/hivemq/client/example/Mqtt3ClientExample.java @@ -29,9 +29,10 @@ import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3PublishResult; import com.hivemq.client.mqtt.mqtt3.message.subscribe.Mqtt3Subscribe; import com.hivemq.client.mqtt.mqtt3.message.subscribe.Mqtt3Subscription; -import io.reactivex.Completable; -import io.reactivex.Flowable; -import io.reactivex.Single; + +import io.reactivex.rxjava3.core.Completable; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.Single; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; diff --git a/src/test/java/com/hivemq/client/internal/mqtt/mqtt3/Mqtt3RxClientViewExceptionsTest.java b/src/test/java/com/hivemq/client/internal/mqtt/mqtt3/Mqtt3RxClientViewExceptionsTest.java index 1b2a9fb41..c0062fe01 100644 --- a/src/test/java/com/hivemq/client/internal/mqtt/mqtt3/Mqtt3RxClientViewExceptionsTest.java +++ b/src/test/java/com/hivemq/client/internal/mqtt/mqtt3/Mqtt3RxClientViewExceptionsTest.java @@ -32,9 +32,9 @@ import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAck; import com.hivemq.client.rx.FlowableWithSingleSplit; -import io.reactivex.Completable; -import io.reactivex.Flowable; -import io.reactivex.Single; +import io.reactivex.rxjava3.core.Completable; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.Single; import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -89,7 +89,8 @@ void subscribeWithStream() { final Mqtt5MessageException mqtt5MessageException = new Mqtt5DisconnectException(MqttDisconnect.DEFAULT, "reason from original exception"); given(mqtt5Client.subscribePublishes(any(), anyBoolean())).willReturn( - new FlowableWithSingleSplit<>(Flowable.error(mqtt5MessageException), Mqtt5Publish.class, + new FlowableWithSingleSplit<>( + Flowable.error(mqtt5MessageException), Mqtt5Publish.class, Mqtt5SubAck.class)); final Mqtt3Subscribe subscribe = Mqtt3Subscribe.builder() diff --git a/src/test/java/com/hivemq/client/internal/rx/RxFutureConverterTest.java b/src/test/java/com/hivemq/client/internal/rx/RxFutureConverterTest.java index 7c9b0df71..a0f8edfa0 100644 --- a/src/test/java/com/hivemq/client/internal/rx/RxFutureConverterTest.java +++ b/src/test/java/com/hivemq/client/internal/rx/RxFutureConverterTest.java @@ -16,9 +16,10 @@ package com.hivemq.client.internal.rx; -import io.reactivex.*; -import io.reactivex.disposables.Disposable; -import io.reactivex.schedulers.Schedulers; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.schedulers.Schedulers; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.junit.jupiter.api.Test; diff --git a/src/test/java/com/hivemq/client/rx/FlowableWithSingleItem.java b/src/test/java/com/hivemq/client/rx/FlowableWithSingleItem.java index 2018e9d7e..9c073eacd 100644 --- a/src/test/java/com/hivemq/client/rx/FlowableWithSingleItem.java +++ b/src/test/java/com/hivemq/client/rx/FlowableWithSingleItem.java @@ -17,8 +17,9 @@ package com.hivemq.client.rx; import com.hivemq.client.rx.reactivestreams.WithSingleSubscriber; -import io.reactivex.Flowable; -import io.reactivex.FlowableSubscriber; + +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.FlowableSubscriber; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.reactivestreams.Subscriber; diff --git a/src/test/java/com/hivemq/client/rx/FlowableWithSingleSplit.java b/src/test/java/com/hivemq/client/rx/FlowableWithSingleSplit.java index f13683c92..33fac68c7 100644 --- a/src/test/java/com/hivemq/client/rx/FlowableWithSingleSplit.java +++ b/src/test/java/com/hivemq/client/rx/FlowableWithSingleSplit.java @@ -17,7 +17,8 @@ package com.hivemq.client.rx; import com.hivemq.client.rx.reactivestreams.WithSingleSubscriber; -import io.reactivex.Flowable; + +import io.reactivex.rxjava3.core.Flowable; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.reactivestreams.Subscriber; diff --git a/src/test/java/com/hivemq/client/rx/FlowableWithSingleTest.java b/src/test/java/com/hivemq/client/rx/FlowableWithSingleTest.java index 1ea70e94d..83e7425b0 100644 --- a/src/test/java/com/hivemq/client/rx/FlowableWithSingleTest.java +++ b/src/test/java/com/hivemq/client/rx/FlowableWithSingleTest.java @@ -18,9 +18,10 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.hivemq.client.rx.reactivestreams.WithSingleSubscriber; -import io.reactivex.BackpressureStrategy; -import io.reactivex.Flowable; -import io.reactivex.schedulers.Schedulers; + +import io.reactivex.rxjava3.core.BackpressureStrategy; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.schedulers.Schedulers; import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.function.ThrowingSupplier; @@ -156,8 +157,8 @@ void observeOnBoth_delayError() throws InterruptedException { .awaitCount(3) .assertValueAt(2, "next2") .await() - .assertError(IllegalArgumentException.class) - .assertErrorMessage("test"); + .assertError(IllegalArgumentException.class); + //.assertErrorMessage("test"); executorService.shutdown(); } @@ -195,8 +196,8 @@ void observeOnBoth_delayError_bufferSize() throws InterruptedException { .awaitCount(1024) .assertValueCount(1024) .await() - .assertError(IllegalArgumentException.class) - .assertErrorMessage("test"); + .assertError(IllegalArgumentException.class); + //.assertErrorMessage("test"); executorService.shutdown(); } From 277598831fb135785ac86a76c6b35035de3bc808 Mon Sep 17 00:00:00 2001 From: UserwithnoUsername Date: Sat, 12 Apr 2025 01:56:12 +0200 Subject: [PATCH 2/3] Refine error assertion in FlowableWithSingleTest Updated test cases to validate the exception message using a predicate, ensuring the error messages are also checked alongside the exception type. This enhances the robustness of the test validations. --- .../com/hivemq/client/rx/FlowableWithSingleTest.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/test/java/com/hivemq/client/rx/FlowableWithSingleTest.java b/src/test/java/com/hivemq/client/rx/FlowableWithSingleTest.java index 83e7425b0..5c5ff1117 100644 --- a/src/test/java/com/hivemq/client/rx/FlowableWithSingleTest.java +++ b/src/test/java/com/hivemq/client/rx/FlowableWithSingleTest.java @@ -157,8 +157,9 @@ void observeOnBoth_delayError() throws InterruptedException { .awaitCount(3) .assertValueAt(2, "next2") .await() - .assertError(IllegalArgumentException.class); - //.assertErrorMessage("test"); + .assertError(IllegalArgumentException.class) + .assertError(throwable -> throwable.getMessage().equals("test")); + executorService.shutdown(); } @@ -196,8 +197,9 @@ void observeOnBoth_delayError_bufferSize() throws InterruptedException { .awaitCount(1024) .assertValueCount(1024) .await() - .assertError(IllegalArgumentException.class); - //.assertErrorMessage("test"); + .assertError(IllegalArgumentException.class) + .assertError(throwable -> throwable.getMessage().equals("test")); + executorService.shutdown(); } From b19341671c2f9fcef071a9ab301f3ea5d2d14522 Mon Sep 17 00:00:00 2001 From: UserwithnoUsername Date: Sat, 12 Apr 2025 02:17:30 +0200 Subject: [PATCH 3/3] Refactor build script to update task dependency and apply plugin. Moved `dependsOn(integrationTest)` into a block for better readability. Added the application of the `japicc.gradle.kts` script for enhanced functionality in the build process. --- build.gradle.kts | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/build.gradle.kts b/build.gradle.kts index b1519e985..e083010f2 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -171,7 +171,9 @@ val integrationTest by tasks.registering(Test::class) { }) } -tasks.check { dependsOn(integrationTest) } +tasks.check { + + dependsOn(integrationTest) } /* ******************** jars ******************** */ @@ -346,7 +348,7 @@ allprojects { } - +apply("$rootDir/gradle/japicc.gradle.kts") /* ******************** build cache ******************** */