@@ -90,6 +90,7 @@ final class WeightedRoundRobinLoadBalancer extends MultiChildLoadBalancer {
9090 private static final LongCounterMetricInstrument RR_FALLBACK_COUNTER ;
9191 private static final LongCounterMetricInstrument ENDPOINT_WEIGHT_NOT_YET_USEABLE_COUNTER ;
9292 private static final LongCounterMetricInstrument ENDPOINT_WEIGHT_STALE_COUNTER ;
93+ private static final LongCounterMetricInstrument ENDPOINT_SLOW_START_COUNTER ;
9394 private static final DoubleHistogramMetricInstrument ENDPOINT_WEIGHTS_HISTOGRAM ;
9495 private static final Logger log = Logger .getLogger (
9596 WeightedRoundRobinLoadBalancer .class .getName ());
@@ -133,6 +134,14 @@ final class WeightedRoundRobinLoadBalancer extends MultiChildLoadBalancer {
133134 Lists .newArrayList ("grpc.target" ),
134135 Lists .newArrayList ("grpc.lb.locality" , "grpc.lb.backend_service" ),
135136 false );
137+ ENDPOINT_SLOW_START_COUNTER = metricInstrumentRegistry .registerLongCounter (
138+ "grpc.lb.wrr.endpoints_in_slow_start" ,
139+ "EXPERIMENTAL. Number of endpoints from each scheduler update that "
140+ + "are in slow start window" ,
141+ "{endpoint}" ,
142+ Lists .newArrayList ("grpc.target" ),
143+ Lists .newArrayList ("grpc.lb.locality" , "grpc.lb.backend_service" ),
144+ false );
136145 ENDPOINT_WEIGHTS_HISTOGRAM = metricInstrumentRegistry .registerDoubleHistogram (
137146 "grpc.lb.wrr.endpoint_weights" ,
138147 "EXPERIMENTAL. The histogram buckets will be endpoint weight ranges." ,
@@ -243,16 +252,21 @@ private SubchannelPicker createReadyPicker(Collection<ChildLbState> activeList)
243252 private void updateWeight (WeightedRoundRobinPicker picker ) {
244253 Helper helper = getHelper ();
245254 float [] newWeights = new float [picker .children .size ()];
255+ float [] newScales = new float [picker .children .size ()];
246256 AtomicInteger staleEndpoints = new AtomicInteger ();
247257 AtomicInteger notYetUsableEndpoints = new AtomicInteger ();
258+ AtomicInteger slowStartEndpoints = new AtomicInteger ();
248259 for (int i = 0 ; i < picker .children .size (); i ++) {
249260 double newWeight = ((WeightedChildLbState ) picker .children .get (i )).getWeight (staleEndpoints ,
250261 notYetUsableEndpoints );
262+ double newScale = ((WeightedChildLbState ) picker .children .get (i ))
263+ .getScale (slowStartEndpoints );
251264 helper .getMetricRecorder ()
252265 .recordDoubleHistogram (ENDPOINT_WEIGHTS_HISTOGRAM , newWeight ,
253266 ImmutableList .of (helper .getChannelTarget ()),
254267 ImmutableList .of (locality , backendService ));
255268 newWeights [i ] = newWeight > 0 ? (float ) newWeight : 0.0f ;
269+ newScales [i ] = newScale > 0 ? (float ) newScale : 1.0f ;
256270 }
257271
258272 if (staleEndpoints .get () > 0 ) {
@@ -267,7 +281,13 @@ private void updateWeight(WeightedRoundRobinPicker picker) {
267281 ImmutableList .of (helper .getChannelTarget ()),
268282 ImmutableList .of (locality , backendService ));
269283 }
270- boolean weightsEffective = picker .updateWeight (newWeights );
284+ if (slowStartEndpoints .get () > 0 ) {
285+ helper .getMetricRecorder ()
286+ .addLongCounter (ENDPOINT_SLOW_START_COUNTER , slowStartEndpoints .get (),
287+ ImmutableList .of (helper .getChannelTarget ()),
288+ ImmutableList .of (locality , backendService ));
289+ }
290+ boolean weightsEffective = picker .updateWeight (newWeights , newScales );
271291 if (!weightsEffective ) {
272292 helper .getMetricRecorder ()
273293 .addLongCounter (RR_FALLBACK_COUNTER , 1 , ImmutableList .of (helper .getChannelTarget ()),
@@ -289,6 +309,7 @@ final class WeightedChildLbState extends ChildLbState {
289309 private final Set <WrrSubchannel > subchannels = new HashSet <>();
290310 private volatile long lastUpdated ;
291311 private volatile long nonEmptySince ;
312+ private volatile long readySince ;
292313 private volatile double weight = 0 ;
293314
294315 private OrcaReportListener orcaReportListener ;
@@ -320,6 +341,25 @@ private double getWeight(AtomicInteger staleEndpoints, AtomicInteger notYetUsabl
320341 }
321342 }
322343
344+ private double getScale (AtomicInteger slowStartEndpoints ) {
345+ if (config == null || config .slowStartConfig == null ) {
346+ return 1 ;
347+ }
348+ long slowStartWindowNanos = config .slowStartConfig .slowStartWindowNanos ;
349+ if (slowStartWindowNanos <= 0 ) {
350+ return 1 ;
351+ }
352+ long now = ticker .nanoTime ();
353+ if (now - readySince >= slowStartWindowNanos ) {
354+ return 1 ;
355+ } else {
356+ slowStartEndpoints .incrementAndGet ();
357+ double timeFactor = Math .max (now - readySince , 1.0 ) / slowStartWindowNanos ;
358+ double weightPercent = Math .pow (timeFactor , 1.0 / config .slowStartConfig .aggression );
359+ return Math .max (config .slowStartConfig .minWeightPercent / 100.0 , weightPercent );
360+ }
361+ }
362+
323363 public void addSubchannel (WrrSubchannel wrrSubchannel ) {
324364 subchannels .add (wrrSubchannel );
325365 }
@@ -439,6 +479,7 @@ public void start(SubchannelStateListener listener) {
439479 public void onSubchannelState (ConnectivityStateInfo newState ) {
440480 if (newState .getState ().equals (ConnectivityState .READY )) {
441481 owner .nonEmptySince = infTime ;
482+ owner .readySince = ticker .nanoTime ();
442483 }
443484 listener .onSubchannelState (newState );
444485 }
@@ -517,8 +558,8 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
517558 }
518559
519560 /** Returns {@code true} if weights are different than round_robin. */
520- private boolean updateWeight (float [] newWeights ) {
521- this .scheduler = new StaticStrideScheduler (newWeights , sequence );
561+ private boolean updateWeight (float [] newWeights , float [] newScales ) {
562+ this .scheduler = new StaticStrideScheduler (newWeights , newScales , sequence );
522563 return !this .scheduler .usesRoundRobin ();
523564 }
524565
@@ -604,7 +645,7 @@ static final class StaticStrideScheduler {
604645 private static final double K_MAX_RATIO = 10 ;
605646 private static final double K_MIN_RATIO = 0.1 ;
606647
607- StaticStrideScheduler (float [] weights , AtomicInteger sequence ) {
648+ StaticStrideScheduler (float [] weights , float [] scales , AtomicInteger sequence ) {
608649 checkArgument (weights .length >= 1 , "Couldn't build scheduler: requires at least one weight" );
609650 int numChannels = weights .length ;
610651 int numWeightedChannels = 0 ;
@@ -643,12 +684,14 @@ static final class StaticStrideScheduler {
643684 int weightLowerBound = (int ) Math .ceil (scalingFactor * unscaledMeanWeight * K_MIN_RATIO );
644685 short [] scaledWeights = new short [numChannels ];
645686 for (int i = 0 ; i < numChannels ; i ++) {
687+ double curScalingFactor = scalingFactor * scales [i ];
688+ int weight ;
646689 if (weights [i ] <= 0 ) {
647- scaledWeights [ i ] = (short ) Math .round (scalingFactor * unscaledMeanWeight );
690+ weight = (int ) Math .round (curScalingFactor * unscaledMeanWeight );
648691 } else {
649- int weight = (int ) Math .round (scalingFactor * Math .min (weights [i ], unscaledMaxWeight ));
650- scaledWeights [i ] = (short ) Math .max (weight , weightLowerBound );
692+ weight = (int ) Math .round (curScalingFactor * Math .min (weights [i ], unscaledMaxWeight ));
651693 }
694+ scaledWeights [i ] = (short ) Math .max (weight , weightLowerBound );
652695 }
653696
654697 this .scaledWeights = scaledWeights ;
0 commit comments