3838import com .mongodb .client .vault .ClientEncryption ;
3939import com .mongodb .connection .ClusterConnectionMode ;
4040import com .mongodb .connection .ClusterDescription ;
41+ import com .mongodb .event .ConnectionReadyEvent ;
4142import com .mongodb .event .TestServerMonitorListener ;
4243import com .mongodb .internal .connection .ServerMonitoringModeUtil ;
4344import com .mongodb .internal .connection .TestClusterListener ;
6465import java .util .concurrent .Executors ;
6566import java .util .concurrent .Future ;
6667import java .util .concurrent .TimeUnit ;
68+ import java .util .concurrent .TimeoutException ;
6769import java .util .function .BiFunction ;
6870import java .util .function .Function ;
6971import java .util .stream .Collectors ;
8284import static com .mongodb .client .unified .UnifiedCrudHelper .asReadPreference ;
8385import static com .mongodb .client .unified .UnifiedCrudHelper .asWriteConcern ;
8486import static com .mongodb .internal .connection .AbstractConnectionPoolTest .waitForPoolAsyncWorkManagerStart ;
87+ import static java .lang .String .format ;
8588import static java .lang .System .getenv ;
8689import static java .util .Arrays .asList ;
8790import static org .junit .Assume .assumeTrue ;
@@ -90,7 +93,8 @@ public final class Entities {
9093 private static final Set <String > SUPPORTED_CLIENT_ENTITY_OPTIONS = new HashSet <>(
9194 asList (
9295 "id" , "autoEncryptOpts" , "uriOptions" , "serverApi" , "useMultipleMongoses" , "storeEventsAsEntities" ,
93- "observeEvents" , "observeLogMessages" , "observeSensitiveCommands" , "ignoreCommandMonitoringEvents" ));
96+ "observeEvents" , "observeLogMessages" , "observeSensitiveCommands" , "ignoreCommandMonitoringEvents" ,
97+ "awaitMinPoolSizeMS" ));
9498 private final Set <String > entityNames = new HashSet <>();
9599 private final Map <String , ExecutorService > threads = new HashMap <>();
96100 private final Map <String , ArrayList <Future <?>>> tasks = new HashMap <>();
@@ -306,6 +310,7 @@ private void initClient(final BsonDocument entity, final String id,
306310 throw new UnsupportedOperationException ("Client entity contains unsupported options: " + entity .keySet ()
307311 + ". Supported options are " + SUPPORTED_CLIENT_ENTITY_OPTIONS );
308312 }
313+ boolean waitForMinPoolSizeToPopulate = isWaitForMinPoolSizeToPopulate (entity );
309314 MongoClientSettings .Builder clientSettingsBuilder ;
310315 if (entity .getBoolean ("useMultipleMongoses" , BsonBoolean .FALSE ).getValue () && (isSharded () || isLoadBalanced ())) {
311316 assumeTrue ("Multiple mongos connection string not available for sharded cluster" ,
@@ -331,6 +336,9 @@ private void initClient(final BsonDocument entity, final String id,
331336 if (entity .containsKey ("observeEvents" )) {
332337 List <String > observeEvents = entity .getArray ("observeEvents" ).stream ()
333338 .map (type -> type .asString ().getValue ()).collect (Collectors .toList ());
339+ if (waitForMinPoolSizeToPopulate ) {
340+ observeEvents .add ("connectionReadyEvent" );
341+ }
334342 List <String > ignoreCommandMonitoringEvents = entity
335343 .getArray ("ignoreCommandMonitoringEvents" , new BsonArray ()).stream ()
336344 .map (type -> type .asString ().getValue ()).collect (Collectors .toList ());
@@ -341,7 +349,6 @@ private void initClient(final BsonDocument entity, final String id,
341349 null );
342350 clientSettingsBuilder .addCommandListener (testCommandListener );
343351 putEntity (id + "-command-listener" , testCommandListener , clientCommandListeners );
344-
345352 TestConnectionPoolListener testConnectionPoolListener = new TestConnectionPoolListener (observeEvents );
346353 clientSettingsBuilder .applyToConnectionPoolSettings (builder ->
347354 builder .addConnectionPoolListener (testConnectionPoolListener ));
@@ -583,6 +590,35 @@ private void initClient(final BsonDocument entity, final String id,
583590 if (waitForPoolAsyncWorkManagerStart ) {
584591 waitForPoolAsyncWorkManagerStart ();
585592 }
593+ if (waitForMinPoolSizeToPopulate ) {
594+ waitForMinPoolSizeToPopulate (entity , id , clientSettings );
595+ }
596+ }
597+
598+ private void waitForMinPoolSizeToPopulate (final BsonDocument entity , final String id , final MongoClientSettings clientSettings ) {
599+ int minSize = clientSettings .getConnectionPoolSettings ().getMinSize ();
600+ int awaitMinPoolSizeMS = entity .getInt32 ("awaitMinPoolSizeMS" ).getValue ();
601+ TestConnectionPoolListener testConnectionPoolListener = getConnectionPoolListener (id );
602+ try {
603+ /*
604+ From the spec:
605+ Any CMAP and SDAM event/log listeners configured on the client SHOULD ignore any events that occur before the pool is populated.
606+
607+ Currently, no flaky or racy behavior is caused by not clearing pre-pool-population CMAP and SDAM events.
608+ If any race behavior is observed, consider clearing events.
609+ */
610+ testConnectionPoolListener .waitForEvent (ConnectionReadyEvent .class , minSize , awaitMinPoolSizeMS , TimeUnit .MILLISECONDS );
611+ } catch (TimeoutException | InterruptedException e ) {
612+ throw new RuntimeException (format ("Error waiting for awaitMinPoolSizeMS [%s] to establish minPoolSize [%s] connections" ,
613+ awaitMinPoolSizeMS , minSize ));
614+ }
615+ }
616+
617+ private static boolean isWaitForMinPoolSizeToPopulate (final BsonDocument clientEntity ) {
618+ int minPoolSize = clientEntity .getDocument ("uriOptions" , new BsonDocument ())
619+ .get ("minPoolSize" , new BsonInt32 (0 ))
620+ .asInt32 ().getValue ();
621+ return minPoolSize != 0 && clientEntity .containsKey ("awaitMinPoolSizeMS" );
586622 }
587623
588624 private static LogMessage .Component toComponent (final Map .Entry <String , BsonValue > entry ) {
0 commit comments