6868 * "\"oobReportingPeriod\":\"10s\"," +
6969 * "\"weightExpirationPeriod\":\"180s\"," +
7070 * "\"errorUtilizationPenalty\":\"1.0\"," +
71- * "\"weightUpdatePeriod\":\"1s\"}}]}";
71+ * "\"weightUpdatePeriod\":\"1s\"," +
72+ * "\"slowStartConfig\":{" +
73+ * "\"minWeightPercent\":10.0," +
74+ * "\"aggression\":1.0," +
75+ * "\"slowStartWindow\":\"30s\"}}}]}";
7276 * serviceConfig = (Map<String, ?>) JsonParser.parse(wrrConfig);
7377 * channel = ManagedChannelBuilder.forTarget("test:///lb.test.grpc.io")
7478 * .defaultServiceConfig(serviceConfig)
@@ -90,6 +94,7 @@ final class WeightedRoundRobinLoadBalancer extends MultiChildLoadBalancer {
9094 private static final LongCounterMetricInstrument RR_FALLBACK_COUNTER ;
9195 private static final LongCounterMetricInstrument ENDPOINT_WEIGHT_NOT_YET_USEABLE_COUNTER ;
9296 private static final LongCounterMetricInstrument ENDPOINT_WEIGHT_STALE_COUNTER ;
97+ private static final LongCounterMetricInstrument ENDPOINT_SLOW_START_COUNTER ;
9398 private static final DoubleHistogramMetricInstrument ENDPOINT_WEIGHTS_HISTOGRAM ;
9499 private static final Logger log = Logger .getLogger (
95100 WeightedRoundRobinLoadBalancer .class .getName ());
@@ -133,6 +138,14 @@ final class WeightedRoundRobinLoadBalancer extends MultiChildLoadBalancer {
133138 Lists .newArrayList ("grpc.target" ),
134139 Lists .newArrayList ("grpc.lb.locality" , "grpc.lb.backend_service" ),
135140 false );
141+ ENDPOINT_SLOW_START_COUNTER = metricInstrumentRegistry .registerLongCounter (
142+ "grpc.lb.wrr.endpoints_in_slow_start" ,
143+ "EXPERIMENTAL. Number of endpoints from each scheduler update that "
144+ + "are in slow start window" ,
145+ "{endpoint}" ,
146+ Lists .newArrayList ("grpc.target" ),
147+ Lists .newArrayList ("grpc.lb.locality" , "grpc.lb.backend_service" ),
148+ false );
136149 ENDPOINT_WEIGHTS_HISTOGRAM = metricInstrumentRegistry .registerDoubleHistogram (
137150 "grpc.lb.wrr.endpoint_weights" ,
138151 "EXPERIMENTAL. The histogram buckets will be endpoint weight ranges." ,
@@ -243,16 +256,21 @@ private SubchannelPicker createReadyPicker(Collection<ChildLbState> activeList)
243256 private void updateWeight (WeightedRoundRobinPicker picker ) {
244257 Helper helper = getHelper ();
245258 float [] newWeights = new float [picker .children .size ()];
259+ float [] newScales = new float [picker .children .size ()];
246260 AtomicInteger staleEndpoints = new AtomicInteger ();
247261 AtomicInteger notYetUsableEndpoints = new AtomicInteger ();
262+ AtomicInteger slowStartEndpoints = new AtomicInteger ();
248263 for (int i = 0 ; i < picker .children .size (); i ++) {
249264 double newWeight = ((WeightedChildLbState ) picker .children .get (i )).getWeight (staleEndpoints ,
250265 notYetUsableEndpoints );
266+ double newScale = ((WeightedChildLbState ) picker .children .get (i ))
267+ .getScale (slowStartEndpoints );
251268 helper .getMetricRecorder ()
252269 .recordDoubleHistogram (ENDPOINT_WEIGHTS_HISTOGRAM , newWeight ,
253270 ImmutableList .of (helper .getChannelTarget ()),
254271 ImmutableList .of (locality , backendService ));
255272 newWeights [i ] = newWeight > 0 ? (float ) newWeight : 0.0f ;
273+ newScales [i ] = newScale > 0 ? (float ) newScale : 1.0f ;
256274 }
257275
258276 if (staleEndpoints .get () > 0 ) {
@@ -267,7 +285,13 @@ private void updateWeight(WeightedRoundRobinPicker picker) {
267285 ImmutableList .of (helper .getChannelTarget ()),
268286 ImmutableList .of (locality , backendService ));
269287 }
270- boolean weightsEffective = picker .updateWeight (newWeights );
288+ if (slowStartEndpoints .get () > 0 ) {
289+ helper .getMetricRecorder ()
290+ .addLongCounter (ENDPOINT_SLOW_START_COUNTER , slowStartEndpoints .get (),
291+ ImmutableList .of (helper .getChannelTarget ()),
292+ ImmutableList .of (locality , backendService ));
293+ }
294+ boolean weightsEffective = picker .updateWeight (newWeights , newScales );
271295 if (!weightsEffective ) {
272296 helper .getMetricRecorder ()
273297 .addLongCounter (RR_FALLBACK_COUNTER , 1 , ImmutableList .of (helper .getChannelTarget ()),
@@ -289,6 +313,7 @@ final class WeightedChildLbState extends ChildLbState {
289313 private final Set <WrrSubchannel > subchannels = new HashSet <>();
290314 private volatile long lastUpdated ;
291315 private volatile long nonEmptySince ;
316+ private volatile long readySince ;
292317 private volatile double weight = 0 ;
293318
294319 private OrcaReportListener orcaReportListener ;
@@ -320,6 +345,25 @@ private double getWeight(AtomicInteger staleEndpoints, AtomicInteger notYetUsabl
320345 }
321346 }
322347
348+ private double getScale (AtomicInteger slowStartEndpoints ) {
349+ if (config == null || config .slowStartConfig == null ) {
350+ return 1 ;
351+ }
352+ long slowStartWindowNanos = config .slowStartConfig .slowStartWindowNanos ;
353+ if (slowStartWindowNanos <= 0 ) {
354+ return 1 ;
355+ }
356+ long now = ticker .nanoTime ();
357+ if (now - readySince >= slowStartWindowNanos ) {
358+ return 1 ;
359+ } else {
360+ slowStartEndpoints .incrementAndGet ();
361+ double timeFactor = Math .max (now - readySince , 1.0 ) / slowStartWindowNanos ;
362+ double weightPercent = Math .pow (timeFactor , 1.0 / config .slowStartConfig .aggression );
363+ return Math .max (config .slowStartConfig .minWeightPercent / 100.0 , weightPercent );
364+ }
365+ }
366+
323367 public void addSubchannel (WrrSubchannel wrrSubchannel ) {
324368 subchannels .add (wrrSubchannel );
325369 }
@@ -439,6 +483,7 @@ public void start(SubchannelStateListener listener) {
439483 public void onSubchannelState (ConnectivityStateInfo newState ) {
440484 if (newState .getState ().equals (ConnectivityState .READY )) {
441485 owner .nonEmptySince = infTime ;
486+ owner .readySince = ticker .nanoTime ();
442487 }
443488 listener .onSubchannelState (newState );
444489 }
@@ -517,8 +562,8 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
517562 }
518563
519564 /** Returns {@code true} if weights are different than round_robin. */
520- private boolean updateWeight (float [] newWeights ) {
521- this .scheduler = new StaticStrideScheduler (newWeights , sequence );
565+ private boolean updateWeight (float [] newWeights , float [] newScales ) {
566+ this .scheduler = new StaticStrideScheduler (newWeights , newScales , sequence );
522567 return !this .scheduler .usesRoundRobin ();
523568 }
524569
@@ -604,7 +649,7 @@ static final class StaticStrideScheduler {
604649 private static final double K_MAX_RATIO = 10 ;
605650 private static final double K_MIN_RATIO = 0.1 ;
606651
607- StaticStrideScheduler (float [] weights , AtomicInteger sequence ) {
652+ StaticStrideScheduler (float [] weights , float [] scales , AtomicInteger sequence ) {
608653 checkArgument (weights .length >= 1 , "Couldn't build scheduler: requires at least one weight" );
609654 int numChannels = weights .length ;
610655 int numWeightedChannels = 0 ;
@@ -643,12 +688,14 @@ static final class StaticStrideScheduler {
643688 int weightLowerBound = (int ) Math .ceil (scalingFactor * unscaledMeanWeight * K_MIN_RATIO );
644689 short [] scaledWeights = new short [numChannels ];
645690 for (int i = 0 ; i < numChannels ; i ++) {
691+ double curScalingFactor = scalingFactor * scales [i ];
692+ int weight ;
646693 if (weights [i ] <= 0 ) {
647- scaledWeights [ i ] = (short ) Math .round (scalingFactor * unscaledMeanWeight );
694+ weight = (int ) Math .round (curScalingFactor * unscaledMeanWeight );
648695 } else {
649- int weight = (int ) Math .round (scalingFactor * Math .min (weights [i ], unscaledMaxWeight ));
650- scaledWeights [i ] = (short ) Math .max (weight , weightLowerBound );
696+ weight = (int ) Math .round (curScalingFactor * Math .min (weights [i ], unscaledMaxWeight ));
651697 }
698+ scaledWeights [i ] = (short ) Math .max (weight , weightLowerBound );
652699 }
653700
654701 this .scaledWeights = scaledWeights ;
0 commit comments