33import io .netty .buffer .ByteBuf ;
44import io .netty .buffer .UnpooledByteBufAllocator ;
55import io .rsocket .core .StressSubscriber ;
6+ import io .rsocket .utils .FastLogger ;
7+ import java .util .Arrays ;
8+ import java .util .ConcurrentModificationException ;
69import org .openjdk .jcstress .annotations .Actor ;
710import org .openjdk .jcstress .annotations .Arbiter ;
811import org .openjdk .jcstress .annotations .Expect ;
1417import org .openjdk .jcstress .infra .results .L_Result ;
1518import reactor .core .Fuseable ;
1619import reactor .core .publisher .Hooks ;
20+ import reactor .util .Logger ;
1721
1822public abstract class UnboundedProcessorStressTest {
1923
2024 static {
2125 Hooks .onErrorDropped (t -> {});
2226 }
2327
24- final UnboundedProcessor unboundedProcessor = new UnboundedProcessor ();
28+ final Logger logger = new FastLogger (getClass ().getName ());
29+
30+ final UnboundedProcessor unboundedProcessor = new UnboundedProcessor (logger );
2531
2632 @ JCStressTest
2733 @ Outcome (
@@ -145,6 +151,8 @@ public void arbiter(LLL_Result r) {
145151 stressSubscriber .values .forEach (ByteBuf ::release );
146152
147153 r .r3 = byteBuf1 .refCnt () + byteBuf2 .refCnt () + byteBuf3 .refCnt () + byteBuf4 .refCnt ();
154+
155+ checkOutcomes (this , r .toString (), logger );
148156 }
149157 }
150158
@@ -270,6 +278,8 @@ public void arbiter(LLL_Result r) {
270278 stressSubscriber .values .forEach (ByteBuf ::release );
271279
272280 r .r3 = byteBuf1 .refCnt () + byteBuf2 .refCnt () + byteBuf3 .refCnt () + byteBuf4 .refCnt ();
281+
282+ checkOutcomes (this , r .toString (), logger );
273283 }
274284 }
275285
@@ -375,6 +385,8 @@ public void arbiter(LLL_Result r) {
375385 stressSubscriber .values .forEach (ByteBuf ::release );
376386
377387 r .r3 = byteBuf1 .refCnt () + byteBuf2 .refCnt () + byteBuf3 .refCnt () + byteBuf4 .refCnt ();
388+
389+ checkOutcomes (this , r .toString (), logger );
378390 }
379391 }
380392
@@ -476,6 +488,8 @@ public void arbiter(LLL_Result r) {
476488 stressSubscriber .values .forEach (ByteBuf ::release );
477489
478490 r .r3 = byteBuf1 .refCnt () + byteBuf2 .refCnt () + byteBuf3 .refCnt () + byteBuf4 .refCnt ();
491+
492+ checkOutcomes (this , r .toString (), logger );
479493 }
480494 }
481495
@@ -578,6 +592,8 @@ public void arbiter(LLL_Result r) {
578592 stressSubscriber .values .forEach (ByteBuf ::release );
579593
580594 r .r3 = byteBuf1 .refCnt () + byteBuf2 .refCnt () + byteBuf3 .refCnt () + byteBuf4 .refCnt ();
595+
596+ checkOutcomes (this , r .toString (), logger );
581597 }
582598 }
583599
@@ -701,6 +717,8 @@ public void arbiter(LLL_Result r) {
701717 stressSubscriber .values .forEach (ByteBuf ::release );
702718
703719 r .r3 = byteBuf1 .refCnt () + byteBuf2 .refCnt () + byteBuf3 .refCnt () + byteBuf4 .refCnt ();
720+
721+ checkOutcomes (this , r .toString (), logger );
704722 }
705723 }
706724
@@ -781,6 +799,8 @@ public void arbiter(LLL_Result r) {
781799 stressSubscriber .values .forEach (ByteBuf ::release );
782800
783801 r .r3 = byteBuf1 .refCnt () + byteBuf2 .refCnt () + byteBuf3 .refCnt () + byteBuf4 .refCnt ();
802+
803+ checkOutcomes (this , r .toString (), logger );
784804 }
785805 }
786806
@@ -837,9 +857,15 @@ public void arbiter(LLL_Result r) {
837857 + stressSubscriber .onErrorCalls * 2
838858 + stressSubscriber .droppedErrors .size () * 3 ;
839859
860+ if (stressSubscriber .concurrentOnNext || stressSubscriber .concurrentOnComplete ) {
861+ throw new ConcurrentModificationException ("boo" );
862+ }
863+
840864 stressSubscriber .values .forEach (ByteBuf ::release );
841865
842866 r .r3 = byteBuf1 .refCnt () + byteBuf2 .refCnt () + byteBuf3 .refCnt () + byteBuf4 .refCnt ();
867+
868+ checkOutcomes (this , r .toString (), logger );
843869 }
844870 }
845871
@@ -892,6 +918,8 @@ public void arbiter(LLL_Result r) {
892918 stressSubscriber .values .forEach (ByteBuf ::release );
893919
894920 r .r3 = byteBuf1 .refCnt () + byteBuf2 .refCnt () + byteBuf3 .refCnt () + byteBuf4 .refCnt ();
921+
922+ checkOutcomes (this , r .toString (), logger );
895923 }
896924 }
897925
@@ -1107,6 +1135,8 @@ public void arbiter(LLLL_Result r) {
11071135 stressSubscriber .values .forEach (ByteBuf ::release );
11081136
11091137 r .r4 = byteBuf1 .refCnt () + byteBuf2 .refCnt () + byteBuf3 .refCnt () + byteBuf4 .refCnt ();
1138+
1139+ checkOutcomes (this , r .toString (), logger );
11101140 }
11111141 }
11121142
@@ -1238,6 +1268,8 @@ public void arbiter(LLLL_Result r) {
12381268 stressSubscriber .values .forEach (ByteBuf ::release );
12391269
12401270 r .r4 = byteBuf1 .refCnt () + byteBuf2 .refCnt () + byteBuf3 .refCnt () + byteBuf4 .refCnt ();
1271+
1272+ checkOutcomes (this , r .toString (), logger );
12411273 }
12421274 }
12431275
@@ -1390,6 +1422,8 @@ public void arbiter(LLLL_Result r) {
13901422 stressSubscriber .values .forEach (ByteBuf ::release );
13911423
13921424 r .r4 = byteBuf1 .refCnt () + byteBuf2 .refCnt () + byteBuf3 .refCnt () + byteBuf4 .refCnt ();
1425+
1426+ checkOutcomes (this , r .toString (), logger );
13931427 }
13941428 }
13951429
@@ -1522,6 +1556,8 @@ public void arbiter(LLLL_Result r) {
15221556 stressSubscriber .values .forEach (ByteBuf ::release );
15231557
15241558 r .r4 = byteBuf1 .refCnt () + byteBuf2 .refCnt () + byteBuf3 .refCnt () + byteBuf4 .refCnt ();
1559+
1560+ checkOutcomes (this , r .toString (), logger );
15251561 }
15261562 }
15271563
@@ -1587,6 +1623,8 @@ public void arbiter(LLLL_Result r) {
15871623 stressSubscriber .values .forEach (ByteBuf ::release );
15881624
15891625 r .r4 = byteBuf1 .refCnt () + byteBuf2 .refCnt () + byteBuf3 .refCnt () + byteBuf4 .refCnt ();
1626+
1627+ checkOutcomes (this , r .toString (), logger );
15901628 }
15911629 }
15921630
@@ -1652,6 +1690,8 @@ public void arbiter(LLLL_Result r) {
16521690 stressSubscriber .values .forEach (ByteBuf ::release );
16531691
16541692 r .r4 = byteBuf1 .refCnt () + byteBuf2 .refCnt () + byteBuf3 .refCnt () + byteBuf4 .refCnt ();
1693+
1694+ checkOutcomes (this , r .toString (), logger );
16551695 }
16561696 }
16571697
@@ -1678,6 +1718,16 @@ public void subscribe2() {
16781718 @ Arbiter
16791719 public void arbiter (L_Result r ) {
16801720 r .r1 = stressSubscriber1 .onErrorCalls + stressSubscriber2 .onErrorCalls ;
1721+
1722+ checkOutcomes (this , r .toString (), logger );
1723+ }
1724+ }
1725+
1726+ static void checkOutcomes (Object instance , String result , Logger logger ) {
1727+ if (Arrays .stream (instance .getClass ().getDeclaredAnnotationsByType (Outcome .class ))
1728+ .flatMap (o -> Arrays .stream (o .id ()))
1729+ .noneMatch (s -> s .equalsIgnoreCase (result ))) {
1730+ throw new RuntimeException (result + " " + logger );
16811731 }
16821732 }
16831733}
0 commit comments