@@ -13,6 +13,7 @@ import (
1313 "os"
1414 "reflect"
1515 "strings"
16+ "sync"
1617 "testing"
1718 "time"
1819
@@ -675,9 +676,9 @@ func TestClient(t *testing.T) {
675676 },
676677 }
677678
679+ _, err := mt.Coll.InsertOne(context.Background(), bson.D{})
678680 for _, tc := range testCases {
679681 mt.Run(tc.desc, func(mt *mtest.T) {
680- _, err := mt.Coll.InsertOne(context.Background(), bson.D{})
681682 require.NoError(mt, err)
682683
683684 mt.SetFailPoint(failpoint.FailPoint{
@@ -692,30 +693,47 @@ func TestClient(t *testing.T) {
692693
693694 mt.ClearEvents()
694695
696+ wg := sync.WaitGroup{}
697+ wg.Add(50)
698+
695699 for i := 0; i < 50; i++ {
696- // Run 50 operations, each with a timeout of 50ms. Expect
700+ // Run 50 concurrent operations, each with a timeout of 50ms. Expect
697701 // them to all return a timeout error because the failpoint
698- // blocks find operations for 500ms . Run 50 to increase the
702+ // blocks find operations for 50ms . Run 50 to increase the
699703 // probability that an operation will time out in a way that
700704 // can cause a retry.
701- ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
702- err = tc.operation(ctx, mt.Coll)
703- cancel()
704- assert.ErrorIs(mt, err, context.DeadlineExceeded)
705- assert.True(mt, mongo.IsTimeout(err), "expected mongo.IsTimeout(err) to be true")
706-
707- // Assert that each operation reported exactly one command
708- // started events, which means the operation did not retry
709- // after the context timeout.
710- evts := mt.GetAllStartedEvents()
711- require.Len(mt,
712- mt.GetAllStartedEvents(),
713- 1,
714- "expected exactly 1 command started event per operation, but got %d after %d iterations",
715- len(evts),
716- i)
717- mt.ClearEvents()
705+ go func() {
706+ ctx, cancel := context.WithTimeout(context.Background(), 150*time.Millisecond)
707+ err := tc.operation(ctx, mt.Coll)
708+ cancel()
709+ assert.ErrorIs(mt, err, context.DeadlineExceeded)
710+ assert.True(mt, mongo.IsTimeout(err), "expected mongo.IsTimeout(err) to be true")
711+
712+ wg.Done()
713+ }()
718714 }
715+
716+ wg.Wait()
717+
718+ // Since an operation requires checking out a connection and because we
719+ // attempt a pending read for socket timeouts and since the test forces
720+ // 50 concurrent socket timeouts, then it's possible that an
721+ // operation checks out a connection that has a pending read. In this
722+ // case the operation will time out when checking out a connection, and
723+ // a started event will not be propagated. So instead of
724+ // checking that we got exactly 50 started events, we should instead
725+ // ensure that the number of started events is equal to the number of
726+ // unique connections used to process the operations.
727+ pendingReadConns := mt.NumberConnectionsPendingReadStarted()
728+ evts := mt.GetAllStartedEvents()
729+
730+ require.Equal(mt,
731+ len(evts)+pendingReadConns,
732+ 50,
733+ "expected exactly 1 command started event per operation (50), but got %d",
734+ len(evts)+pendingReadConns)
735+ mt.ClearEvents()
736+ mt.ClearFailPoints()
719737 })
720738 }
721739 })
0 commit comments