Skip to content

Commit 911b117

Browse files
committed
Implement otel retry metrics (#12064)
implements [A96](https://github.com/grpc/proposal/pull/488/files)
1 parent e28b5b4 commit 911b117

File tree

13 files changed

+469
-39
lines changed

13 files changed

+469
-39
lines changed

api/src/main/java/io/grpc/ClientStreamTracer.java

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -132,12 +132,15 @@ public static final class StreamInfo {
132132
private final CallOptions callOptions;
133133
private final int previousAttempts;
134134
private final boolean isTransparentRetry;
135+
private final boolean isHedging;
135136

136137
StreamInfo(
137-
CallOptions callOptions, int previousAttempts, boolean isTransparentRetry) {
138+
CallOptions callOptions, int previousAttempts, boolean isTransparentRetry,
139+
boolean isHedging) {
138140
this.callOptions = checkNotNull(callOptions, "callOptions");
139141
this.previousAttempts = previousAttempts;
140142
this.isTransparentRetry = isTransparentRetry;
143+
this.isHedging = isHedging;
141144
}
142145

143146
/**
@@ -165,6 +168,15 @@ public boolean isTransparentRetry() {
165168
return isTransparentRetry;
166169
}
167170

171+
/**
172+
* Whether the stream is hedging.
173+
*
174+
* @since 1.74.0
175+
*/
176+
public boolean isHedging() {
177+
return isHedging;
178+
}
179+
168180
/**
169181
* Converts this StreamInfo into a new Builder.
170182
*
@@ -174,7 +186,9 @@ public Builder toBuilder() {
174186
return new Builder()
175187
.setCallOptions(callOptions)
176188
.setPreviousAttempts(previousAttempts)
177-
.setIsTransparentRetry(isTransparentRetry);
189+
.setIsTransparentRetry(isTransparentRetry)
190+
.setIsHedging(isHedging);
191+
178192
}
179193

180194
/**
@@ -192,6 +206,7 @@ public String toString() {
192206
.add("callOptions", callOptions)
193207
.add("previousAttempts", previousAttempts)
194208
.add("isTransparentRetry", isTransparentRetry)
209+
.add("isHedging", isHedging)
195210
.toString();
196211
}
197212

@@ -204,6 +219,7 @@ public static final class Builder {
204219
private CallOptions callOptions = CallOptions.DEFAULT;
205220
private int previousAttempts;
206221
private boolean isTransparentRetry;
222+
private boolean isHedging;
207223

208224
Builder() {
209225
}
@@ -236,11 +252,21 @@ public Builder setIsTransparentRetry(boolean isTransparentRetry) {
236252
return this;
237253
}
238254

255+
/**
256+
* Sets whether the stream is hedging.
257+
*
258+
* @since 1.74.0
259+
*/
260+
public Builder setIsHedging(boolean isHedging) {
261+
this.isHedging = isHedging;
262+
return this;
263+
}
264+
239265
/**
240266
* Builds a new StreamInfo.
241267
*/
242268
public StreamInfo build() {
243-
return new StreamInfo(callOptions, previousAttempts, isTransparentRetry);
269+
return new StreamInfo(callOptions, previousAttempts, isTransparentRetry, isHedging);
244270
}
245271
}
246272
}

core/src/main/java/io/grpc/internal/ClientCallImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,8 @@ public void runInContext() {
250250
stream = clientStreamProvider.newStream(method, callOptions, headers, context);
251251
} else {
252252
ClientStreamTracer[] tracers =
253-
GrpcUtil.getClientStreamTracers(callOptions, headers, 0, false);
253+
GrpcUtil.getClientStreamTracers(callOptions, headers, 0,
254+
false, false);
254255
String deadlineName = contextIsDeadlineSource ? "Context" : "CallOptions";
255256
Long nameResolutionDelay = callOptions.getOption(NAME_RESOLUTION_DELAYED);
256257
String description = String.format(

core/src/main/java/io/grpc/internal/GrpcUtil.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -759,13 +759,15 @@ public ListenableFuture<SocketStats> getStats() {
759759

760760
/** Gets stream tracers based on CallOptions. */
761761
public static ClientStreamTracer[] getClientStreamTracers(
762-
CallOptions callOptions, Metadata headers, int previousAttempts, boolean isTransparentRetry) {
762+
CallOptions callOptions, Metadata headers, int previousAttempts, boolean isTransparentRetry,
763+
boolean isHedging) {
763764
List<ClientStreamTracer.Factory> factories = callOptions.getStreamTracerFactories();
764765
ClientStreamTracer[] tracers = new ClientStreamTracer[factories.size() + 1];
765766
StreamInfo streamInfo = StreamInfo.newBuilder()
766767
.setCallOptions(callOptions)
767768
.setPreviousAttempts(previousAttempts)
768769
.setIsTransparentRetry(isTransparentRetry)
770+
.setIsHedging(isHedging)
769771
.build();
770772
for (int i = 0; i < factories.size(); i++) {
771773
tracers[i] = factories.get(i).newClientStreamTracer(streamInfo, headers);

core/src/main/java/io/grpc/internal/ManagedChannelImpl.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -479,7 +479,8 @@ public ClientStream newStream(
479479
// the delayed transport or a real transport will go in-use and cancel the idle timer.
480480
if (!retryEnabled) {
481481
ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(
482-
callOptions, headers, 0, /* isTransparentRetry= */ false);
482+
callOptions, headers, 0, /* isTransparentRetry= */ false,
483+
/* isHedging= */false);
483484
Context origContext = context.attach();
484485
try {
485486
return delayedTransport.newStream(method, headers, callOptions, tracers);
@@ -519,10 +520,10 @@ void postCommit() {
519520
@Override
520521
ClientStream newSubstream(
521522
Metadata newHeaders, ClientStreamTracer.Factory factory, int previousAttempts,
522-
boolean isTransparentRetry) {
523+
boolean isTransparentRetry, boolean isHedgedStream) {
523524
CallOptions newOptions = callOptions.withStreamTracerFactory(factory);
524525
ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(
525-
newOptions, newHeaders, previousAttempts, isTransparentRetry);
526+
newOptions, newHeaders, previousAttempts, isTransparentRetry, isHedgedStream);
526527
Context origContext = context.attach();
527528
try {
528529
return delayedTransport.newStream(method, newHeaders, newOptions, tracers);

core/src/main/java/io/grpc/internal/OobChannel.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,8 @@ final class OobChannel extends ManagedChannel implements InternalInstrumented<Ch
8888
public ClientStream newStream(MethodDescriptor<?, ?> method,
8989
CallOptions callOptions, Metadata headers, Context context) {
9090
ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(
91-
callOptions, headers, 0, /* isTransparentRetry= */ false);
91+
callOptions, headers, 0, /* isTransparentRetry= */ false,
92+
/* isHedging= */ false);
9293
Context origContext = context.attach();
9394
// delayed transport's newStream() always acquires a lock, but concurrent performance doesn't
9495
// matter here because OOB communication should be sparse, and it's not on application RPC's

core/src/main/java/io/grpc/internal/RetriableStream.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,8 @@ private void commitAndRun(Substream winningSubstream) {
245245
// returns null means we should not create new sub streams, e.g. cancelled or
246246
// other close condition is met for retriableStream.
247247
@Nullable
248-
private Substream createSubstream(int previousAttemptCount, boolean isTransparentRetry) {
248+
private Substream createSubstream(int previousAttemptCount, boolean isTransparentRetry,
249+
boolean isHedgedStream) {
249250
int inFlight;
250251
do {
251252
inFlight = inFlightSubStreams.get();
@@ -266,7 +267,8 @@ public ClientStreamTracer newClientStreamTracer(
266267

267268
Metadata newHeaders = updateHeaders(headers, previousAttemptCount);
268269
// NOTICE: This set _must_ be done before stream.start() and it actually is.
269-
sub.stream = newSubstream(newHeaders, tracerFactory, previousAttemptCount, isTransparentRetry);
270+
sub.stream = newSubstream(newHeaders, tracerFactory, previousAttemptCount, isTransparentRetry,
271+
isHedgedStream);
270272
return sub;
271273
}
272274

@@ -276,7 +278,7 @@ public ClientStreamTracer newClientStreamTracer(
276278
*/
277279
abstract ClientStream newSubstream(
278280
Metadata headers, ClientStreamTracer.Factory tracerFactory, int previousAttempts,
279-
boolean isTransparentRetry);
281+
boolean isTransparentRetry, boolean isHedgedStream);
280282

281283
/** Adds grpc-previous-rpc-attempts in the headers of a retry/hedging RPC. */
282284
@VisibleForTesting
@@ -398,7 +400,7 @@ public final void start(ClientStreamListener listener) {
398400
state.buffer.add(new StartEntry());
399401
}
400402

401-
Substream substream = createSubstream(0, false);
403+
Substream substream = createSubstream(0, false, false);
402404
if (substream == null) {
403405
return;
404406
}
@@ -471,7 +473,7 @@ public void run() {
471473
// If this run is not cancelled, the value of state.hedgingAttemptCount won't change
472474
// until state.addActiveHedge() is called subsequently, even the state could possibly
473475
// change.
474-
Substream newSubstream = createSubstream(state.hedgingAttemptCount, false);
476+
Substream newSubstream = createSubstream(state.hedgingAttemptCount, false, true);
475477
if (newSubstream == null) {
476478
return;
477479
}
@@ -949,7 +951,8 @@ public void run() {
949951
|| (rpcProgress == RpcProgress.REFUSED
950952
&& noMoreTransparentRetry.compareAndSet(false, true))) {
951953
// transparent retry
952-
final Substream newSubstream = createSubstream(substream.previousAttemptCount, true);
954+
final Substream newSubstream = createSubstream(substream.previousAttemptCount,
955+
true, false);
953956
if (newSubstream == null) {
954957
return;
955958
}
@@ -1001,7 +1004,8 @@ public void run() {
10011004
RetryPlan retryPlan = makeRetryDecision(status, trailers);
10021005
if (retryPlan.shouldRetry) {
10031006
// retry
1004-
Substream newSubstream = createSubstream(substream.previousAttemptCount + 1, false);
1007+
Substream newSubstream = createSubstream(substream.previousAttemptCount + 1,
1008+
false, false);
10051009
if (newSubstream == null) {
10061010
return;
10071011
}

core/src/main/java/io/grpc/internal/SubchannelChannel.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,8 @@ public ClientStream newStream(MethodDescriptor<?, ?> method,
5959
transport = notReadyTransport;
6060
}
6161
ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(
62-
callOptions, headers, 0, /* isTransparentRetry= */ false);
62+
callOptions, headers, 0, /* isTransparentRetry= */ false,
63+
/* isHedging= */ false);
6364
Context origContext = context.attach();
6465
try {
6566
return transport.newStream(method, headers, callOptions, tracers);

core/src/test/java/io/grpc/internal/RetriableStreamTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,8 @@ ClientStream newSubstream(
186186
Metadata metadata,
187187
ClientStreamTracer.Factory tracerFactory,
188188
int previousAttempts,
189-
boolean isTransparentRetry) {
189+
boolean isTransparentRetry,
190+
boolean isHedgedStream) {
190191
bufferSizeTracer =
191192
tracerFactory.newClientStreamTracer(STREAM_INFO, metadata);
192193
int actualPreviousRpcAttemptsInHeader = metadata.get(GRPC_PREVIOUS_RPC_ATTEMPTS) == null

opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java

Lines changed: 57 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,11 @@
1818

1919
import static com.google.common.base.Preconditions.checkNotNull;
2020
import static io.grpc.internal.GrpcUtil.IMPLEMENTATION_VERSION;
21+
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.HEDGE_BUCKETS;
2122
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.LATENCY_BUCKETS;
23+
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.RETRY_BUCKETS;
2224
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.SIZE_BUCKETS;
25+
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.TRANSPARENT_RETRY_BUCKETS;
2326

2427
import com.google.common.annotations.VisibleForTesting;
2528
import com.google.common.base.Stopwatch;
@@ -64,8 +67,8 @@ public Stopwatch get() {
6467
};
6568

6669
@VisibleForTesting
67-
static boolean ENABLE_OTEL_TRACING = GrpcUtil.getFlag("GRPC_EXPERIMENTAL_ENABLE_OTEL_TRACING",
68-
false);
70+
static boolean ENABLE_OTEL_TRACING =
71+
GrpcUtil.getFlag("GRPC_EXPERIMENTAL_ENABLE_OTEL_TRACING", false);
6972

7073
private final OpenTelemetry openTelemetrySdk;
7174
private final MeterProvider meterProvider;
@@ -241,6 +244,54 @@ static OpenTelemetryMetricsResource createMetricInstruments(Meter meter,
241244
.build());
242245
}
243246

247+
if (isMetricEnabled("grpc.client.call.retries", enableMetrics, disableDefault)) {
248+
builder.clientCallRetriesCounter(
249+
meter.histogramBuilder(
250+
"grpc.client.call.retries")
251+
.setUnit("{retry}")
252+
.setDescription("Number of retries during the client call. "
253+
+ "If there were no retries, 0 is not reported.")
254+
.ofLongs()
255+
.setExplicitBucketBoundariesAdvice(RETRY_BUCKETS)
256+
.build());
257+
}
258+
259+
if (isMetricEnabled("grpc.client.call.transparent_retries", enableMetrics,
260+
disableDefault)) {
261+
builder.clientCallTransparentRetriesCounter(
262+
meter.histogramBuilder(
263+
"grpc.client.call.transparent_retries")
264+
.setUnit("{transparent_retry}")
265+
.setDescription("Number of transparent retries during the client call. "
266+
+ "If there were no transparent retries, 0 is not reported.")
267+
.ofLongs()
268+
.setExplicitBucketBoundariesAdvice(TRANSPARENT_RETRY_BUCKETS)
269+
.build());
270+
}
271+
272+
if (isMetricEnabled("grpc.client.call.hedges", enableMetrics, disableDefault)) {
273+
builder.clientCallHedgesCounter(
274+
meter.histogramBuilder(
275+
"grpc.client.call.hedges")
276+
.setUnit("{hedge}")
277+
.setDescription("Number of hedges during the client call. "
278+
+ "If there were no hedges, 0 is not reported.")
279+
.ofLongs()
280+
.setExplicitBucketBoundariesAdvice(HEDGE_BUCKETS)
281+
.build());
282+
}
283+
284+
if (isMetricEnabled("grpc.client.call.retry_delay", enableMetrics, disableDefault)) {
285+
builder.clientCallRetryDelayCounter(
286+
meter.histogramBuilder(
287+
"grpc.client.call.retry_delay")
288+
.setUnit("s")
289+
.setDescription("Total time of delay while there is no active attempt during the "
290+
+ "client call")
291+
.setExplicitBucketBoundariesAdvice(LATENCY_BUCKETS)
292+
.build());
293+
}
294+
244295
if (isMetricEnabled("grpc.server.call.started", enableMetrics, disableDefault)) {
245296
builder.serverCallCountCounter(
246297
meter.counterBuilder("grpc.server.call.started")
@@ -259,8 +310,8 @@ static OpenTelemetryMetricsResource createMetricInstruments(Meter meter,
259310
.build());
260311
}
261312

262-
if (isMetricEnabled("grpc.server.call.sent_total_compressed_message_size", enableMetrics,
263-
disableDefault)) {
313+
if (isMetricEnabled("grpc.server.call.sent_total_compressed_message_size",
314+
enableMetrics, disableDefault)) {
264315
builder.serverTotalSentCompressedMessageSizeCounter(
265316
meter.histogramBuilder(
266317
"grpc.server.call.sent_total_compressed_message_size")
@@ -271,8 +322,8 @@ static OpenTelemetryMetricsResource createMetricInstruments(Meter meter,
271322
.build());
272323
}
273324

274-
if (isMetricEnabled("grpc.server.call.rcvd_total_compressed_message_size", enableMetrics,
275-
disableDefault)) {
325+
if (isMetricEnabled("grpc.server.call.rcvd_total_compressed_message_size",
326+
enableMetrics, disableDefault)) {
276327
builder.serverTotalReceivedCompressedMessageSizeCounter(
277328
meter.histogramBuilder(
278329
"grpc.server.call.rcvd_total_compressed_message_size")

0 commit comments

Comments
 (0)