Skip to content

Commit 40a07c9

Browse files
lchqlchq authored and anmolnar committed
ZOOKEEPER-4736: Fix nio socket fd leak if network service is down
Reviewers: kezhuw, anmolnar
Author: lchqlchq
Closes #2047 from lchqlchq/fd
(cherry picked from commit e8e141b)
Signed-off-by: Andor Molnar <andor@cloudera.com>
1 parent 2d4d4eb commit 40a07c9

File tree

5 files changed

+113
-27
lines changed

5 files changed

+113
-27
lines changed

zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1338,6 +1338,17 @@ public void run() {
13381338
"SendThread exited loop for session: 0x" + Long.toHexString(getSessionId()));
13391339
}
13401340

1341+
/**
 * Closes the socket currently held by {@code clientCnxnSocket} by calling
 * {@code testableCloseSocket()}.
 *
 * <p>An {@link IOException} thrown by that call is logged at debug level and
 * not propagated. In this change the method replaces the direct
 * {@code cleanAndNotifyState()} call that {@code submitRequest} made when a
 * reply carried {@code Code.REQUESTTIMEOUT}.
 */
private void abortConnection() {
1342+
try {
1343+
clientCnxnSocket.testableCloseSocket();
1344+
} catch (IOException e) {
1345+
LOG.debug("Fail to close ongoing socket", e);
1346+
}
1347+
}
1348+
1349+
/**
1350+
* This is not thread-safe and should only be called inside {@link SendThread}.
1351+
*/
13411352
private void cleanAndNotifyState() {
13421353
cleanup();
13431354
if (state.isAlive()) {
@@ -1580,7 +1591,7 @@ public ReplyHeader submitRequest(
15801591
}
15811592
}
15821593
if (r.getErr() == Code.REQUESTTIMEOUT.intValue()) {
1583-
sendThread.cleanAndNotifyState();
1594+
sendThread.abortConnection();
15841595
}
15851596
return r;
15861597
}

zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNIO.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,12 @@ void cleanup() {
209209
} catch (IOException e) {
210210
LOG.debug("Ignoring exception during channel close", e);
211211
}
212+
try {
213+
selector.wakeup();
214+
selector.selectNow();
215+
} catch (IOException e) {
216+
LOG.debug("Ignoring exception during selecting of cancelled socket", e);
217+
}
212218
}
213219
try {
214220
Thread.sleep(100);

zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketFragilityTest.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,32 @@
1818

1919
package org.apache.zookeeper;
2020

21+
import static org.hamcrest.MatcherAssert.assertThat;
22+
import static org.hamcrest.Matchers.lessThanOrEqualTo;
2123
import static org.junit.jupiter.api.Assertions.assertEquals;
2224
import static org.junit.jupiter.api.Assertions.assertFalse;
2325
import static org.junit.jupiter.api.Assertions.assertTrue;
26+
import static org.mockito.Mockito.any;
27+
import static org.mockito.Mockito.doAnswer;
28+
import static org.mockito.Mockito.doThrow;
29+
import static org.mockito.Mockito.spy;
2430
import java.io.IOException;
2531
import java.net.InetSocketAddress;
32+
import java.net.SocketException;
33+
import java.nio.channels.Selector;
34+
import java.nio.channels.SocketChannel;
35+
import java.time.Duration;
2636
import java.util.Queue;
2737
import java.util.concurrent.CountDownLatch;
2838
import java.util.concurrent.Executors;
2939
import java.util.concurrent.TimeUnit;
40+
import java.util.concurrent.atomic.AtomicReference;
3041
import org.apache.zookeeper.ClientCnxn.Packet;
3142
import org.apache.zookeeper.Watcher.Event.KeeperState;
3243
import org.apache.zookeeper.ZooDefs.Ids;
3344
import org.apache.zookeeper.client.HostProvider;
3445
import org.apache.zookeeper.client.ZKClientConfig;
46+
import org.apache.zookeeper.common.BusyServer;
3547
import org.apache.zookeeper.data.Stat;
3648
import org.apache.zookeeper.server.quorum.QuorumPeerTestBase;
3749
import org.apache.zookeeper.test.ClientBase;
@@ -75,6 +87,40 @@ private void closeZookeeper(ZooKeeper zk) {
7587
});
7688
}
7789

90+
/**
 * Checks that sockets of failed connection attempts do not stay registered
 * with the client's NIO selector.
 *
 * <p>The client is pointed at a {@link BusyServer} and its
 * {@code ClientCnxnSocketNIO} is spied so that every channel returned by
 * {@code createSock()} throws {@code SocketException("Network is unreachable")}
 * from {@code connect}. After sleeping for five session timeouts the test
 * asserts that the selector captured in {@code createConnection} holds at most
 * one key.
 */
@Test
91+
public void testSocketClosedAfterFailure() throws Exception {
92+
Duration sessionTimeout = Duration.ofMillis(1000);
93+
// Filled in from createConnection() so the assertion below can inspect the client's selector.
final AtomicReference<Selector> nioSelector = new AtomicReference<>();
94+
try (
95+
// given: busy server
96+
BusyServer server = new BusyServer();
97+
ZooKeeper zk = new ZooKeeper(server.getHostPort(), (int) sessionTimeout.toMillis(), null) {
98+
@Override
99+
ClientCnxn createConnection(HostProvider hostProvider, int sessionTimeout, ZKClientConfig clientConfig, Watcher defaultWatcher, ClientCnxnSocket clientCnxnSocket, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) throws IOException {
100+
ClientCnxnSocketNIO socket = spy((ClientCnxnSocketNIO) clientCnxnSocket);
101+
102+
doAnswer(mock -> {
103+
SocketChannel spy = spy((SocketChannel) mock.callRealMethod());
104+
// when: connect throws an exception
105+
//
106+
// this could happen if system's network service is unavailable,
107+
// for example, "ifdown eth0" or "service network stop" and so on.
108+
doThrow(new SocketException("Network is unreachable")).when(spy).connect(any());
109+
return spy;
110+
}).when(socket).createSock();
111+
112+
nioSelector.set(socket.getSelector());
113+
return super.createConnection(hostProvider, sessionTimeout, clientConfig, defaultWatcher, socket, sessionId, sessionPasswd, canBeReadOnly);
114+
}
115+
}) {
116+
117+
// NOTE(review): presumably long enough for several failed connection attempts to occur - confirm.
Thread.sleep(sessionTimeout.toMillis() * 5);
118+
119+
// then: sockets of failed connections are closed, so at most one registered socket
120+
assertThat(nioSelector.get().keys().size(), lessThanOrEqualTo(1));
121+
}
122+
}
123+
78124
@Test
79125
public void testClientCnxnSocketFragility() throws Exception {
80126
System.setProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET,
zookeeper-server/src/test/java/org/apache/zookeeper/common/BusyServer.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.zookeeper.common;
20+
21+
import java.io.IOException;
22+
import java.net.InetAddress;
23+
import java.net.ServerSocket;
24+
import java.net.Socket;
25+
26+
/**
 * Test helper that opens a {@link ServerSocket} on 127.0.0.1 with an
 * automatically allocated port and a backlog of 1, then immediately connects
 * one client {@link Socket} to it.
 *
 * <p>Nothing in this class accepts the pending connection.
 * NOTE(review): presumably the point is to occupy the single backlog slot so
 * that later clients cannot get a responsive server - confirm against the
 * platform's backlog behaviour.
 */
public class BusyServer implements AutoCloseable {
27+
private final ServerSocket server;
28+
private final Socket client;
29+
30+
/**
 * Binds the listening socket and opens the single client connection to it.
 *
 * @throws IOException if binding the server socket or connecting the client fails
 */
public BusyServer() throws IOException {
31+
// Port 0 requests an automatically allocated port; the second argument is the backlog.
this.server = new ServerSocket(0, 1, InetAddress.getByName("127.0.0.1"));
32+
this.client = new Socket("127.0.0.1", server.getLocalPort());
33+
}
34+
35+
/**
 * @return the local port the server socket is listening on
 */
public int getLocalPort() {
36+
return server.getLocalPort();
37+
}
38+
39+
/**
 * @return the server address formatted as {@code 127.0.0.1:<port>}
 */
public String getHostPort() {
40+
return String.format("127.0.0.1:%d", getLocalPort());
41+
}
42+
43+
/**
 * Closes the client socket first, then the server socket.
 */
@Override
44+
public void close() throws Exception {
45+
client.close();
46+
server.close();
47+
}
48+
}

zookeeper-server/src/test/java/org/apache/zookeeper/test/SessionTimeoutTest.java

Lines changed: 1 addition & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,6 @@
2727
import static org.junit.jupiter.api.Assertions.assertTrue;
2828
import static org.junit.jupiter.api.Assertions.fail;
2929
import java.io.IOException;
30-
import java.net.ServerSocket;
31-
import java.net.Socket;
3230
import java.util.Arrays;
3331
import java.util.List;
3432
import java.util.concurrent.CompletableFuture;
@@ -42,6 +40,7 @@
4240
import org.apache.zookeeper.Watcher;
4341
import org.apache.zookeeper.ZooDefs;
4442
import org.apache.zookeeper.ZooKeeper;
43+
import org.apache.zookeeper.common.BusyServer;
4544
import org.apache.zookeeper.common.Time;
4645
import org.junit.jupiter.api.BeforeEach;
4746
import org.junit.jupiter.api.Test;
@@ -75,30 +74,6 @@ public synchronized void process(WatchedEvent event) {
7574
}
7675
}
7776

78-
private static class BusyServer implements AutoCloseable {
79-
private final ServerSocket server;
80-
private final Socket client;
81-
82-
public BusyServer() throws IOException {
83-
this.server = new ServerSocket(0, 1);
84-
this.client = new Socket("127.0.0.1", server.getLocalPort());
85-
}
86-
87-
public int getLocalPort() {
88-
return server.getLocalPort();
89-
}
90-
91-
public String getHostPort() {
92-
return String.format("127.0.0.1:%d", getLocalPort());
93-
}
94-
95-
@Override
96-
public void close() throws Exception {
97-
client.close();
98-
server.close();
99-
}
100-
}
101-
10277
@Test
10378
public void testSessionExpiration() throws InterruptedException, KeeperException {
10479
final CountDownLatch expirationLatch = new CountDownLatch(1);

0 commit comments

Comments
 (0)