Skip to content

Commit 8900618

Browse files
authored
ZOOKEEPER-4508: Expire session in client side to avoid endless connection loss
Reviewers: anmolnar Author: kezhuw Closes #2058 from kezhuw/ZOOKEEPER-4508-client-side-session-expiration
1 parent 95efcc9 commit 8900618

File tree

5 files changed

+156
-21
lines changed

5 files changed

+156
-21
lines changed

zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,8 @@ static class AuthData {
165165

166166
private int readTimeout;
167167

168+
private int expirationTimeout;
169+
168170
private final int sessionTimeout;
169171

170172
private final ZKWatchManager watchManager;
@@ -411,6 +413,7 @@ public ClientCnxn(
411413

412414
this.connectTimeout = sessionTimeout / hostProvider.size();
413415
this.readTimeout = sessionTimeout * 2 / 3;
416+
this.expirationTimeout = sessionTimeout * 4 / 3;
414417

415418
this.sendThread = new SendThread(clientCnxnSocket);
416419
this.eventThread = new EventThread();
@@ -803,6 +806,12 @@ public String toString() {
803806

804807
}
805808

809+
private static class ConnectionTimeoutException extends IOException {
810+
public ConnectionTimeoutException(String message) {
811+
super(message);
812+
}
813+
}
814+
806815
private static class SessionTimeoutException extends IOException {
807816

808817
private static final long serialVersionUID = 824482094072071178L;
@@ -1143,7 +1152,7 @@ public void run() {
11431152
startConnect(serverAddress);
11441153
// Update now to start the connection timer right after we make a connection attempt
11451154
clientCnxnSocket.updateNow();
1146-
clientCnxnSocket.updateLastSendAndHeard();
1155+
clientCnxnSocket.updateLastSend();
11471156
}
11481157

11491158
if (state.isConnected()) {
@@ -1181,16 +1190,24 @@ public void run() {
11811190
}
11821191
to = readTimeout - clientCnxnSocket.getIdleRecv();
11831192
} else {
1184-
to = connectTimeout - clientCnxnSocket.getIdleRecv();
1193+
to = connectTimeout - clientCnxnSocket.getIdleSend();
11851194
}
11861195

1187-
if (to <= 0) {
1196+
int expiration = expirationTimeout - clientCnxnSocket.getIdleRecv();
1197+
if (expiration <= 0) {
11881198
String warnInfo = String.format(
11891199
"Client session timed out, have not heard from server in %dms for session id 0x%s",
11901200
clientCnxnSocket.getIdleRecv(),
11911201
Long.toHexString(sessionId));
11921202
LOG.warn(warnInfo);
1203+
changeZkState(States.CLOSED);
11931204
throw new SessionTimeoutException(warnInfo);
1205+
} else if (to <= 0) {
1206+
String warnInfo = String.format(
1207+
"Client connection timed out, have not heard from server in %dms for session id 0x%s",
1208+
clientCnxnSocket.getIdleRecv(),
1209+
Long.toHexString(sessionId));
1210+
throw new ConnectionTimeoutException(warnInfo);
11941211
}
11951212
if (state.isConnected()) {
11961213
// The 1000 ms (1 second) margin guards against a race condition that could cause the second ping to be missed
@@ -1235,7 +1252,7 @@ public void run() {
12351252
} else {
12361253
LOG.warn(
12371254
"Session 0x{} for server {}, Closing socket connection. "
1238-
+ "Attempting reconnect except it is a SessionExpiredException.",
1255+
+ "Attempting reconnect except it is a SessionExpiredException or SessionTimeoutException.",
12391256
Long.toHexString(getSessionId()),
12401257
serverAddress,
12411258
e);
@@ -1256,7 +1273,12 @@ public void run() {
12561273
if (state.isAlive()) {
12571274
eventThread.queueEvent(new WatchedEvent(Event.EventType.None, Event.KeeperState.Disconnected, null));
12581275
}
1259-
eventThread.queueEvent(new WatchedEvent(Event.EventType.None, Event.KeeperState.Closed, null));
1276+
if (closing) {
1277+
eventThread.queueEvent(new WatchedEvent(Event.EventType.None, KeeperState.Closed, null));
1278+
} else if (state == States.CLOSED) {
1279+
eventThread.queueEvent(new WatchedEvent(Event.EventType.None, KeeperState.Expired, null));
1280+
}
1281+
eventThread.queueEventOfDeath();
12601282

12611283
Login l = loginRef.getAndSet(null);
12621284
if (l != null) {
@@ -1274,7 +1296,6 @@ private void cleanAndNotifyState() {
12741296
eventThread.queueEvent(new WatchedEvent(Event.EventType.None, Event.KeeperState.Disconnected, null));
12751297
}
12761298
clientCnxnSocket.updateNow();
1277-
clientCnxnSocket.updateLastSendAndHeard();
12781299
}
12791300

12801301
private void pingRwServer() throws RWServerFoundException {
@@ -1374,6 +1395,7 @@ void onConnected(
13741395
}
13751396

13761397
readTimeout = negotiatedSessionTimeout * 2 / 3;
1398+
expirationTimeout = negotiatedSessionTimeout * 4 / 3;
13771399
connectTimeout = negotiatedSessionTimeout / hostProvider.size();
13781400
hostProvider.onConnected();
13791401
sessionId = _sessionId;

zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocket.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,9 @@ abstract class ClientCnxnSocket {
6565
protected ByteBuffer incomingBuffer = lenBuffer;
6666
protected final AtomicLong sentCount = new AtomicLong(0L);
6767
protected final AtomicLong recvCount = new AtomicLong(0L);
68+
// Used for reactive timeout detection, e.g. the connection read timeout and the session expiration timeout.
6869
protected long lastHeard;
70+
// Used for proactive timeout detection, e.g. the ping timeout and the connection establishment timeout.
6971
protected long lastSend;
7072
protected long now;
7173
protected ClientCnxn.SendThread sendThread;

zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -804,7 +804,7 @@ private void testPortChangeToBlockedPort(boolean testLeader) throws Exception {
804804
Thread.sleep(1000);
805805
zkArr[serverIndex].setData("/test", "teststr".getBytes(), -1);
806806
fail("New client connected to new client port!");
807-
} catch (KeeperException.ConnectionLossException e) {
807+
} catch (KeeperException.ConnectionLossException | KeeperException.SessionExpiredException e) {
808808
// Exception is expected
809809
}
810810

zookeeper-server/src/test/java/org/apache/zookeeper/test/SessionTest.java

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import static org.junit.jupiter.api.Assertions.fail;
2626
import java.io.File;
2727
import java.io.IOException;
28+
import java.util.Arrays;
2829
import java.util.LinkedList;
2930
import java.util.List;
3031
import java.util.concurrent.CountDownLatch;
@@ -90,10 +91,18 @@ public void tearDown() throws Exception {
9091
private static class CountdownWatcher implements Watcher {
9192

9293
volatile CountDownLatch clientConnected = new CountDownLatch(1);
94+
final CountDownLatch sessionTerminated = new CountDownLatch(1);
9395

9496
public void process(WatchedEvent event) {
95-
if (event.getState() == KeeperState.SyncConnected) {
96-
clientConnected.countDown();
97+
switch (event.getState()) {
98+
case SyncConnected:
99+
clientConnected.countDown();
100+
break;
101+
case AuthFailed:
102+
case Expired:
103+
case Closed:
104+
sessionTerminated.countDown();
105+
break;
97106
}
98107
}
99108

@@ -274,17 +283,15 @@ public void testSessionStateNoDupStateReporting() throws IOException, Interrupte
274283
// shutdown the server
275284
serverFactory.shutdown();
276285

277-
try {
278-
Thread.sleep(10000);
279-
} catch (InterruptedException e) {
280-
// ignore
281-
}
286+
watcher.sessionTerminated.await();
282287

283-
// verify that the size is just 2 - ie connect then disconnect
284-
// if the client attempts reconnect and we are not handling current
285-
// state correctly (ie eventing on duplicate disconnects) then we'll
286-
// see a disconnect for each failed connection attempt
287-
assertEquals(2, watcher.states.size());
288+
// verify that there is no duplicated disconnected event.
289+
List<KeeperState> states = Arrays.asList(
290+
KeeperState.SyncConnected,
291+
KeeperState.Disconnected,
292+
KeeperState.Expired
293+
);
294+
assertEquals(states, watcher.states);
288295

289296
zk.close();
290297
}
@@ -319,11 +326,11 @@ public void testSessionTimeoutAccess() throws Exception {
319326

320327
private class DupWatcher extends CountdownWatcher {
321328

322-
public List<WatchedEvent> states = new LinkedList<>();
329+
public List<KeeperState> states = new LinkedList<>();
323330
public void process(WatchedEvent event) {
324331
super.process(event);
325332
if (event.getType() == EventType.None) {
326-
states.add(event);
333+
states.add(event.getState());
327334
}
328335
}
329336

zookeeper-server/src/test/java/org/apache/zookeeper/test/SessionTimeoutTest.java

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,15 @@
2020

2121
import static org.junit.jupiter.api.Assertions.assertNotNull;
2222
import static org.junit.jupiter.api.Assertions.assertNull;
23+
import static org.junit.jupiter.api.Assertions.assertThrows;
2324
import static org.junit.jupiter.api.Assertions.assertTrue;
2425
import static org.junit.jupiter.api.Assertions.fail;
2526
import java.io.IOException;
27+
import java.net.ServerSocket;
28+
import java.net.Socket;
29+
import java.util.Arrays;
30+
import java.util.List;
31+
import java.util.concurrent.CompletableFuture;
2632
import java.util.concurrent.CountDownLatch;
2733
import java.util.concurrent.TimeUnit;
2834
import org.apache.zookeeper.CreateMode;
@@ -48,6 +54,30 @@ public void setUp() throws Exception {
4854
zk = createClient();
4955
}
5056

57+
private static class BusyServer implements AutoCloseable {
58+
private final ServerSocket server;
59+
private final Socket client;
60+
61+
public BusyServer() throws IOException {
62+
this.server = new ServerSocket(0, 1);
63+
this.client = new Socket("127.0.0.1", server.getLocalPort());
64+
}
65+
66+
public int getLocalPort() {
67+
return server.getLocalPort();
68+
}
69+
70+
public String getHostPort() {
71+
return String.format("127.0.0.1:%d", getLocalPort());
72+
}
73+
74+
@Override
75+
public void close() throws Exception {
76+
client.close();
77+
server.close();
78+
}
79+
}
80+
5181
@Test
5282
public void testSessionExpiration() throws InterruptedException, KeeperException {
5383
final CountDownLatch expirationLatch = new CountDownLatch(1);
@@ -72,6 +102,80 @@ public void testSessionExpiration() throws InterruptedException, KeeperException
72102
assertTrue(gotException);
73103
}
74104

105+
@Test
106+
public void testSessionRecoveredAfterMultipleFailedAttempts() throws Exception {
107+
// stop client also to gain less distraction
108+
zk.close();
109+
110+
try (BusyServer busyServer = new BusyServer()) {
111+
List<String> servers = Arrays.asList(
112+
busyServer.getHostPort(),
113+
busyServer.getHostPort(),
114+
hostPort,
115+
busyServer.getHostPort(),
116+
busyServer.getHostPort(),
117+
busyServer.getHostPort()
118+
);
119+
String connectString = String.join(",", servers);
120+
121+
zk = createClient(new CountdownWatcher(), connectString);
122+
stopServer();
123+
124+
// Wait beyond connectTimeout but not sessionTimeout.
125+
Thread.sleep(zk.getSessionTimeout() / 2);
126+
127+
CompletableFuture<Void> connected = new CompletableFuture<>();
128+
zk.register(event -> {
129+
if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
130+
connected.complete(null);
131+
} else {
132+
connected.completeExceptionally(new KeeperException.SessionExpiredException());
133+
}
134+
});
135+
136+
startServer();
137+
connected.join();
138+
}
139+
}
140+
141+
@Test
142+
public void testSessionExpirationAfterAllServerDown() throws Exception {
143+
// stop client also to gain less distraction
144+
zk.close();
145+
146+
// small connection timeout to gain quick ci feedback
147+
int sessionTimeout = 3000;
148+
CompletableFuture<Void> expired = new CompletableFuture<>();
149+
zk = createClient(new CountdownWatcher(), hostPort, sessionTimeout);
150+
zk.register(event -> {
151+
if (event.getState() == Watcher.Event.KeeperState.Expired) {
152+
expired.complete(null);
153+
}
154+
});
155+
stopServer();
156+
expired.join();
157+
assertThrows(KeeperException.SessionExpiredException.class, () -> zk.exists("/", null));
158+
}
159+
160+
@Test
161+
public void testSessionExpirationWhenNoServerUp() throws Exception {
162+
// stop client also to gain less distraction
163+
zk.close();
164+
165+
stopServer();
166+
167+
// small connection timeout to gain quick ci feedback
168+
int sessionTimeout = 3000;
169+
CompletableFuture<Void> expired = new CompletableFuture<>();
170+
new TestableZooKeeper(hostPort, sessionTimeout, event -> {
171+
if (event.getState() == Watcher.Event.KeeperState.Expired) {
172+
expired.complete(null);
173+
}
174+
});
175+
expired.join();
176+
assertThrows(KeeperException.SessionExpiredException.class, () -> zk.exists("/", null));
177+
}
178+
75179
@Test
76180
public void testQueueEvent() throws InterruptedException, KeeperException {
77181
final CountDownLatch eventLatch = new CountDownLatch(1);

0 commit comments

Comments
 (0)