From 365553d1e7d2d3ba54755361829cf6852d97b8b1 Mon Sep 17 00:00:00 2001 From: Felix Barnsteiner Date: Fri, 7 Nov 2025 11:53:25 +0100 Subject: [PATCH 1/3] OTLP: return correct response type for partial successes Now returns an ExportMetricsServiceResponse with an embedded ExportMetricsPartialSuccess rather than a plain ExportMetricsServiceResponse. This also limits the size of the error message to avoid decoding issues in the collector. For indexing errors, it returns one sample error message per unique index and status code. It also limits the number of ignored data point messages to 10. --- .../otlp/OTLPMetricsTransportAction.java | 70 ++++++++++++++----- .../datapoint/DataPointGroupingContext.java | 19 ++++- .../otlp/OTLPMetricsTransportActionTests.java | 31 +++++--- .../DataPointGroupingContextTests.java | 20 +++--- 4 files changed, 104 insertions(+), 36 deletions(-) diff --git a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsTransportAction.java b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsTransportAction.java index 3dcc22e3229bb..17c5f76ee4714 100644 --- a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsTransportAction.java +++ b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsTransportAction.java @@ -11,8 +11,6 @@ import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse; -import com.google.protobuf.MessageLite; - import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.lucene.util.BytesRef; @@ -49,7 +47,9 @@ import org.elasticsearch.xpack.oteldata.otlp.proto.BufferedByteStringAccessor; import java.io.IOException; +import java.util.HashMap; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; /** * Transport action for handling OpenTelemetry Protocol (OTLP) Metrics requests. @@ -68,6 +68,7 @@ public class OTLPMetricsTransportAction extends HandledTransportAction< public static final ActionType TYPE = new ActionType<>(NAME); private static final Logger logger = LogManager.getLogger(OTLPMetricsTransportAction.class); + public static final int IGNORED_DATA_POINTS_MESSAGE_LIMIT = 10; private final Client client; @Inject @@ -136,6 +137,7 @@ private void addIndexRequest( .setRequireDataStream(true) .source(xContentBuilder) .tsid(tsid) + .setIncludeSourceOnError(false) .setDynamicTemplates(dynamicTemplates) ); } @@ -158,7 +160,10 @@ private static void handlePartialSuccess(ActionListener listene // (i.e. when the server accepts only parts of the data and rejects the rest), // the server MUST respond with HTTP 200 OK. // https://opentelemetry.io/docs/specs/otlp/#partial-success-1 - MessageLite response = responseWithRejectedDataPoints(context.getIgnoredDataPoints(), context.getIgnoredDataPointsMessage()); + ExportMetricsServiceResponse response = responseWithRejectedDataPoints( + context.getIgnoredDataPoints(), + context.getIgnoredDataPointsMessage(IGNORED_DATA_POINTS_MESSAGE_LIMIT) + ); listener.onResponse(new MetricsResponse(RestStatus.BAD_REQUEST, response)); } @@ -167,6 +172,7 @@ private static void handlePartialSuccess( DataPointGroupingContext context, ActionListener listener ) { + Map> failureGroups = new HashMap<>(); // If the request is only partially accepted // (i.e. when the server accepts only parts of the data and rejects the rest), // the server MUST respond with HTTP 200 OK. @@ -174,22 +180,54 @@ private static void handlePartialSuccess( RestStatus status = RestStatus.OK; int failures = 0; for (BulkItemResponse bulkItemResponse : bulkItemResponses.getItems()) { - failures += bulkItemResponse.isFailed() ? 1 : 0; - if (bulkItemResponse.isFailed() && bulkItemResponse.getFailure().getStatus() == RestStatus.TOO_MANY_REQUESTS) { - // If the server receives more requests than the client is allowed or the server is overloaded, - // the server SHOULD respond with HTTP 429 Too Many Requests or HTTP 503 Service Unavailable - // and MAY include “Retry-After” header with a recommended time interval in seconds to wait before retrying. - // https://opentelemetry.io/docs/specs/otlp/#otlphttp-throttling - status = RestStatus.TOO_MANY_REQUESTS; + BulkItemResponse.Failure failure = bulkItemResponse.getFailure(); + if (failure != null) { + // we're counting each document as one data point here + // which is an approximation since one document can represent multiple data points + failures++; + if (failure.getStatus() == RestStatus.TOO_MANY_REQUESTS) { + // If the server receives more requests than the client is allowed or the server is overloaded, + // the server SHOULD respond with HTTP 429 Too Many Requests or HTTP 503 Service Unavailable + // and MAY include “Retry-After” header with a recommended time interval in seconds to wait before retrying. + // https://opentelemetry.io/docs/specs/otlp/#otlphttp-throttling + status = RestStatus.TOO_MANY_REQUESTS; + } + FailureGroup failureGroup = failureGroups.computeIfAbsent(failure.getIndex(), k -> new HashMap<>()) + .computeIfAbsent(failure.getStatus(), k -> new FailureGroup(new AtomicInteger(0), failure.getMessage())); + failureGroup.failureCount().incrementAndGet(); } } - MessageLite response = responseWithRejectedDataPoints( + if (bulkItemResponses.getItems().length == failures) { + // all data points failed, so we report total data points as failures + failures = context.totalDataPoints(); + } + StringBuilder failureMessageBuilder = new StringBuilder(); + for (Map.Entry> indexEntry : failureGroups.entrySet()) { + String indexName = indexEntry.getKey(); + for (Map.Entry statusEntry : indexEntry.getValue().entrySet()) { + RestStatus restStatus = statusEntry.getKey(); + FailureGroup failureGroup = statusEntry.getValue(); + failureMessageBuilder.append("Index [") + .append(indexName) + .append("] returned status [") + .append(restStatus) + .append("] for ") + .append(failureGroup.failureCount()) + .append(" documents. Sample error message: "); + failureMessageBuilder.append(failureGroup.failureMessageSample()); + failureMessageBuilder.append("\n"); + } + } + failureMessageBuilder.append(context.getIgnoredDataPointsMessage(10)); + ExportMetricsServiceResponse response = responseWithRejectedDataPoints( failures + context.getIgnoredDataPoints(), - bulkItemResponses.buildFailureMessage() + context.getIgnoredDataPointsMessage() + failureMessageBuilder.toString() ); listener.onResponse(new MetricsResponse(status, response)); } + record FailureGroup(AtomicInteger failureCount, String failureMessageSample) {} + private static void handleFailure(ActionListener listener, Exception e, DataPointGroupingContext context) { // https://opentelemetry.io/docs/specs/otlp/#failures-1 // If the processing of the request fails, @@ -199,12 +237,12 @@ private static void handleFailure(ActionListener listener, Exce ); } - private static ExportMetricsPartialSuccess responseWithRejectedDataPoints(int rejectedDataPoints, String message) { - return ExportMetricsServiceResponse.newBuilder() - .getPartialSuccessBuilder() + private static ExportMetricsServiceResponse responseWithRejectedDataPoints(int rejectedDataPoints, String message) { + ExportMetricsPartialSuccess partialSuccess = ExportMetricsPartialSuccess.newBuilder() .setRejectedDataPoints(rejectedDataPoints) .setErrorMessage(message) .build(); + return ExportMetricsServiceResponse.newBuilder().setPartialSuccess(partialSuccess).build(); } public static class MetricsRequest extends ActionRequest implements CompositeIndicesRequest { @@ -229,7 +267,7 @@ public static class MetricsResponse extends ActionResponse { private final BytesReference response; private final RestStatus status; - public MetricsResponse(RestStatus status, MessageLite response) { + public MetricsResponse(RestStatus status, ExportMetricsServiceResponse response) { this(status, new BytesArray(response.toByteArray())); } diff --git a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContext.java b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContext.java index 3dad2dfcfd945..f4fc34a050bd4 100644 --- a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContext.java +++ b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContext.java @@ -119,8 +119,23 @@ public int getIgnoredDataPoints() { return ignoredDataPoints; } - public String getIgnoredDataPointsMessage() { - return ignoredDataPointMessages.isEmpty() ? "" : String.join("\n", ignoredDataPointMessages); + public String getIgnoredDataPointsMessage(int limit) { + if (ignoredDataPointMessages.isEmpty()) { + return ""; + } + StringBuilder sb = new StringBuilder(); + sb.append("Ignored ").append(ignoredDataPoints).append(" data points due to the following reasons:\n"); + int count = 0; + for (String message : ignoredDataPointMessages) { + sb.append(" - ").append(message).append("\n"); + count++; + if (count >= limit) { + sb.append(" - ... and more\n"); + break; + } + } + return sb.toString(); + } private ResourceGroup getOrCreateResourceGroup(ResourceMetrics resourceMetrics) { diff --git a/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsTransportActionTests.java b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsTransportActionTests.java index 648df30555d27..d19b2b1f5d569 100644 --- a/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsTransportActionTests.java +++ b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsTransportActionTests.java @@ -37,6 +37,7 @@ import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.keyValue; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.not; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doThrow; @@ -76,12 +77,13 @@ public void testSuccessEmptyRequest() throws Exception { public void test429() throws Exception { BulkItemResponse[] bulkItemResponses = new BulkItemResponse[] { - failureResponse(RestStatus.TOO_MANY_REQUESTS, "too many requests"), + failureResponse("metrics-generic.otel-default", RestStatus.TOO_MANY_REQUESTS, "too many requests"), successResponse() }; MetricsResponse response = executeRequest(createMetricsRequest(createMetric()), new BulkResponse(bulkItemResponses, 0)); assertThat(response.getStatus(), equalTo(RestStatus.TOO_MANY_REQUESTS)); - ExportMetricsPartialSuccess metricsServiceResponse = ExportMetricsPartialSuccess.parseFrom(response.getResponse().array()); + ExportMetricsPartialSuccess metricsServiceResponse = ExportMetricsServiceResponse.parseFrom(response.getResponse().array()) + .getPartialSuccess(); assertThat( metricsServiceResponse.getRejectedDataPoints(), equalTo(Arrays.stream(bulkItemResponses).filter(BulkItemResponse::isFailed).count()) @@ -92,13 +94,25 @@ public void test429() throws Exception { public void testPartialSuccess() throws Exception { MetricsResponse response = executeRequest( createMetricsRequest(createMetric()), - new BulkResponse(new BulkItemResponse[] { failureResponse(RestStatus.BAD_REQUEST, "bad request") }, 0) + new BulkResponse( + new BulkItemResponse[] { + failureResponse("metrics-generic.otel-default", RestStatus.BAD_REQUEST, "bad request 1"), + failureResponse("metrics-generic.otel-default", RestStatus.BAD_REQUEST, "bad request 2"), + failureResponse("metrics-hostmetricsreceiver.otel-default", RestStatus.BAD_REQUEST, "bad request 3"), + failureResponse("metrics-generic.otel-default", RestStatus.INTERNAL_SERVER_ERROR, "internal server error") }, + 0 + ) ); assertThat(response.getStatus(), equalTo(RestStatus.OK)); - ExportMetricsPartialSuccess metricsServiceResponse = ExportMetricsPartialSuccess.parseFrom(response.getResponse().array()); + ExportMetricsPartialSuccess metricsServiceResponse = ExportMetricsServiceResponse.parseFrom(response.getResponse().array()) + .getPartialSuccess(); assertThat(metricsServiceResponse.getRejectedDataPoints(), equalTo(1L)); - assertThat(metricsServiceResponse.getErrorMessage(), containsString("bad request")); + // the error message contains only one message per unique index and error status + assertThat(metricsServiceResponse.getErrorMessage(), containsString("bad request 1")); + assertThat(metricsServiceResponse.getErrorMessage(), not(containsString("bad request 2"))); + assertThat(metricsServiceResponse.getErrorMessage(), containsString("bad request 3")); + assertThat(metricsServiceResponse.getErrorMessage(), containsString("internal server error")); } public void testBulkError() throws Exception { @@ -113,7 +127,8 @@ private void assertExceptionStatus(Exception exception, RestStatus restStatus) t MetricsResponse response = executeRequest(createMetricsRequest(createMetric()), exception); assertThat(response.getStatus(), equalTo(restStatus)); - ExportMetricsPartialSuccess metricsServiceResponse = ExportMetricsPartialSuccess.parseFrom(response.getResponse().array()); + ExportMetricsPartialSuccess metricsServiceResponse = ExportMetricsServiceResponse.parseFrom(response.getResponse().array()) + .getPartialSuccess(); assertThat(metricsServiceResponse.getRejectedDataPoints(), equalTo(1L)); assertThat(metricsServiceResponse.getErrorMessage(), equalTo(exception.getMessage())); } @@ -172,11 +187,11 @@ private static BulkItemResponse successResponse() { return BulkItemResponse.success(-1, DocWriteRequest.OpType.CREATE, mock(DocWriteResponse.class)); } - private static BulkItemResponse failureResponse(RestStatus restStatus, String failureMessage) { + private static BulkItemResponse failureResponse(String index, RestStatus restStatus, String failureMessage) { return BulkItemResponse.failure( -1, DocWriteRequest.OpType.CREATE, - new BulkItemResponse.Failure("index", "id", new RuntimeException(failureMessage), restStatus) + new BulkItemResponse.Failure(index, "id", new RuntimeException(failureMessage), restStatus) ); } diff --git a/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContextTests.java b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContextTests.java index 2b2071acc171a..85b91457e0ec1 100644 --- a/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContextTests.java +++ b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContextTests.java @@ -74,7 +74,7 @@ public void testGroupingSameGroup() throws Exception { context.groupDataPoints(metricsRequest); assertEquals(6, context.totalDataPoints()); assertEquals(0, context.getIgnoredDataPoints()); - assertEquals("", context.getIgnoredDataPointsMessage()); + assertEquals("", context.getIgnoredDataPointsMessage(10)); AtomicInteger groupCount = new AtomicInteger(0); context.consume(dataPointGroup -> groupCount.incrementAndGet()); @@ -96,7 +96,7 @@ public void testGroupingDifferentTargetIndex() throws Exception { context.groupDataPoints(metricsRequest); assertEquals(2, context.totalDataPoints()); assertEquals(0, context.getIgnoredDataPoints()); - assertEquals("", context.getIgnoredDataPointsMessage()); + assertEquals("", context.getIgnoredDataPointsMessage(10)); AtomicInteger groupCount = new AtomicInteger(0); List targetIndexes = new ArrayList<>(); @@ -119,7 +119,7 @@ public void testGroupingDifferentGroupUnit() throws Exception { context.groupDataPoints(metricsRequest); assertEquals(2, context.totalDataPoints()); assertEquals(0, context.getIgnoredDataPoints()); - assertEquals("", context.getIgnoredDataPointsMessage()); + assertEquals("", context.getIgnoredDataPointsMessage(10)); AtomicInteger groupCount = new AtomicInteger(0); context.consume(dataPointGroup -> groupCount.incrementAndGet()); @@ -136,7 +136,7 @@ public void testGroupingDuplicateNameSameTimeSeries() throws Exception { context.groupDataPoints(metricsRequest); assertEquals(2, context.totalDataPoints()); assertEquals(1, context.getIgnoredDataPoints()); - assertThat(context.getIgnoredDataPointsMessage(), containsString("Duplicate metric name 'system.cpu.usage' for timestamp")); + assertThat(context.getIgnoredDataPointsMessage(10), containsString("Duplicate metric name 'system.cpu.usage' for timestamp")); AtomicInteger groupCount = new AtomicInteger(0); context.consume(dataPointGroup -> groupCount.incrementAndGet()); @@ -184,7 +184,7 @@ public void testGroupingDifferentResource() throws Exception { context.groupDataPoints(ExportMetricsServiceRequest.newBuilder().addAllResourceMetrics(List.of(resource1, resource2)).build()); assertEquals(2, context.totalDataPoints()); assertEquals(0, context.getIgnoredDataPoints()); - assertEquals("", context.getIgnoredDataPointsMessage()); + assertEquals("", context.getIgnoredDataPointsMessage(10)); AtomicInteger groupCount = new AtomicInteger(0); context.consume(dataPointGroup -> groupCount.incrementAndGet()); @@ -216,7 +216,7 @@ public void testGroupingDifferentScope() throws Exception { context.groupDataPoints(ExportMetricsServiceRequest.newBuilder().addAllResourceMetrics(List.of(resource1, resource2)).build()); assertEquals(2, context.totalDataPoints()); assertEquals(0, context.getIgnoredDataPoints()); - assertEquals("", context.getIgnoredDataPointsMessage()); + assertEquals("", context.getIgnoredDataPointsMessage(10)); AtomicInteger groupCount = new AtomicInteger(0); context.consume(dataPointGroup -> groupCount.incrementAndGet()); @@ -234,7 +234,7 @@ public void testGroupingDifferentGroupTimestamp() throws Exception { context.groupDataPoints(metricsRequest); assertEquals(2, context.totalDataPoints()); assertEquals(0, context.getIgnoredDataPoints()); - assertEquals("", context.getIgnoredDataPointsMessage()); + assertEquals("", context.getIgnoredDataPointsMessage(10)); AtomicInteger groupCount = new AtomicInteger(0); context.consume(dataPointGroup -> groupCount.incrementAndGet()); @@ -256,7 +256,7 @@ public void testGroupingDifferentGroupAttributes() throws Exception { context.groupDataPoints(metricsRequest); assertEquals(2, context.totalDataPoints()); assertEquals(0, context.getIgnoredDataPoints()); - assertEquals("", context.getIgnoredDataPointsMessage()); + assertEquals("", context.getIgnoredDataPointsMessage(10)); AtomicInteger groupCount = new AtomicInteger(0); context.consume(dataPointGroup -> groupCount.incrementAndGet()); @@ -280,7 +280,7 @@ public void testReceiverBasedRouting() throws Exception { context.groupDataPoints(ExportMetricsServiceRequest.newBuilder().addAllResourceMetrics(List.of(resource)).build()); assertEquals(1, context.totalDataPoints()); assertEquals(0, context.getIgnoredDataPoints()); - assertEquals("", context.getIgnoredDataPointsMessage()); + assertEquals("", context.getIgnoredDataPointsMessage(10)); List targetIndexes = new ArrayList<>(); context.consume(dataPointGroup -> targetIndexes.add(dataPointGroup.targetIndex().index())); @@ -303,7 +303,7 @@ public void testReceiverBasedRoutingWithoutTrailingSlash() throws Exception { context.groupDataPoints(ExportMetricsServiceRequest.newBuilder().addAllResourceMetrics(List.of(resource)).build()); assertEquals(1, context.totalDataPoints()); assertEquals(0, context.getIgnoredDataPoints()); - assertEquals("", context.getIgnoredDataPointsMessage()); + assertEquals("", context.getIgnoredDataPointsMessage(10)); List targetIndexes = new ArrayList<>(); context.consume(dataPointGroup -> targetIndexes.add(dataPointGroup.targetIndex().index())); From 0108616c34d1bc37b01ab8412db25ffe74a3c1cc Mon Sep 17 00:00:00 2001 From: Felix Barnsteiner Date: Fri, 7 Nov 2025 12:01:21 +0100 Subject: [PATCH 2/3] Update docs/changelog/137718.yaml --- docs/changelog/137718.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/137718.yaml diff --git a/docs/changelog/137718.yaml b/docs/changelog/137718.yaml new file mode 100644 index 0000000000000..a40596209ac6d --- /dev/null +++ b/docs/changelog/137718.yaml @@ -0,0 +1,5 @@ +pr: 137718 +summary: "OTLP: return correct response type for partial successes" +area: TSDB +type: bug +issues: [] From 34f5f70754e8d7a8f28030f3b5c7d691bda84ec5 Mon Sep 17 00:00:00 2001 From: Felix Barnsteiner Date: Mon, 10 Nov 2025 14:13:04 +0100 Subject: [PATCH 3/3] Add comment about failureGroups map structure --- .../xpack/oteldata/otlp/OTLPMetricsTransportAction.java | 1 + 1 file changed, 1 insertion(+) diff --git a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsTransportAction.java b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsTransportAction.java index 17c5f76ee4714..15a085d1ea9c8 100644 --- a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsTransportAction.java +++ b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsTransportAction.java @@ -172,6 +172,7 @@ private static void handlePartialSuccess( DataPointGroupingContext context, ActionListener listener ) { + // index -> status -> failure group Map> failureGroups = new HashMap<>(); // If the request is only partially accepted // (i.e. when the server accepts only parts of the data and rejects the rest),