@@ -34,6 +34,8 @@ import com.mongodb.client.model.changestream.UpdateDescription
3434import com.mongodb.client.test.CollectionHelper
3535import com.mongodb.connection.AsyncConnection
3636import com.mongodb.connection.Connection
37+ import com.mongodb.connection.ConnectionDescription
38+ import com.mongodb.connection.ServerVersion
3739import com.mongodb.session.SessionContext
3840import org.bson.BsonArray
3941import org.bson.BsonBoolean
@@ -452,46 +454,78 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
452454
453455 def ' should set the startAtOperationTime on the sync cursor' () {
454456 given :
457+ def changeStream
455458 def binding = Stub (ReadBinding ) {
456459 getSessionContext() >> Stub (SessionContext ) {
457460 getReadConcern() >> ReadConcern . DEFAULT
458- getOperationTime() >> new BsonTimestamp (42 , 1 )
461+ getOperationTime() >> new BsonTimestamp ()
459462 }
460463 getReadConnectionSource() >> Stub (ConnectionSource ) {
461464 getConnection() >> Stub (Connection ) {
462- command(* _) >> new BsonDocument (' cursor' , new BsonDocument (' id' , new BsonInt64 (1 ))
463- .append(' ns' , new BsonString (getNamespace(). getFullName()))
464- .append(' firstBatch' , new BsonArrayWrapper ([])))
465+ command(* _) >> {
466+ changeStream = getChangeStream(it[1 ])
467+ new BsonDocument (' cursor' , new BsonDocument (' id' , new BsonInt64 (1 ))
468+ .append(' ns' , new BsonString (getNamespace(). getFullName()))
469+ .append(' firstBatch' , new BsonArrayWrapper ([])))
470+ }
471+ getDescription() >> Stub (ConnectionDescription ) {
472+ getServerVersion() >> new ServerVersion ([4 , 0 , 0 ])
473+ }
465474 }
466475 }
467476 }
468477
469478 when : ' Resume token'
470- def operation = new ChangeStreamOperation<BsonDocument > (helper. getNamespace(), FullDocument . DEFAULT , [], CODEC )
479+ new ChangeStreamOperation<BsonDocument > (helper. getNamespace(), FullDocument . DEFAULT , [], CODEC )
471480 .resumeAfter(new BsonDocument ())
472- operation . execute(binding)
481+ .execute(binding)
473482
474483 then :
475- operation. getStartAtOperationTime() == null
484+ changeStream. containsKey(' resumeAfter' )
485+ ! changeStream. containsKey(' startAtOperationTime' )
476486
477- when : ' No token or time'
478- operation = new ChangeStreamOperation<BsonDocument > (helper. getNamespace(), FullDocument . DEFAULT , [], CODEC )
479- operation. execute(binding)
487+ when : ' Set startAtOperationTime'
488+ def startAtTime = new BsonTimestamp (42 )
489+ new ChangeStreamOperation<BsonDocument > (helper. getNamespace(), FullDocument . DEFAULT , [], CODEC )
490+ .startAtOperationTime(startAtTime)
491+ .execute(binding)
480492
481493 then :
482- operation . getStartAtOperationTime( ) == new BsonTimestamp ( 42 , 1 )
494+ changeStream . getTimestamp( ' startAtOperationTime ' ) == startAtTime
483495
484- when : ' Set time'
485- def startAtTime = new BsonTimestamp (42 )
486- operation = operation. startAtOperationTime(startAtTime)
487- operation. execute(binding)
496+ when : ' set startAtOperationTimeForResume'
497+ def resumeStartAt = new BsonTimestamp (42 , 42 )
498+ new ChangeStreamOperation<BsonDocument > (helper. getNamespace(), FullDocument . DEFAULT , [], CODEC )
499+ .startOperationTimeForResume(resumeStartAt)
500+ .execute(binding)
501+
502+ then :
503+ changeStream. getTimestamp(' startAtOperationTime' ) == resumeStartAt
504+
505+ when : ' set startAtOperationTime && startAtOperationTimeForResume'
506+ new ChangeStreamOperation<BsonDocument > (helper. getNamespace(), FullDocument . DEFAULT , [], CODEC )
507+ .startAtOperationTime(startAtTime)
508+ .startOperationTimeForResume(resumeStartAt)
509+ .execute(binding)
510+
511+ then :
512+ changeStream. getTimestamp(' startAtOperationTime' ) == startAtTime
513+
514+ when : ' set resumeToken && startAtOperationTime && startAtOperationTimeForResume'
515+ new ChangeStreamOperation<BsonDocument > (helper. getNamespace(), FullDocument . DEFAULT , [], CODEC )
516+ .resumeAfter(new BsonDocument ())
517+ .startAtOperationTime(startAtTime)
518+ .startOperationTimeForResume(resumeStartAt)
519+ .execute(binding)
488520
489521 then :
490- operation. getStartAtOperationTime() == startAtTime
522+ changeStream. containsKey(' resumeAfter' )
523+ changeStream. getTimestamp(' startAtOperationTime' ) == startAtTime
491524 }
492525
493526 def ' should set the startAtOperationTime on the async cursor' () {
494527 given :
528+ def changeStream
495529 def binding = Stub (AsyncReadBinding ) {
496530 getSessionContext() >> Stub (SessionContext ) {
497531 getReadConcern() >> ReadConcern . DEFAULT
@@ -502,39 +536,66 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
502536 getConnection(_) >> {
503537 it. last(). onResult(Stub (AsyncConnection ) {
504538 commandAsync(* _) >> {
539+ changeStream = getChangeStream(it[1 ])
505540 it. last(). onResult(new BsonDocument (' cursor' , new BsonDocument (' id' , new BsonInt64 (1 ))
506541 .append(' ns' , new BsonString (getNamespace(). getFullName()))
507542 .append(' firstBatch' , new BsonArrayWrapper ([]))), null )
508543 }
544+ getDescription() >> Stub (ConnectionDescription ) {
545+ getServerVersion() >> new ServerVersion ([4 , 0 , 0 ])
546+ }
509547 }, null )
510548 }
511549 }, null )
512550 }
513551 }
514552
515553 when : ' Resume Token'
516- def operation = new ChangeStreamOperation<BsonDocument > (helper. getNamespace(), FullDocument . DEFAULT , [], CODEC )
554+ new ChangeStreamOperation<BsonDocument > (helper. getNamespace(), FullDocument . DEFAULT , [], CODEC )
517555 .resumeAfter(new BsonDocument ())
556+ .executeAsync(binding, Stub (SingleResultCallback ))
557+
558+ then :
559+ changeStream. containsKey(' resumeAfter' )
560+ ! changeStream. containsKey(' startAtOperationTime' )
518561
519- operation. executeAsync(binding, Stub (SingleResultCallback ))
562+ when : ' Set startAtOperationTime'
563+ def startAtTime = new BsonTimestamp (42 )
564+ new ChangeStreamOperation<BsonDocument > (helper. getNamespace(), FullDocument . DEFAULT , [], CODEC )
565+ .startAtOperationTime(startAtTime)
566+ .executeAsync(binding, Stub (SingleResultCallback ))
520567
521568 then :
522- operation . getStartAtOperationTime( ) == null
569+ changeStream . getTimestamp( ' startAtOperationTime ' ) == startAtTime
523570
524- when : ' No token or time'
525- operation = new ChangeStreamOperation<BsonDocument > (helper. getNamespace(), FullDocument . DEFAULT , [], CODEC )
526- operation. executeAsync(binding, Stub (SingleResultCallback ))
571+ when : ' set startAtOperationTimeForResume'
572+ def resumeStartAt = new BsonTimestamp (42 , 42 )
573+ new ChangeStreamOperation<BsonDocument > (helper. getNamespace(), FullDocument . DEFAULT , [], CODEC )
574+ .startOperationTimeForResume(resumeStartAt)
575+ .executeAsync(binding, Stub (SingleResultCallback ))
527576
528577 then :
529- operation . getStartAtOperationTime( ) == new BsonTimestamp ()
578+ changeStream . getTimestamp( ' startAtOperationTime ' ) == resumeStartAt
530579
531- when : ' set time'
532- def startAtTime = new BsonTimestamp (42 )
533- operation = operation. startAtOperationTime(startAtTime)
534- operation. executeAsync(binding, Stub (SingleResultCallback ))
580+ when : ' set startAtOperationTime && startAtOperationTimeForResume'
581+ new ChangeStreamOperation<BsonDocument > (helper. getNamespace(), FullDocument . DEFAULT , [], CODEC )
582+ .startAtOperationTime(startAtTime)
583+ .startOperationTimeForResume(resumeStartAt)
584+ .executeAsync(binding, Stub (SingleResultCallback ))
585+
586+ then :
587+ changeStream. getTimestamp(' startAtOperationTime' ) == startAtTime
588+
589+ when : ' set resumeToken && startAtOperationTime && startAtOperationTimeForResume'
590+ new ChangeStreamOperation<BsonDocument > (helper. getNamespace(), FullDocument . DEFAULT , [], CODEC )
591+ .resumeAfter(new BsonDocument ())
592+ .startAtOperationTime(startAtTime)
593+ .startOperationTimeForResume(resumeStartAt)
594+ .executeAsync(binding, Stub (SingleResultCallback ))
535595
536596 then :
537- operation. getStartAtOperationTime() == startAtTime
597+ changeStream. containsKey(' resumeAfter' )
598+ changeStream. getTimestamp(' startAtOperationTime' ) == startAtTime
538599 }
539600
540601 private final static CODEC = new BsonDocumentCodec ()
@@ -573,4 +634,7 @@ class ChangeStreamOperationSpecification extends OperationFunctionalSpecificatio
573634 }
574635 }
575636
637+ def getChangeStream (BsonDocument command ) {
638+ command. getArray(' pipeline' ). head(). getDocument(' $changeStream' )
639+ }
576640}
0 commit comments