Skip to content
Merged
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

package org.elasticsearch.xpack.search;

import org.apache.http.HttpEntity;
import org.apache.http.util.EntityUtils;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.core.config.Configurator;
import org.elasticsearch.client.Request;
Expand All @@ -16,7 +18,6 @@
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.ErrorTraceHelper;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.MockLog;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.transport.MockTransportService;
Expand All @@ -31,7 +32,7 @@
reason = "testing debug log output to identify race condition",
value = "org.elasticsearch.xpack.search.MutableSearchResponse:DEBUG,org.elasticsearch.xpack.search.AsyncSearchTask:DEBUG"
)
public class AsyncSearchErrorTraceIT extends ESIntegTestCase {
public class AsyncSearchErrorTraceIT extends AsyncSearchIntegTestCase {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this needed for the fix or is it an unrelated improvement?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn’t necessary for this test; I added it to keep it consistent with the other tests in the same package.


@Override
protected boolean addMockHttpTransport() {
Expand Down Expand Up @@ -77,10 +78,18 @@ public void testAsyncSearchFailingQueryErrorTraceDefault() throws Exception {
createAsyncRequest.addParameter("wait_for_completion_timeout", "0ms");
ErrorTraceHelper.expectStackTraceCleared(internalCluster());
Map<String, Object> createAsyncResponseEntity = performRequestAndGetResponseEntity(createAsyncRequest);
if (Boolean.TRUE.equals(createAsyncResponseEntity.get("is_running"))) {
String asyncExecutionId = (String) createAsyncResponseEntity.get("id");
Request getAsyncRequest = new Request("GET", "/_async_search/" + asyncExecutionId);
awaitAsyncRequestDoneRunning(getAsyncRequest);

try {
if (Boolean.TRUE.equals(createAsyncResponseEntity.get("is_running"))) {
String asyncExecutionId = (String) createAsyncResponseEntity.get("id");
Request getAsyncRequest = new Request("GET", "/_async_search/" + asyncExecutionId);
awaitAsyncRequestDoneRunning(getAsyncRequest);
}

// check that the stack trace was not sent from the data node to the coordinating node
ErrorTraceHelper.assertStackTraceCleared(internalCluster());
} finally {
deleteAsyncSearchIfPresent(createAsyncResponseEntity);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes sense to me, I see a similar thing is done in CCSDuelIT for async searches.

Copy link
Contributor Author

@drempapis drempapis Nov 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you, Ben, for the review. Yes, this is actually the most important part of this PR, ensuring that the entry (with id) in the index is deleted before the test reaches the “after test cleanup,” where the exception is thrown.

}
}

Expand All @@ -103,11 +112,19 @@ public void testAsyncSearchFailingQueryErrorTraceTrue() throws Exception {
createAsyncRequest.addParameter("wait_for_completion_timeout", "0ms");
ErrorTraceHelper.expectStackTraceObserved(internalCluster());
Map<String, Object> createAsyncResponseEntity = performRequestAndGetResponseEntity(createAsyncRequest);
if (Boolean.TRUE.equals(createAsyncResponseEntity.get("is_running"))) {
String asyncExecutionId = (String) createAsyncResponseEntity.get("id");
Request getAsyncRequest = new Request("GET", "/_async_search/" + asyncExecutionId);
getAsyncRequest.addParameter("error_trace", "true");
awaitAsyncRequestDoneRunning(getAsyncRequest);

try {
if (Boolean.TRUE.equals(createAsyncResponseEntity.get("is_running"))) {
String asyncExecutionId = (String) createAsyncResponseEntity.get("id");
Request getAsyncRequest = new Request("GET", "/_async_search/" + asyncExecutionId);
getAsyncRequest.addParameter("error_trace", "true");
awaitAsyncRequestDoneRunning(getAsyncRequest);
}

// check that the stack trace was sent from the data node to the coordinating node
ErrorTraceHelper.assertStackTraceObserved(internalCluster());
} finally {
deleteAsyncSearchIfPresent(createAsyncResponseEntity);
}
}

Expand All @@ -130,11 +147,19 @@ public void testAsyncSearchFailingQueryErrorTraceFalse() throws Exception {
createAsyncRequest.addParameter("wait_for_completion_timeout", "0ms");
ErrorTraceHelper.expectStackTraceCleared(internalCluster());
Map<String, Object> createAsyncResponseEntity = performRequestAndGetResponseEntity(createAsyncRequest);
if (Boolean.TRUE.equals(createAsyncResponseEntity.get("is_running"))) {
String asyncExecutionId = (String) createAsyncResponseEntity.get("id");
Request getAsyncRequest = new Request("GET", "/_async_search/" + asyncExecutionId);
getAsyncRequest.addParameter("error_trace", "false");
awaitAsyncRequestDoneRunning(getAsyncRequest);

try {
if (Boolean.TRUE.equals(createAsyncResponseEntity.get("is_running"))) {
String asyncExecutionId = (String) createAsyncResponseEntity.get("id");
Request getAsyncRequest = new Request("GET", "/_async_search/" + asyncExecutionId);
getAsyncRequest.addParameter("error_trace", "false");
awaitAsyncRequestDoneRunning(getAsyncRequest);
}

// check that the stack trace was not sent from the data node to the coordinating node
ErrorTraceHelper.assertStackTraceCleared(internalCluster());
} finally {
deleteAsyncSearchIfPresent(createAsyncResponseEntity);
}
}

Expand Down Expand Up @@ -169,19 +194,24 @@ public void testDataNodeLogsStackTrace() throws Exception {
try (var mockLog = MockLog.capture(SearchService.class)) {
ErrorTraceHelper.addSeenLoggingExpectations(numShards, mockLog, errorTriggeringIndex);
Map<String, Object> createAsyncResponseEntity = performRequestAndGetResponseEntity(createAsyncRequest);
if (Boolean.TRUE.equals(createAsyncResponseEntity.get("is_running"))) {
String asyncExecutionId = (String) createAsyncResponseEntity.get("id");
Request getAsyncRequest = new Request("GET", "/_async_search/" + asyncExecutionId);
// Use the same value of error_trace as the search request
if (errorTraceValue == 0) {
getAsyncRequest.addParameter("error_trace", "true");
} else if (errorTraceValue == 1) {
getAsyncRequest.addParameter("error_trace", "false");
} // else empty
awaitAsyncRequestDoneRunning(getAsyncRequest);
}

mockLog.assertAllExpectationsMatched();
try {
if (Boolean.TRUE.equals(createAsyncResponseEntity.get("is_running"))) {
String asyncExecutionId = (String) createAsyncResponseEntity.get("id");
Request getAsyncRequest = new Request("GET", "/_async_search/" + asyncExecutionId);
// Use the same value of error_trace as the search request
if (errorTraceValue == 0) {
getAsyncRequest.addParameter("error_trace", "true");
} else if (errorTraceValue == 1) {
getAsyncRequest.addParameter("error_trace", "false");
} // else empty
awaitAsyncRequestDoneRunning(getAsyncRequest);
}

mockLog.assertAllExpectationsMatched();
} finally {
deleteAsyncSearchIfPresent(createAsyncResponseEntity);
}
}
}

Expand All @@ -204,11 +234,19 @@ public void testAsyncSearchFailingQueryErrorTraceFalseOnSubmitAndTrueOnGet() thr
createAsyncSearchRequest.addParameter("wait_for_completion_timeout", "0ms");
ErrorTraceHelper.expectStackTraceCleared(internalCluster());
Map<String, Object> createAsyncResponseEntity = performRequestAndGetResponseEntity(createAsyncSearchRequest);
if (Boolean.TRUE.equals(createAsyncResponseEntity.get("is_running"))) {
String asyncExecutionId = (String) createAsyncResponseEntity.get("id");
Request getAsyncRequest = new Request("GET", "/_async_search/" + asyncExecutionId);
getAsyncRequest.addParameter("error_trace", "true");
awaitAsyncRequestDoneRunning(getAsyncRequest);

try {
if (Boolean.TRUE.equals(createAsyncResponseEntity.get("is_running"))) {
String asyncExecutionId = (String) createAsyncResponseEntity.get("id");
Request getAsyncRequest = new Request("GET", "/_async_search/" + asyncExecutionId);
getAsyncRequest.addParameter("error_trace", "true");
awaitAsyncRequestDoneRunning(getAsyncRequest);
}

// check that the stack trace was not sent from the data node to the coordinating node
ErrorTraceHelper.assertStackTraceCleared(internalCluster());
} finally {
deleteAsyncSearchIfPresent(createAsyncResponseEntity);
}
}

Expand All @@ -231,18 +269,56 @@ public void testAsyncSearchFailingQueryErrorTraceTrueOnSubmitAndFalseOnGet() thr
createAsyncSearchRequest.addParameter("wait_for_completion_timeout", "0ms");
ErrorTraceHelper.expectStackTraceObserved(internalCluster());
Map<String, Object> createAsyncResponseEntity = performRequestAndGetResponseEntity(createAsyncSearchRequest);
if (Boolean.TRUE.equals(createAsyncResponseEntity.get("is_running"))) {
String asyncExecutionId = (String) createAsyncResponseEntity.get("id");
Request getAsyncRequest = new Request("GET", "/_async_search/" + asyncExecutionId);
getAsyncRequest.addParameter("error_trace", "false");
awaitAsyncRequestDoneRunning(getAsyncRequest);
try {
if (Boolean.TRUE.equals(createAsyncResponseEntity.get("is_running"))) {
String asyncExecutionId = (String) createAsyncResponseEntity.get("id");
Request getAsyncRequest = new Request("GET", "/_async_search/" + asyncExecutionId);
getAsyncRequest.addParameter("error_trace", "false");
awaitAsyncRequestDoneRunning(getAsyncRequest);
}

// check that the stack trace was sent from the data node to the coordinating node
ErrorTraceHelper.assertStackTraceObserved(internalCluster());
} finally {
deleteAsyncSearchIfPresent(createAsyncResponseEntity);
}
}

private Map<String, Object> performRequestAndGetResponseEntity(Request r) throws IOException {
Response response = getRestClient().performRequest(r);
XContentType entityContentType = XContentType.fromMediaType(response.getEntity().getContentType().getValue());
return XContentHelper.convertToMap(entityContentType.xContent(), response.getEntity().getContent(), false);

HttpEntity entity = response.getEntity();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain what this entity stuff has to do with the test errors?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess nothing! :) I got the idea from ESRestTestCase, to ensure that the connection is released back to the pool regardless of what happens. Upon re-examining this, the ShardLockObtainFailedException is a server-side issue related to the shard lifecycle, and consuming the entity doesn’t affect shard locks, it only impacts client connection reuse. I’ll revert this part.

However, I’m keeping it in the deleteAsyncSearchIfPresent to ensure that the Http connection is fully consumed and returned to the pool before teardown.

if (entity == null) {
return Map.of();
}

try {
XContentType entityContentType = XContentType.fromMediaType(entity.getContentType().getValue());
return XContentHelper.convertToMap(entityContentType.xContent(), entity.getContent(), false);
} finally {
// Make sure the connection is released
EntityUtils.consumeQuietly(entity);
}
}

private void deleteAsyncSearchIfPresent(Map<String, Object> map) throws IOException {
String id = (String) map.get("id");
if (id != null) {
return;
}

// Make sure the .async-search system index is green before deleting it
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why ensure green?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By the time we reach the cleanup phase, the .async-search shard may still be relocating or recovering, which is when shard-lock timeouts are most likely to occur during test teardown. To prevent this, we ensure that the .async-search system index is fully ready and stable before deleting the async search result.

try {
ensureGreen(".async-search");
} catch (Exception ignore) {
// the index may not exist
}

Response response = getRestClient().performRequest(new Request("DELETE", "/_async_search/" + id));
HttpEntity entity = response.getEntity();
if (entity != null) {
EntityUtils.consumeQuietly(entity);
}
}

private void awaitAsyncRequestDoneRunning(Request getAsyncRequest) throws Exception {
Expand Down