1818
1919import java .util .Map ;
2020import java .util .concurrent .ConcurrentHashMap ;
21+ import java .util .concurrent .Executors ;
2122import java .util .concurrent .ScheduledExecutorService ;
2223import java .util .concurrent .ScheduledFuture ;
24+ import java .util .concurrent .ThreadFactory ;
2325import java .util .concurrent .TimeUnit ;
2426
2527import static com .mongodb .ServerConnectionState .Connecting ;
2931import static org .bson .util .Assertions .notNull ;
3032
3133class DefaultServer implements ClusterableServer {
34+
35+ private final String clusterId ;
36+
37+ private enum HeartbeatFrequency {
38+ NORMAL {
39+ @ Override
40+ long getFrequencyMS (final ServerSettings settings ) {
41+ return settings .getHeartbeatFrequency (MILLISECONDS );
42+ }
43+ },
44+
45+ RETRY {
46+ @ Override
47+ long getFrequencyMS (final ServerSettings settings ) {
48+ return settings .getHeartbeatConnectRetryFrequency (MILLISECONDS );
49+ }
50+ };
51+
52+ abstract long getFrequencyMS (final ServerSettings settings );
53+ }
54+
3255 private final ScheduledExecutorService scheduledExecutorService ;
3356 private final ServerAddress serverAddress ;
3457 private final ServerStateNotifier stateNotifier ;
35- private final ScheduledFuture <?> scheduledFuture ;
3658 private final PooledConnectionProvider connectionProvider ;
3759 private final Map <ChangeListener <ServerDescription >, Boolean > changeListeners =
3860 new ConcurrentHashMap <ChangeListener <ServerDescription >, Boolean >();
@@ -41,22 +63,22 @@ class DefaultServer implements ClusterableServer {
4163 private volatile ServerDescription description ;
4264 private volatile boolean isClosed ;
4365
66+ private ScheduledFuture <?> scheduledFuture ;
67+ private HeartbeatFrequency currentFrequency ;
68+
4469 public DefaultServer (final ServerAddress serverAddress ,
4570 final ServerSettings settings ,
46- final PooledConnectionProvider connectionProvider ,
47- final ScheduledExecutorService scheduledExecutorService ,
48- Mongo mongo ) {
71+ final String clusterId , final PooledConnectionProvider connectionProvider ,
72+ final Mongo mongo ) {
73+ this . clusterId = notNull ( "clusterId" , clusterId );
4974 this .settings = notNull ("settings" , settings );
50-
51- this .scheduledExecutorService = notNull ("scheduledExecutorService" , scheduledExecutorService );
5275 this .serverAddress = notNull ("serverAddress" , serverAddress );
5376 this .description = ServerDescription .builder ().state (Connecting ).address (serverAddress ).build ();
5477 serverStateListener = new DefaultServerStateListener ();
5578 this .stateNotifier = new ServerStateNotifier (serverAddress , serverStateListener ,
5679 settings .getHeartbeatSocketSettings (), mongo );
57- this .scheduledFuture = scheduledExecutorService .scheduleAtFixedRate (stateNotifier , 0 ,
58- settings .getHeartbeatFrequency (MILLISECONDS ),
59- MILLISECONDS );
80+ this .scheduledExecutorService = Executors .newSingleThreadScheduledExecutor (new DefaultThreadFactory ());
81+ setHeartbeat (0 , HeartbeatFrequency .NORMAL );
6082 this .connectionProvider = connectionProvider ;
6183 }
6284
@@ -87,14 +109,15 @@ public void invalidate() {
87109 serverStateListener .stateChanged (new ChangeEvent <ServerDescription >(description , ServerDescription .builder ()
88110 .state (Connecting )
89111 .address (serverAddress ).build ()));
90- scheduledExecutorService . submit ( stateNotifier );
112+ setHeartbeat ( 0 , HeartbeatFrequency . RETRY );
91113 connectionProvider .invalidate ();
92114 }
93115
94116 @ Override
95117 public void close () {
96118 if (!isClosed ()) {
97119 scheduledFuture .cancel (true );
120+ scheduledExecutorService .shutdownNow ();
98121 stateNotifier .close ();
99122 connectionProvider .close ();
100123 isClosed = true ;
@@ -106,18 +129,44 @@ public boolean isClosed() {
106129 return isClosed ;
107130 }
108131
132+ private void setHeartbeat (final ChangeEvent <ServerDescription > event ) {
133+ HeartbeatFrequency heartbeatFrequency = event .getNewValue ().getState () == Unconnected
134+ ? HeartbeatFrequency .RETRY
135+ : HeartbeatFrequency .NORMAL ;
136+ long initialDelay = heartbeatFrequency .getFrequencyMS (settings );
137+ setHeartbeat (initialDelay , heartbeatFrequency );
138+ }
139+
140+ private synchronized void setHeartbeat (final long initialDelay , final HeartbeatFrequency newFrequency ) {
141+ if (currentFrequency != newFrequency ) {
142+ currentFrequency = newFrequency ;
143+ if (scheduledFuture != null ) {
144+ scheduledFuture .cancel (false );
145+ }
146+ scheduledFuture = scheduledExecutorService .scheduleAtFixedRate (stateNotifier , initialDelay ,
147+ newFrequency .getFrequencyMS (settings ),
148+ MILLISECONDS );
149+ }
150+ }
151+
152+ // Custom thread factory for scheduled executor service that creates daemon threads. Otherwise,
153+ // applications that neglect to close the MongoClient will not exit.
154+ class DefaultThreadFactory implements ThreadFactory {
155+ public Thread newThread (final Runnable runnable ) {
156+ Thread t = new Thread (runnable , "cluster-" + clusterId + "-" + serverAddress );
157+ t .setDaemon (true );
158+ return t ;
159+ }
160+ }
161+
109162 private final class DefaultServerStateListener implements ChangeListener <ServerDescription > {
110163 @ Override
111164 public void stateChanged (final ChangeEvent <ServerDescription > event ) {
112165 description = event .getNewValue ();
113166 for (ChangeListener <ServerDescription > listener : changeListeners .keySet ()) {
114167 listener .stateChanged (event );
115168 }
116- if (event .getNewValue ().getState () == Unconnected ) {
117- scheduledExecutorService .schedule (stateNotifier , settings .getHeartbeatConnectRetryFrequency (MILLISECONDS ),
118- MILLISECONDS );
119- }
169+ setHeartbeat (event );
120170 }
121-
122171 }
123172}
0 commit comments