@@ -67,8 +67,8 @@ public ClientServerInputMultiplexer(
6767 this .source = source ;
6868 this .isClient = isClient ;
6969
70- this .serverReceiver = new InternalDuplexConnection (this , source );
71- this .clientReceiver = new InternalDuplexConnection (this , source );
70+ this .serverReceiver = new InternalDuplexConnection (Type . SERVER , this , source );
71+ this .clientReceiver = new InternalDuplexConnection (Type . CLIENT , this , source );
7272 this .serverConnection = registry .initConnection (Type .SERVER , serverReceiver );
7373 this .clientConnection = registry .initConnection (Type .CLIENT , clientReceiver );
7474 }
@@ -221,6 +221,7 @@ public String toString() {
221221
222222 private static class InternalDuplexConnection extends Flux <ByteBuf >
223223 implements Subscription , DuplexConnection {
224+ private final Type type ;
224225 private final ClientServerInputMultiplexer clientServerInputMultiplexer ;
225226 private final DuplexConnection source ;
226227
@@ -231,7 +232,10 @@ private static class InternalDuplexConnection extends Flux<ByteBuf>
231232 CoreSubscriber <? super ByteBuf > actual ;
232233
233234 public InternalDuplexConnection (
234- ClientServerInputMultiplexer clientServerInputMultiplexer , DuplexConnection source ) {
235+ Type type ,
236+ ClientServerInputMultiplexer clientServerInputMultiplexer ,
237+ DuplexConnection source ) {
238+ this .type = type ;
235239 this .clientServerInputMultiplexer = clientServerInputMultiplexer ;
236240 this .source = source ;
237241 }
@@ -331,7 +335,14 @@ public double availability() {
331335
332336 @ Override
333337 public String toString () {
334- return "InternalDuplexConnection{" + ", source=" + source + ", state=" + state + '}' ;
338+ return "InternalDuplexConnection{"
339+ + "type="
340+ + type
341+ + ", source="
342+ + source
343+ + ", state="
344+ + state
345+ + '}' ;
335346 }
336347 }
337348}
0 commit comments