2424import static com .rabbitmq .stream .impl .TestUtils .latchAssert ;
2525import static com .rabbitmq .stream .impl .TestUtils .streamName ;
2626import static com .rabbitmq .stream .impl .TestUtils .waitAtMost ;
27+ import static java .time .Duration .ofSeconds ;
2728import static java .util .stream .Collectors .toList ;
2829import static org .assertj .core .api .Assertions .assertThat ;
2930
3839import com .rabbitmq .stream .impl .TestUtils .BrokerVersionAtLeast ;
3940import com .rabbitmq .stream .impl .TestUtils .BrokerVersionAtLeast311Condition ;
4041import com .rabbitmq .stream .impl .TestUtils .DisabledIfRabbitMqCtlNotSet ;
42+ import io .github .bucket4j .Bucket ;
4143import java .nio .charset .StandardCharsets ;
4244import java .time .Duration ;
4345import java .util .Collections ;
@@ -480,7 +482,7 @@ void killingConnectionsShouldTriggerConsumerUpdateNotification() throws Exceptio
480482
481483 @ Test
482484 void superStreamRebalancingShouldWorkWhilePublishing (TestInfo info ) throws Exception {
483- Map <Byte , Boolean > consumerStates = consumerStates (3 * 3 );
485+ Map <Byte , Boolean > consumerStates = consumerStates (2 );
484486 String superStream = streamName (info );
485487 String consumerName = "foo" ;
486488 Client configurationClient = cf .get ();
@@ -495,15 +497,25 @@ void superStreamRebalancingShouldWorkWhilePublishing(TestInfo info) throws Excep
495497 publisher .declarePublisher (b (0 ), null , partitionInUse );
496498 new Thread (
497499 () -> {
500+ long rate = 20_000 ;
501+ Bucket bucket =
502+ Bucket .builder ()
503+ .addLimit (limit -> limit .capacity (rate ).refillGreedy (rate , ofSeconds (1 )))
504+ .build ();
498505 while (keepPublishing .get ()) {
506+ try {
507+ bucket .asBlocking ().consume (1 );
508+ } catch (InterruptedException e ) {
509+ Thread .currentThread ().interrupt ();
510+ break ;
511+ }
499512 publisher .publish (
500513 b (0 ),
501514 Collections .singletonList (
502515 publisher
503516 .messageBuilder ()
504517 .addData ("hello" .getBytes (StandardCharsets .UTF_8 ))
505518 .build ()));
506- TestUtils .waitMs (1 );
507519 }
508520 })
509521 .start ();
@@ -526,7 +538,7 @@ void superStreamRebalancingShouldWorkWhilePublishing(TestInfo info) throws Excep
526538 // we keep track of credit errors
527539 // with the amount of initial credit and the rebalancing,
528540 // the first subscriber is likely to have in-flight credit commands
529- // when it becomes inactive. The server should then sends some credit
541+ // when it becomes inactive. The server should then send some credit
530542 // notifications to tell the client it's not supposed to ask for credits
531543 // for this subscription.
532544 CreditNotification creditNotification =
@@ -603,7 +615,7 @@ void singleActiveConsumerMustHaveName() {
603615 @ DisabledIfRabbitMqCtlNotSet
604616 @ BrokerVersionAtLeast (RABBITMQ_3_11_14 )
605617 void connectionShouldBeClosedIfConsumerUpdateTakesTooLong () throws Exception {
606- Duration timeout = Duration . ofSeconds (1 );
618+ Duration timeout = ofSeconds (1 );
607619 try {
608620 Cli .setEnv ("request_timeout" , String .valueOf (timeout .getSeconds ()));
609621 CountDownLatch shutdownLatch = new CountDownLatch (1 );
0 commit comments