Skip to content

Commit 5c02176

Browse files
tlopesPTDonnerbart
authored andcommitted
Add configuration to resubscribe if session present
1 parent 3a26e48 commit 5c02176

File tree

8 files changed

+74
-1
lines changed

8 files changed

+74
-1
lines changed

src/main/java/com/hivemq/client/internal/mqtt/MqttClientConfig.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ public class MqttClientConfig implements Mqtt5ClientConfig {
7070
private @Nullable SslContext currentSslContext;
7171
private boolean resubscribeIfSessionExpired;
7272
private boolean republishIfSessionExpired;
73+
private boolean resubscribeIfSessionPresent;
7374

7475
public MqttClientConfig(
7576
final @NotNull MqttVersion mqttVersion,
@@ -263,6 +264,14 @@ public void setResubscribeIfSessionExpired(final boolean resubscribeIfSessionExp
263264
this.resubscribeIfSessionExpired = resubscribeIfSessionExpired;
264265
}
265266

267+
public boolean isResubscribeIfSessionPresent() {
268+
return resubscribeIfSessionPresent;
269+
}
270+
271+
public void setResubscribeIfSessionPresent(final boolean resubscribeIfSessionPresent) {
272+
this.resubscribeIfSessionPresent = resubscribeIfSessionPresent;
273+
}
274+
266275
public boolean isRepublishIfSessionExpired() {
267276
return republishIfSessionExpired;
268277
}

src/main/java/com/hivemq/client/internal/mqtt/handler/connect/MqttConnAckSingle.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,7 @@ private static void reconnect(
175175
}, reconnector.getDelay(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
176176
clientConfig.setResubscribeIfSessionExpired(reconnector.isResubscribeIfSessionExpired());
177177
clientConfig.setRepublishIfSessionExpired(reconnector.isRepublishIfSessionExpired());
178+
clientConfig.setResubscribeIfSessionPresent(reconnector.isResubscribeIfSessionPresent());
178179
reconnector.afterOnDisconnected();
179180
} else {
180181
clientConfig.getRawState().set(DISCONNECTED);

src/main/java/com/hivemq/client/internal/mqtt/handler/subscribe/MqttSubscriptionHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ public void onSessionStartOrResume(
100100

101101
subscriptionIdentifiersAvailable = connectionConfig.areSubscriptionIdentifiersAvailable();
102102

103-
if (!hasSession) {
103+
if (!hasSession || clientConfig.isResubscribeIfSessionPresent()) {
104104
incomingPublishFlows.getSubscriptions().forEach((subscriptionIdentifier, subscriptions) -> {
105105
final MqttSubscribe subscribe = new MqttSubscribe(ImmutableList.copyOf(subscriptions),
106106
MqttUserPropertiesImpl.NO_USER_PROPERTIES);

src/main/java/com/hivemq/client/internal/mqtt/lifecycle/MqttClientReconnector.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ public class MqttClientReconnector implements Mqtt5ClientReconnector {
4444
private @Nullable CompletableFuture<?> future;
4545
private boolean resubscribeIfSessionExpired = DEFAULT_RESUBSCRIBE_IF_SESSION_EXPIRED;
4646
private boolean republishIfSessionExpired = DEFAULT_REPUBLISH_IF_SESSION_EXPIRED;
47+
private boolean resubscribeIfSessionPresent = DEFAULT_RESUBSCRIBE_IF_SESSION_PRESENT;
4748
private long delayNanos = TimeUnit.MILLISECONDS.toNanos(DEFAULT_DELAY_MS);
4849
private @NotNull MqttClientTransportConfigImpl transportConfig;
4950
private @NotNull MqttConnect connect;
@@ -126,6 +127,19 @@ public boolean isRepublishIfSessionExpired() {
126127
return republishIfSessionExpired;
127128
}
128129

130+
@Override
131+
public boolean isResubscribeIfSessionPresent() {
132+
checkInEventLoop();
133+
return resubscribeIfSessionPresent;
134+
}
135+
136+
@Override
137+
public @NotNull MqttClientReconnector resubscribeIfSessionPresent(final boolean resubscribeIfSessionPresent) {
138+
checkInOnDisconnected("resubscribeIfSessionPresent");
139+
this.resubscribeIfSessionPresent = resubscribeIfSessionPresent;
140+
return this;
141+
}
142+
129143
@Override
130144
public @NotNull MqttClientReconnector delay(final long delay, final @Nullable TimeUnit timeUnit) {
131145
checkInOnDisconnected("delay");

src/main/java/com/hivemq/client/internal/mqtt/lifecycle/mqtt3/Mqtt3ClientReconnectorView.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,17 @@ public boolean isRepublishIfSessionExpired() {
8585
return delegate.isRepublishIfSessionExpired();
8686
}
8787

88+
@Override
89+
public @NotNull Mqtt3ClientReconnectorView resubscribeIfSessionPresent(final boolean resubscribeIfSessionPresent) {
90+
delegate.resubscribeIfSessionPresent(resubscribeIfSessionPresent);
91+
return this;
92+
}
93+
94+
@Override
95+
public boolean isResubscribeIfSessionPresent() {
96+
return delegate.isResubscribeIfSessionPresent();
97+
}
98+
8899
@Override
89100
public int getAttempts() {
90101
return delegate.getAttempts();

src/main/java/com/hivemq/client/mqtt/lifecycle/MqttClientReconnector.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,12 @@ public interface MqttClientReconnector {
6060
* @since 1.2
6161
*/
6262
boolean DEFAULT_REPUBLISH_IF_SESSION_EXPIRED = false;
63+
/**
64+
* If resubscribe when the session is present when the client reconnected successfully is enabled by default.
65+
*
66+
* @since 1.3.7
67+
*/
68+
boolean DEFAULT_RESUBSCRIBE_IF_SESSION_PRESENT = false;
6369
/**
6470
* Default delay in milliseconds the client will wait for before trying to reconnect.
6571
*
@@ -156,6 +162,32 @@ public interface MqttClientReconnector {
156162
*/
157163
boolean isRepublishIfSessionExpired();
158164

165+
/**
166+
* Instructs the client to automatically restore its subscriptions when reconnected successfully and the
167+
* session is still present.
168+
* <p>
169+
* When the client reconnected successfully and its session is still present, the server still knows its
170+
* subscriptions, so resubscribing is optional.
171+
* <p>
172+
* This setting only has effect if the client will reconnect (at least one of the methods {@link
173+
* #reconnect(boolean)} or {@link #reconnectWhen(CompletableFuture, BiConsumer)} is called).
174+
* <p>
175+
* This method must only be called in {@link MqttClientDisconnectedListener#onDisconnected(MqttClientDisconnectedContext)}
176+
* and not in the callback supplied to {@link #reconnectWhen(CompletableFuture, BiConsumer)}.
177+
*
178+
* @param resubscribeIfSessionPresent whether to resubscribe if the session is present when the client reconnected successfully.
179+
* @return this reconnector.
180+
* @throws UnsupportedOperationException if called outside of {@link MqttClientDisconnectedListener#onDisconnected(MqttClientDisconnectedContext)}.
181+
* @since 1.3.7
182+
*/
183+
@NotNull MqttClientReconnector resubscribeIfSessionPresent(boolean resubscribeIfSessionPresent);
184+
185+
/**
186+
* @return whether the client will resubscribe if the session is present when it reconnects successfully.
187+
* @since 1.3.7
188+
*/
189+
boolean isResubscribeIfSessionPresent();
190+
159191
/**
160192
* Sets a delay the client will wait for before trying to reconnect.
161193
* <p>

src/main/java/com/hivemq/client/mqtt/mqtt3/lifecycle/Mqtt3ClientReconnector.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,9 @@ public interface Mqtt3ClientReconnector extends MqttClientReconnector {
5353
@Override
5454
@NotNull Mqtt3ClientReconnector republishIfSessionExpired(boolean republish);
5555

56+
@Override
57+
@NotNull Mqtt3ClientReconnector resubscribeIfSessionPresent(boolean resubscribeIfSessionPresent);
58+
5659
@Override
5760
@NotNull Mqtt3ClientReconnector delay(long delay, @NotNull TimeUnit timeUnit);
5861

src/main/java/com/hivemq/client/mqtt/mqtt5/lifecycle/Mqtt5ClientReconnector.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,9 @@ public interface Mqtt5ClientReconnector extends MqttClientReconnector {
5353
@Override
5454
@NotNull Mqtt5ClientReconnector republishIfSessionExpired(boolean republish);
5555

56+
@Override
57+
@NotNull Mqtt5ClientReconnector resubscribeIfSessionPresent(boolean resubscribeIfSessionPresent);
58+
5659
@Override
5760
@NotNull Mqtt5ClientReconnector delay(long delay, @NotNull TimeUnit timeUnit);
5861

0 commit comments

Comments
 (0)