File tree Expand file tree Collapse file tree 1 file changed +5
-0
lines changed
rsocket-examples/src/test/java/io/rsocket/integration Expand file tree Collapse file tree 1 file changed +5
-0
lines changed Original file line number Diff line number Diff line change @@ -79,11 +79,14 @@ public void testCompleteWithoutNext() {
7979 final AtomicBoolean gracefullyDisposedClient = new AtomicBoolean ();
8080 final AtomicBoolean disposedServer = new AtomicBoolean ();
8181 final AtomicBoolean disposedClient = new AtomicBoolean ();
82+ final Sinks .Empty <Void > requestHandled = Sinks .unsafe ().empty ();
8283 handler =
8384 new RSocket () {
8485 @ Override
8586 public Flux <Payload > requestStream (Payload payload ) {
8687 payload .release ();
88+
89+ requestHandled .tryEmitEmpty ();
8790 return Flux .<Payload >never ().takeUntilOther (serverGracefulShutdownSink .asMono ());
8891 }
8992
@@ -135,6 +138,8 @@ public void disposeGracefully() {
135138 .expectComplete ()
136139 .verifyLater ();
137140
141+ requestHandled .asMono ().block (Duration .ofSeconds (5 ));
142+
138143 assertThat (gracefullyDisposedServer ).isFalse ();
139144 assertThat (gracefullyDisposedClient ).isFalse ();
140145
You can’t perform that action at this time.
0 commit comments