diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/RxDocumentClientImplTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/RxDocumentClientImplTest.java index cbc9301142f4..6d342d43eeea 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/RxDocumentClientImplTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/RxDocumentClientImplTest.java @@ -448,7 +448,9 @@ private static RxDocumentServiceResponse mockRxDocumentServiceResponse(String co HttpResponseStatus.OK.code(), headers, new ByteBufInputStream(Unpooled.wrappedBuffer(blob), true), - blob.length); + blob.length, + null, + null); RxDocumentServiceResponse documentServiceResponse = new RxDocumentServiceResponse(new DiagnosticsClientContext() { diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/StoreResponseBuilder.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/StoreResponseBuilder.java index 96fb55bd0922..500d2a67b0d9 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/StoreResponseBuilder.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/StoreResponseBuilder.java @@ -95,8 +95,8 @@ public StoreResponseBuilder withContent(String content) { public StoreResponse build() { ByteBuf buffer = getUTF8BytesOrNull(content); if (buffer == null) { - return new StoreResponse(null, status, headers, null, 0); + return new StoreResponse(null, status, headers, null, 0, null, null); } - return new StoreResponse(null, status, headers, new ByteBufInputStream(buffer, true), buffer.readableBytes()); + return new StoreResponse(null, status, headers, new ByteBufInputStream(buffer, true), buffer.readableBytes(), null, null); } } diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/batch/CosmosBulkItemResponseTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/batch/CosmosBulkItemResponseTest.java index 019cb9a5b213..31e86ac065ee 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/batch/CosmosBulkItemResponseTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/batch/CosmosBulkItemResponseTest.java @@ -83,7 +83,9 @@ public void validateAllSetValuesInCosmosBulkItemResponse() { HttpResponseStatus.OK.code(), headers, new ByteBufInputStream(Unpooled.wrappedBuffer(blob), true), - blob.length); + blob.length, + null, + null); CosmosBatchResponse batchResponse = BatchResponseParser.fromDocumentServiceResponse( new RxDocumentServiceResponse(null, storeResponse), @@ -166,7 +168,9 @@ public void validateEmptyHeaderInCosmosBulkItemResponse() { HttpResponseStatus.OK.code(), new HashMap<>(), new ByteBufInputStream(Unpooled.wrappedBuffer(blob), true), - blob.length); + blob.length, + null, + null); CosmosBatchResponse batchResponse = BatchResponseParser.fromDocumentServiceResponse( new RxDocumentServiceResponse(null, storeResponse), diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/batch/TransactionalBatchResponseTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/batch/TransactionalBatchResponseTests.java index f631d5268f59..dfaa4adaf527 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/batch/TransactionalBatchResponseTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/batch/TransactionalBatchResponseTests.java @@ -79,7 +79,9 @@ public void validateAllSetValuesInResponse() { HttpResponseStatus.OK.code(), headers, new ByteBufInputStream(Unpooled.wrappedBuffer(blob), true), - blob.length); + blob.length, + null, + null); CosmosBatchResponse batchResponse = BatchResponseParser.fromDocumentServiceResponse( new RxDocumentServiceResponse(null, storeResponse), @@ -143,7 +145,9 @@ public void validateEmptyHeaderInResponse() { HttpResponseStatus.OK.code(), new HashMap<>(), new ByteBufInputStream(Unpooled.wrappedBuffer(blob), true), - blob.length); + blob.length, + null, + null); CosmosBatchResponse batchResponse = BatchResponseParser.fromDocumentServiceResponse( new RxDocumentServiceResponse(null, storeResponse), diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriterTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriterTest.java index 9ede1c281219..39214066a398 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriterTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriterTest.java @@ -213,7 +213,7 @@ public void getLsnAndGlobalCommittedLsn() { headers.put(WFConstants.BackendHeaders.LSN, "3"); headers.put(WFConstants.BackendHeaders.GLOBAL_COMMITTED_LSN, "2"); - StoreResponse sr = new StoreResponse(null, 0, headers, null, 0); + StoreResponse sr = new StoreResponse(null, 0, headers, null, 0, null, null); Utils.ValueHolder lsn = Utils.ValueHolder.initialize(-2l); Utils.ValueHolder globalCommittedLsn = Utils.ValueHolder.initialize(-2l); ConsistencyWriter.getLsnAndGlobalCommittedLsn(sr, lsn, globalCommittedLsn); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/JsonNodeStorePayloadTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/JsonNodeStorePayloadTests.java index 16a15157118d..afe80f8364f6 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/JsonNodeStorePayloadTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/JsonNodeStorePayloadTests.java @@ -28,7 +28,7 @@ public void parsingBytesWithInvalidUT8Bytes() { try { byte[] bytes = hexStringToByteArray(invalidHexString); ByteBuf byteBuf = Unpooled.wrappedBuffer(bytes); - JsonNodeStorePayload jsonNodeStorePayload = new JsonNodeStorePayload(new ByteBufInputStream(byteBuf), bytes.length, new HashMap<>()); + JsonNodeStorePayload jsonNodeStorePayload = new JsonNodeStorePayload(new ByteBufInputStream(byteBuf), bytes.length, new HashMap<>(), null, null); jsonNodeStorePayload.getPayload().toString(); } finally { System.clearProperty("COSMOS.CHARSET_DECODER_ERROR_ACTION_ON_MALFORMED_INPUT"); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/StoreResponseTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/StoreResponseTest.java index 1e6c6cc147f8..b2166cee2851 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/StoreResponseTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/StoreResponseTest.java @@ -22,7 +22,7 @@ public void stringContent() { headerMap.put("key2", "value2"); ByteBuf buffer = getUTF8BytesOrNull(jsonContent); - StoreResponse sp = new StoreResponse(null, 200, headerMap, new ByteBufInputStream(buffer, true), buffer.readableBytes()); + StoreResponse sp = new StoreResponse(null, 200, headerMap, new ByteBufInputStream(buffer, true), buffer.readableBytes(), null, null); assertThat(sp.getStatus()).isEqualTo(200); assertThat(sp.getResponseBodyAsJson().get("id").asText()).isEqualTo(content); @@ -39,7 +39,7 @@ public void headerNamesAreCaseInsensitive() { headerMap.put("KEY3", "value3"); ByteBuf buffer = getUTF8BytesOrNull(jsonContent); - StoreResponse sp = new StoreResponse(null, 200, headerMap, new ByteBufInputStream(buffer, true), buffer.readableBytes()); + StoreResponse sp = new StoreResponse(null, 200, headerMap, new ByteBufInputStream(buffer, true), buffer.readableBytes(), null, null); assertThat(sp.getStatus()).isEqualTo(200); assertThat(sp.getResponseBodyAsJson().get("id").asText()).isEqualTo(content); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/StoreResultDiagnosticsSerializerTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/StoreResultDiagnosticsSerializerTests.java index 18eea0e3cc5e..1db78a492e3b 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/StoreResultDiagnosticsSerializerTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/StoreResultDiagnosticsSerializerTests.java @@ -37,7 +37,7 @@ public StoreResultDiagnosticsSerializerTests() throws IOException { //TODO: add more test cases @Test(groups = "unit") public void storeResultDiagnosticsSerializerTests() { - StoreResponse storeResponse = new StoreResponse(null, 200, new HashMap<>(), null, 0); + StoreResponse storeResponse = new StoreResponse(null, 200, new HashMap<>(), null, 0, null, null); StoreResult storeResult = new StoreResult( storeResponse, null, diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java index a543309ef37a..ad604b1ae501 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java @@ -20,6 +20,8 @@ import com.azure.cosmos.implementation.DatabaseAccountLocation; import com.azure.cosmos.implementation.GlobalEndpointManager; import com.azure.cosmos.implementation.InternalObjectNode; +import com.azure.cosmos.implementation.OperationType; +import com.azure.cosmos.implementation.ResourceType; import com.azure.cosmos.implementation.RxDocumentClientImpl; import com.azure.cosmos.implementation.TestConfigurations; import com.azure.cosmos.implementation.Utils; @@ -53,6 +55,7 @@ import com.azure.cosmos.test.faultinjection.FaultInjectionRuleBuilder; import com.azure.cosmos.test.faultinjection.FaultInjectionServerErrorResult; import com.azure.cosmos.test.faultinjection.FaultInjectionServerErrorType; +import com.azure.cosmos.test.faultinjection.JsonParseInterceptorHelper; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; @@ -2101,6 +2104,72 @@ public void verifyLeasesOnRestart_AfterSplit() throws InterruptedException { } } + /** + * Tests that ChangeFeedProcessor gracefully handles StreamConstraintsException during JSON parsing. + * + *

This test uses a GLOBAL interceptor to inject a StreamConstraintsException on the first + * JSON parse operation across all threads (including CFP worker threads from thread pools).

+ * + *

IMPORTANT: This test should NOT run in parallel with other tests that use + * the JSON parse interceptor, as the interceptor is global and would cause interference.

+ * + *

Expected behavior: The ChangeFeedProcessor should retry with reduced maxItemCount and + * eventually process all documents successfully.

+ */ + @Test(groups = { "long-emulator" }, timeOut = 50000, singleThreaded = true) + public void changeFeedProcessorHandlesStreamConstraintsException() throws Exception { + CosmosAsyncContainer feedContainer = createFeedCollection(FEED_COLLECTION_THROUGHPUT); + CosmosAsyncContainer leaseContainer = createLeaseCollection(LEASE_COLLECTION_THROUGHPUT); + + try { + List createdDocuments = new ArrayList<>(); + Map receivedDocuments = new ConcurrentHashMap<>(); + + // Create documents + setupReadFeedDocuments(createdDocuments, feedContainer, FEED_COUNT); + + // Set up GLOBAL interceptor to throw StreamConstraintsException once + // Works across all threads (main thread + CFP worker threads from thread pools) + // NOTE: Test marked as singleThreaded to prevent interference with parallel tests + try (AutoCloseable interceptor = JsonParseInterceptorHelper.injectStreamConstraintsExceptionOnce(OperationType.ReadFeed, ResourceType.Document)) { + + ChangeFeedProcessorOptions options = new ChangeFeedProcessorOptions(); + options.setMaxItemCount(100); + options.setStartFromBeginning(true); + + ChangeFeedProcessor changeFeedProcessor = new ChangeFeedProcessorBuilder() + .hostName(hostName) + .feedContainer(feedContainer) + .leaseContainer(leaseContainer) + .options(options) + .handleLatestVersionChanges(changeFeedProcessorHandler(receivedDocuments)) + .buildChangeFeedProcessor(); + + try { + startChangeFeedProcessor(changeFeedProcessor); + + // Wait for documents to be processed + Thread.sleep(10000); + + safeStopChangeFeedProcessor(changeFeedProcessor); + + // Verify all documents were processed despite the StreamConstraintsException + assertThat(receivedDocuments.size()).isEqualTo(FEED_COUNT); + + logger.info("Successfully processed {} documents after handling StreamConstraintsException", + receivedDocuments.size()); + + } finally { + Thread.sleep(2000); + } + } + + } finally { + safeDeleteCollection(feedContainer); + safeDeleteCollection(leaseContainer); + } + } + private void startChangeFeedProcessor(ChangeFeedProcessor changeFeedProcessor) { changeFeedProcessor .start() @@ -2321,21 +2390,21 @@ private Consumer> leasesChangeFeedProcessorHandler }; } - @BeforeMethod(groups = { "query", "cfp-split" }, timeOut = 2 * SETUP_TIMEOUT, alwaysRun = true) + @BeforeMethod(groups = { "query", "cfp-split", "long-emulator" }, timeOut = 2 * SETUP_TIMEOUT, alwaysRun = true) public void beforeMethod() { } - @BeforeClass(groups = { "query", "cfp-split" }, timeOut = SETUP_TIMEOUT, alwaysRun = true) + @BeforeClass(groups = { "query", "cfp-split", "long-emulator" }, timeOut = SETUP_TIMEOUT, alwaysRun = true) public void before_ChangeFeedProcessorTest() { client = getClientBuilder().buildAsyncClient(); createdDatabase = getSharedCosmosDatabase(client); } - @AfterMethod(groups = { "query", "cfp-split" }, timeOut = 3 * SHUTDOWN_TIMEOUT, alwaysRun = true) + @AfterMethod(groups = { "query", "cfp-split", "long-emulator" }, timeOut = 3 * SHUTDOWN_TIMEOUT, alwaysRun = true) public void afterMethod() { } - @AfterClass(groups = { "query", "cfp-split" }, timeOut = 2 * SHUTDOWN_TIMEOUT, alwaysRun = true) + @AfterClass(groups = { "query", "cfp-split", "long-emulator" }, timeOut = 2 * SHUTDOWN_TIMEOUT, alwaysRun = true) public void afterClass() { safeClose(client); } diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/pkversion/IncrementalChangeFeedProcessorTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/pkversion/IncrementalChangeFeedProcessorTest.java index ad70d2997423..8f718534e305 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/pkversion/IncrementalChangeFeedProcessorTest.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/pkversion/IncrementalChangeFeedProcessorTest.java @@ -20,6 +20,8 @@ import com.azure.cosmos.implementation.DatabaseAccountLocation; import com.azure.cosmos.implementation.GlobalEndpointManager; import com.azure.cosmos.implementation.InternalObjectNode; +import com.azure.cosmos.implementation.OperationType; +import com.azure.cosmos.implementation.ResourceType; import com.azure.cosmos.implementation.RxDocumentClientImpl; import com.azure.cosmos.implementation.Utils; import com.azure.cosmos.implementation.changefeed.pkversion.ServiceItemLease; @@ -39,6 +41,7 @@ import com.azure.cosmos.models.ThroughputProperties; import com.azure.cosmos.models.ThroughputResponse; import com.azure.cosmos.rx.TestSuiteBase; +import com.azure.cosmos.test.faultinjection.JsonParseInterceptorHelper; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; @@ -86,8 +89,7 @@ public class IncrementalChangeFeedProcessorTest extends TestSuiteBase { private static final ObjectMapper OBJECT_MAPPER = Utils.getSimpleObjectMapper(); private CosmosAsyncDatabase createdDatabase; -// private final String databaseId = "testdb1"; -// private final String hostName = "TestHost1"; + private final String hostName = RandomStringUtils.randomAlphabetic(6); private final int FEED_COUNT = 10; private final int CHANGE_FEED_PROCESSOR_TIMEOUT = 5000; @@ -1762,6 +1764,77 @@ public void readFeedDocumentsWithThroughputControl() throws InterruptedException } } + /** + * Tests that ChangeFeedProcessor gracefully handles StreamConstraintsException during JSON parsing. + * + *

This test uses a GLOBAL interceptor to inject a StreamConstraintsException on the first + * JSON parse operation across all threads (including CFP worker threads from thread pools).

+ * + *

IMPORTANT: This test should NOT run in parallel with other tests that use + * the JSON parse interceptor, as the interceptor is global and would cause interference.

+ * + *

Expected behavior: The ChangeFeedProcessor should retry with reduced maxItemCount and + * eventually process all documents successfully.

+ */ + @Test(groups = { "long-emulator" }, timeOut = 50000, singleThreaded = true) + public void changeFeedProcessorHandlesStreamConstraintsException() throws Exception { + CosmosAsyncContainer feedContainer = createFeedCollection(FEED_COLLECTION_THROUGHPUT); + CosmosAsyncContainer leaseContainer = createLeaseCollection(LEASE_COLLECTION_THROUGHPUT); + + try { + List createdDocuments = new ArrayList<>(); + Map receivedDocuments = new ConcurrentHashMap<>(); + + // Create documents + setupReadFeedDocuments(createdDocuments, feedContainer, FEED_COUNT); + + // Set up GLOBAL interceptor to throw StreamConstraintsException once + // Works across all threads (main thread + CFP worker threads from thread pools) + // NOTE: Test marked as singleThreaded to prevent interference with parallel tests + try (AutoCloseable interceptor = JsonParseInterceptorHelper.injectStreamConstraintsExceptionOnce(OperationType.ReadFeed, ResourceType.Document)) { + + ChangeFeedProcessorOptions options = new ChangeFeedProcessorOptions(); + options.setMaxItemCount(100); + options.setStartFromBeginning(true); + + ChangeFeedProcessor changeFeedProcessor = new ChangeFeedProcessorBuilder() + .hostName(hostName) + .feedContainer(feedContainer) + .leaseContainer(leaseContainer) + .options(options) + .handleChanges(docs -> { + logger.info("Received {} documents", docs.size()); + for (JsonNode doc : docs) { + receivedDocuments.put(doc.get("id").asText(), doc); + } + }) + .buildChangeFeedProcessor(); + + try { + startChangeFeedProcessor(changeFeedProcessor); + + // Wait for documents to be processed + Thread.sleep(10000); + + safeStopChangeFeedProcessor(changeFeedProcessor); + + // Verify all documents were processed despite the StreamConstraintsException + assertThat(receivedDocuments.size()).isEqualTo(FEED_COUNT); + + logger.info("Successfully processed {} documents after handling StreamConstraintsException", + receivedDocuments.size()); + + } finally { + Thread.sleep(2000); + } + } + + } finally { + safeDeleteCollection(feedContainer); + safeDeleteCollection(leaseContainer); + } + } + // Steps followed in this test // 1. Ingest 10 documents into the feed container. // 2. Start the LatestVersion / INCREMENTAL ChangeFeedProcessor with either startFromBeginning set to 'true' or 'false'. diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/test/faultinjection/JsonParseInterceptor.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/test/faultinjection/JsonParseInterceptor.java new file mode 100644 index 000000000000..3d7dcba11933 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/test/faultinjection/JsonParseInterceptor.java @@ -0,0 +1,38 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.cosmos.test.faultinjection; + +import com.fasterxml.jackson.databind.JsonNode; + +import java.io.IOException; +import java.util.Map; + +/** + * Functional interface for intercepting JSON parsing in tests. + * This allows injecting faults during JSON deserialization for testing purposes. + */ +@FunctionalInterface +public interface JsonParseInterceptor { + /** + * Intercepts JSON parsing to allow fault injection. + * + * @param bytes the byte array containing JSON + * @param responseHeaders the response headers + * @param defaultParser the default parsing logic to delegate to + * @return the parsed JsonNode + * @throws IOException if parsing fails or fault is injected + */ + JsonNode intercept( + byte[] bytes, + Map responseHeaders, + DefaultJsonParser defaultParser + ) throws IOException; + + /** + * Functional interface for the default JSON parsing logic. + */ + @FunctionalInterface + interface DefaultJsonParser { + JsonNode parse(byte[] bytes, Map responseHeaders) throws IOException; + } +} diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/test/faultinjection/JsonParseInterceptorHelper.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/test/faultinjection/JsonParseInterceptorHelper.java new file mode 100644 index 000000000000..0dfed4c0e956 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/test/faultinjection/JsonParseInterceptorHelper.java @@ -0,0 +1,59 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.cosmos.test.faultinjection; + +import com.azure.cosmos.implementation.OperationType; +import com.azure.cosmos.implementation.ResourceType; +import com.azure.cosmos.implementation.directconnectivity.JsonNodeStorePayload; +import com.fasterxml.jackson.core.exc.StreamConstraintsException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Helper class to inject faults into JSON parsing for testing purposes. + * This should only be used in test scenarios. + * + *

IMPORTANT: This uses a GLOBAL interceptor that affects all threads. + * Tests using these methods should NOT run in parallel with other tests using interceptors. + * Use TestNG's singleThreaded configuration or similar mechanisms for test isolation.

+ */ +public class JsonParseInterceptorHelper { + private static final Logger logger = LoggerFactory.getLogger(JsonParseInterceptorHelper.class); + + /** + * Sets an interceptor that throws StreamConstraintsException once, then delegates to default parser. + * Useful for testing retry logic in change feed processor. + * + *

WARNING: This sets a GLOBAL interceptor. The test using this method + * should NOT run in parallel with other tests to avoid interference.

+ * + * @return AutoCloseable that clears the interceptor when closed + */ + public static AutoCloseable injectStreamConstraintsExceptionOnce(OperationType operationType, ResourceType resourceType) { + AtomicInteger callCount = new AtomicInteger(0); + + JsonNodeStorePayload.TestOnlyJsonParseInterceptor interceptor = (bytes, responseHeaders, defaultParser, actualOperationType, actualResourceType) -> { + if (operationType.equals(actualOperationType) && resourceType.equals(actualResourceType)) { + + int count = callCount.incrementAndGet(); + + if (count == 1) { + logger.info("JsonParseInterceptor: Injecting StreamConstraintsException (call #{})", count); + throw new StreamConstraintsException("Test-injected StreamConstraintsException"); + } + } + + logger.debug("JsonParseInterceptor: Delegating to default parser (call #{})", callCount.get()); + return defaultParser.parse(bytes, responseHeaders); + }; + + JsonNodeStorePayload.setTestOnlyJsonParseInterceptor(interceptor); + + return () -> { + logger.info("JsonParseInterceptor: Clearing interceptor after {} calls", callCount.get()); + JsonNodeStorePayload.clearTestOnlyJsonParseInterceptor(); + }; + } +} diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java index b656b84e66db..72b875324809 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java @@ -250,6 +250,11 @@ public class Configs { public static final String CHARSET_DECODER_ERROR_ACTION_ON_UNMAPPED_CHARACTER = "COSMOS.CHARSET_DECODER_ERROR_ACTION_ON_UNMAPPED_CHARACTER"; public static final String DEFAULT_CHARSET_DECODER_ERROR_ACTION_ON_UNMAPPED_CHARACTER = StringUtils.EMPTY; + // Config to enable recovery for CFP on malformed response + public static final String CHANGE_FEED_PROCESSOR_MALFORMED_RESPONSE_RECOVERY_ENABLED = "COSMOS.CHANGE_FEED_PROCESSOR_MALFORMED_RESPONSE_RECOVERY_ENABLED"; + public static final String CHANGE_FEED_PROCESSOR_MALFORMED_RESPONSE_RECOVERY_ENABLED_VARIABLE = "COSMOS_CHANGE_FEED_PROCESSOR_MALFORMED_RESPONSE_RECOVERY_ENABLED"; + public static final boolean DEFAULT_CHANGE_FEED_PROCESSOR_MALFORMED_RESPONSE_RECOVERY_ENABLED = true; + // Metrics // Samples: // System.setProperty( @@ -1229,6 +1234,16 @@ public static int getWarnLevelLoggingThresholdForPpaf() { return Integer.parseInt(warnLevelLoggingThresholdForPpaf); } + public static boolean isChangeFeedProcessorMalformedResponseRecoveryEnabled() { + String isMalformedResponseRecoveryEnabled = System.getProperty( + CHANGE_FEED_PROCESSOR_MALFORMED_RESPONSE_RECOVERY_ENABLED, + firstNonNull( + emptyToNull(System.getenv().get(CHANGE_FEED_PROCESSOR_MALFORMED_RESPONSE_RECOVERY_ENABLED_VARIABLE)), + String.valueOf(DEFAULT_CHANGE_FEED_PROCESSOR_MALFORMED_RESPONSE_RECOVERY_ENABLED))); + + return Boolean.parseBoolean(isMalformedResponseRecoveryEnabled); + } + public static String getAzureMonitorConnectionString() { return System.getProperty( APPLICATIONINSIGHTS_CONNECTION_STRING, diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java index 63c238f0e6ab..401904199b24 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java @@ -466,6 +466,7 @@ public static class SubStatusCodes { public static final int INVALID_RESULT = 20910; public static final int CLOSED_CLIENT = 20912; public static final int PPCB_INVALID_STATE = 20913; + public static final int JACKSON_STREAMS_CONSTRAINED = 20914; //SDK Codes (Server) // IMPORTANT - whenever possible use consistency substatus codes that .Net SDK also uses diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java index 906fc518f2f7..ca5e5cbb84d5 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java @@ -25,8 +25,6 @@ import com.azure.cosmos.implementation.http.HttpTransportSerializer; import com.azure.cosmos.implementation.http.ReactorNettyRequestRecord; import com.azure.cosmos.implementation.interceptor.ITransportClientInterceptor; -import com.azure.cosmos.implementation.perPartitionCircuitBreaker.GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker; -import com.azure.cosmos.implementation.perPartitionCircuitBreaker.LocationSpecificHealthContext; import com.azure.cosmos.implementation.routing.PartitionKeyInternal; import com.azure.cosmos.implementation.routing.PartitionKeyInternalHelper; import com.azure.cosmos.implementation.routing.PartitionKeyRangeIdentity; @@ -233,7 +231,9 @@ public StoreResponse unwrapToStoreResponse( statusCode, HttpUtils.unescape(headers.toLowerCaseMap()), new ByteBufInputStream(retainedContent, true), - size); + size, + request.getOperationType(), + request.getResourceType()); } else { retainedContent.release(); } @@ -243,7 +243,9 @@ public StoreResponse unwrapToStoreResponse( statusCode, HttpUtils.unescape(headers.toLowerCaseMap()), null, - 0); + 0, + request.getOperationType(), + request.getResourceType()); } private Mono query(RxDocumentServiceRequest request) { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/ExceptionClassifier.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/ExceptionClassifier.java index 1092a5a08f28..760b867b4ff6 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/ExceptionClassifier.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/ExceptionClassifier.java @@ -3,6 +3,7 @@ package com.azure.cosmos.implementation.changefeed.common; import com.azure.cosmos.CosmosException; +import com.azure.cosmos.implementation.HttpConstants; /** * Classifies exceptions based on the status codes. @@ -27,6 +28,14 @@ public static StatusCodeErrorType classifyClientException(CosmosException client return StatusCodeErrorType.PARTITION_NOT_FOUND; } + if (clientException.getStatusCode() == HttpConstants.StatusCodes.BADREQUEST) { + if (subStatusCode == HttpConstants.SubStatusCodes.JACKSON_STREAMS_CONSTRAINED) { + return StatusCodeErrorType.JACKSON_STREAMS_CONSTRAINED; + } else if (subStatusCode == HttpConstants.SubStatusCodes.FAILED_TO_PARSE_SERVER_RESPONSE) { + return StatusCodeErrorType.JSON_PARSING_ERROR; + } + } + if (clientException.getStatusCode() == ChangeFeedHelper.HTTP_STATUS_CODE_GONE && (subStatusCode == SubStatusCode_PartitionKeyRangeGone || subStatusCode == SubStatusCode_Splitting_Or_Merging)) { return StatusCodeErrorType.PARTITION_SPLIT_OR_MERGE; diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/StatusCodeErrorType.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/StatusCodeErrorType.java index eae515e7dda3..a8c55788374b 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/StatusCodeErrorType.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/StatusCodeErrorType.java @@ -10,5 +10,7 @@ public enum StatusCodeErrorType { PARTITION_NOT_FOUND, PARTITION_SPLIT_OR_MERGE, TRANSIENT_ERROR, - MAX_ITEM_COUNT_TOO_LARGE + MAX_ITEM_COUNT_TOO_LARGE, + JACKSON_STREAMS_CONSTRAINED, + JSON_PARSING_ERROR } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionProcessorImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionProcessorImpl.java index 8baff17be50b..f865b44ac9c8 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionProcessorImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionProcessorImpl.java @@ -4,7 +4,9 @@ import com.azure.cosmos.CosmosException; import com.azure.cosmos.ThroughputControlGroupConfig; +import com.azure.cosmos.implementation.Configs; import com.azure.cosmos.implementation.CosmosSchedulers; +import com.azure.cosmos.implementation.Strings; import com.azure.cosmos.implementation.Utils; import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; import com.azure.cosmos.implementation.changefeed.CancellationToken; @@ -27,6 +29,7 @@ import com.azure.cosmos.models.CosmosChangeFeedRequestOptions; import com.azure.cosmos.models.FeedResponse; import com.azure.cosmos.models.ModelBridgeInternal; +import com.azure.cosmos.util.CosmosChangeFeedContinuationTokenUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; @@ -34,6 +37,7 @@ import java.time.Duration; import java.time.Instant; +import java.util.concurrent.atomic.AtomicInteger; import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkArgument; import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull; @@ -58,6 +62,8 @@ class PartitionProcessorImpl implements PartitionProcessor { private volatile String lastServerContinuationToken; private volatile boolean hasMoreResults; private volatile boolean hasServerContinuationTokenChange; + private final int maxStreamsConstrainedRetries = 1; + private final AtomicInteger streamsConstrainedRetries = new AtomicInteger(0); private final FeedRangeThroughputControlConfigManager feedRangeThroughputControlConfigManager; private volatile Instant lastProcessedTime; @@ -82,6 +88,7 @@ public PartitionProcessorImpl(ChangeFeedObserver observer, settings.getStartState(), settings.getMaxItemCount(), this.changeFeedMode); + this.feedRangeThroughputControlConfigManager = feedRangeThroughputControlConfigManager; this.lastProcessedTime = Instant.now(); } @@ -187,6 +194,8 @@ public Mono run(CancellationToken cancellationToken) { if (this.options.getMaxItemCount() != this.settings.getMaxItemCount()) { this.options.setMaxItemCount(this.settings.getMaxItemCount()); // Reset after successful execution. } + + this.streamsConstrainedRetries.set(0); }) .onErrorResume(throwable -> { if (throwable instanceof CosmosException) { @@ -221,6 +230,7 @@ public Mono run(CancellationToken cancellationToken) { this.resultException = new RuntimeException(clientException); } break; + case MAX_ITEM_COUNT_TOO_LARGE: { if (this.options.getMaxItemCount() <= 1) { logger.error( @@ -246,8 +256,57 @@ public Mono run(CancellationToken cancellationToken) { return !cancellationToken.isCancellationRequested() && currentTime.isBefore(stopTimer); }).flatMap(values -> Flux.empty()); } + break; } - break; + case JACKSON_STREAMS_CONSTRAINED: { + + if (!Configs.isChangeFeedProcessorMalformedResponseRecoveryEnabled()) { + logger.error( + "Lease with token : " + this.lease.getLeaseToken() + " : Streams constrained exception encountered. To enable automatic retries, please set the " + Configs.CHANGE_FEED_PROCESSOR_MALFORMED_RESPONSE_RECOVERY_ENABLED + " configuration to 'true'. Failing.", + clientException); + this.resultException = new RuntimeException(clientException); + return Flux.error(throwable); + } + + int retryCount = this.streamsConstrainedRetries.incrementAndGet(); + boolean shouldRetry = retryCount <= this.maxStreamsConstrainedRetries; + + if (!shouldRetry) { + logger.error( + "Lease with token : " + this.lease.getLeaseToken() + ": Reached max retries for streams constrained exception with statusCode : [" + clientException.getStatusCode() + "]" + " : subStatusCode " + clientException.getSubStatusCode() + " : message " + clientException.getMessage() + ", failing.", + clientException); + this.resultException = new RuntimeException(clientException); + return Flux.error(throwable); + } + + logger.warn( + "Lease with token : " + this.lease.getLeaseToken() + " : Streams constrained exception encountered, will retry. " + "retryCount " + retryCount + " of " + this.maxStreamsConstrainedRetries + " retries.", + clientException); + + if (this.options.getMaxItemCount() == -1) { + logger.warn( + "Lease with token : " + this.lease.getLeaseToken() + " : max item count is set to -1, will retry after setting it to 100. " + "retryCount " + retryCount + " of " + this.maxStreamsConstrainedRetries + " retries.", + clientException); + this.options.setMaxItemCount(1); + return Flux.empty(); + } + + if (this.options.getMaxItemCount() <= 1) { + logger.error( + "Lease with token : " + this.lease.getLeaseToken() + " Cannot reduce maxItemCount further as it's already at :" + this.options.getMaxItemCount(), clientException); + this.resultException = new RuntimeException(clientException); + return Flux.error(throwable); + } + + this.options.setMaxItemCount(1); + logger.warn("Lease with token : " + this.lease.getLeaseToken() + " Reducing maxItemCount, new value: " + this.options.getMaxItemCount()); + return Flux.empty(); + } + case JSON_PARSING_ERROR: + logger.error( + "Lease with token : " + this.lease.getLeaseToken() + ": Parsing error encountered.", clientException); + this.resultException = new RuntimeException(clientException); + return Flux.error(throwable); default: { logger.error( "Lease with token " + this.lease.getLeaseToken() + diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/PartitionProcessorImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/PartitionProcessorImpl.java index db5b4f4a87df..322fa5cd6adb 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/PartitionProcessorImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/pkversion/PartitionProcessorImpl.java @@ -4,9 +4,11 @@ import com.azure.cosmos.CosmosException; import com.azure.cosmos.ThroughputControlGroupConfig; +import com.azure.cosmos.implementation.Configs; import com.azure.cosmos.implementation.CosmosSchedulers; import com.azure.cosmos.implementation.HttpConstants; import com.azure.cosmos.implementation.ImplementationBridgeHelpers; +import com.azure.cosmos.implementation.Strings; import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; import com.azure.cosmos.implementation.changefeed.CancellationToken; import com.azure.cosmos.implementation.changefeed.ChangeFeedContextClient; @@ -28,6 +30,7 @@ import com.azure.cosmos.models.CosmosChangeFeedRequestOptions; import com.azure.cosmos.models.FeedResponse; import com.azure.cosmos.models.ModelBridgeInternal; +import com.azure.cosmos.util.CosmosChangeFeedContinuationTokenUtils; import com.fasterxml.jackson.databind.JsonNode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,6 +39,7 @@ import java.time.Duration; import java.time.Instant; +import java.util.concurrent.atomic.AtomicInteger; import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkArgument; import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull; @@ -58,6 +62,8 @@ class PartitionProcessorImpl implements PartitionProcessor { private volatile String lastServerContinuationToken; private volatile boolean hasMoreResults; private volatile boolean hasServerContinuationTokenChange; + private final int maxStreamsConstrainedRetries = 1; + private final AtomicInteger streamsConstrainedRetries = new AtomicInteger(0); private final FeedRangeThroughputControlConfigManager feedRangeThroughputControlConfigManager; private volatile Instant lastProcessedTime; @@ -77,6 +83,7 @@ public PartitionProcessorImpl(ChangeFeedObserver observer, ChangeFeedState state = settings.getStartState(); this.options = ModelBridgeInternal.createChangeFeedRequestOptionsForChangeFeedState(state); this.options.setMaxItemCount(settings.getMaxItemCount()); + // For pk version, merge is not support, exclude it from the capabilities header ImplementationBridgeHelpers.CosmosChangeFeedRequestOptionsHelper.getCosmosChangeFeedRequestOptionsAccessor() .setHeader( @@ -194,6 +201,8 @@ public Mono run(CancellationToken cancellationToken) { if (this.options.getMaxItemCount() != this.settings.getMaxItemCount()) { this.options.setMaxItemCount(this.settings.getMaxItemCount()); // Reset after successful execution. } + + this.streamsConstrainedRetries.set(0); }) .onErrorResume(throwable -> { if (throwable instanceof CosmosException) { @@ -251,8 +260,58 @@ public Mono run(CancellationToken cancellationToken) { return !cancellationToken.isCancellationRequested() && currentTime.isBefore(stopTimer); }).flatMap(values -> Flux.empty()); } + + break; } - break; + case JACKSON_STREAMS_CONSTRAINED: { + + if (!Configs.isChangeFeedProcessorMalformedResponseRecoveryEnabled()) { + logger.error( + "Partition : " + this.lease.getLeaseToken() + " : Streams constrained exception encountered. To enable automatic retries, please set the " + Configs.CHANGE_FEED_PROCESSOR_MALFORMED_RESPONSE_RECOVERY_ENABLED + " configuration to 'true'. Failing.", + clientException); + this.resultException = new RuntimeException(clientException); + return Flux.error(throwable); + } + + int retryCount = this.streamsConstrainedRetries.incrementAndGet(); + boolean shouldRetry = retryCount <= this.maxStreamsConstrainedRetries; + + if (!shouldRetry) { + logger.error( + "Partition : " + this.lease.getLeaseToken() + ": Reached max retries for streams constrained exception with statusCode : [" + clientException.getStatusCode() + "]" + " : subStatusCode " + clientException.getSubStatusCode() + " : message " + clientException.getMessage() + ", failing.", + clientException); + this.resultException = new RuntimeException(clientException); + return Flux.error(throwable); + } + + logger.warn( + "Partition : " + this.lease.getLeaseToken() + " : Streams constrained exception encountered, will retry. " + "retryCount " + retryCount + " of " + this.maxStreamsConstrainedRetries + " retries.", + clientException); + + if (this.options.getMaxItemCount() == -1) { + logger.warn( + "Partition : " + this.lease.getLeaseToken() + " : max item count is set to -1, will retry after setting it to 100. " + "retryCount " + retryCount + " of " + this.maxStreamsConstrainedRetries + " retries.", + clientException); + this.options.setMaxItemCount(1); + return Flux.empty(); + } + + if (this.options.getMaxItemCount() <= 1) { + logger.error( + "Cannot reduce maxItemCount further as it's already at :" + this.options.getMaxItemCount(), clientException); + this.resultException = new RuntimeException(clientException); + return Flux.error(throwable); + } + + this.options.setMaxItemCount(1); + logger.warn("Reducing maxItemCount, new value: " + this.options.getMaxItemCount()); + return Flux.empty(); + } + case JSON_PARSING_ERROR: + logger.error( + "Partition : " + this.lease.getLeaseToken() + ": Parsing error encountered.", clientException); + this.resultException = new RuntimeException(clientException); + return Flux.error(throwable); default: { logger.error("Unrecognized Cosmos exception returned error code " + docDbError, clientException); this.resultException = new RuntimeException(clientException); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/JsonNodeStorePayload.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/JsonNodeStorePayload.java index bbf642ba667a..eeed41958bf7 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/JsonNodeStorePayload.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/JsonNodeStorePayload.java @@ -5,7 +5,10 @@ import com.azure.cosmos.implementation.Configs; import com.azure.cosmos.implementation.HttpConstants; +import com.azure.cosmos.implementation.OperationType; +import com.azure.cosmos.implementation.ResourceType; import com.azure.cosmos.implementation.Utils; +import com.fasterxml.jackson.core.exc.StreamConstraintsException; import com.fasterxml.jackson.databind.JsonNode; import io.netty.buffer.ByteBufInputStream; import io.netty.util.internal.StringUtil; @@ -19,27 +22,67 @@ import java.nio.charset.StandardCharsets; import java.util.Base64; import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; public class JsonNodeStorePayload implements StorePayload { private static final Logger logger = LoggerFactory.getLogger(JsonNodeStorePayload.class); private static final CharsetDecoder fallbackCharsetDecoder = getFallbackCharsetDecoder(); + + // Test-only interceptor for fault injection + // Using AtomicReference for thread-safe global interceptor + // This is a test-only feature and tests should ensure proper isolation through test orchestration + private static final AtomicReference globalTestInterceptor = new AtomicReference<>(); + private final int responsePayloadSize; private final JsonNode jsonValue; - public JsonNodeStorePayload(ByteBufInputStream bufferStream, int readableBytes, Map responseHeaders) { + public JsonNodeStorePayload( + ByteBufInputStream bufferStream, + int readableBytes, + Map responseHeaders, + OperationType operationType, + ResourceType resourceType) { + if (readableBytes > 0) { this.responsePayloadSize = readableBytes; - this.jsonValue = fromJson(bufferStream, readableBytes, responseHeaders); + this.jsonValue = fromJson(bufferStream, readableBytes, responseHeaders, operationType, resourceType); } else { this.responsePayloadSize = 0; this.jsonValue = null; } } - private static JsonNode fromJson(ByteBufInputStream bufferStream, int readableBytes, Map responseHeaders) { + private static JsonNode fromJson(ByteBufInputStream bufferStream, int readableBytes, Map responseHeaders, + OperationType operationType, ResourceType resourceType) { byte[] bytes = new byte[readableBytes]; try { bufferStream.read(bytes); + + // Allow test-only interceptor to inject faults before parsing + TestOnlyJsonParseInterceptor interceptor = globalTestInterceptor.get(); + if (interceptor != null) { + return interceptor.intercept(bytes, responseHeaders, + (b, h) -> fromJsonWithBytes(b, h), operationType, resourceType); + } + + return fromJsonWithBytes(bytes, responseHeaders); + } catch (IOException e) { + // IOException from read operation + String baseErrorMessage = "Failed to read JSON document from stream."; + logger.error(baseErrorMessage, e); + + IllegalStateException innerException = new IllegalStateException("Unable to read JSON stream.", e); + + throw Utils.createCosmosException( + HttpConstants.StatusCodes.BADREQUEST, + evaluateSubStatusCode(e), + innerException, + responseHeaders); + } + } + + private static JsonNode fromJsonWithBytes(byte[] bytes, Map responseHeaders) throws IOException { + try { return Utils.getSimpleObjectMapper().readTree(bytes); } catch (IOException e) { if (fallbackCharsetDecoder != null) { @@ -60,7 +103,7 @@ private static JsonNode fromJson(ByteBufInputStream bufferStream, int readableBy throw Utils.createCosmosException( HttpConstants.StatusCodes.BADREQUEST, - HttpConstants.SubStatusCodes.FAILED_TO_PARSE_SERVER_RESPONSE, + evaluateSubStatusCode(e), innerException, responseHeaders); } @@ -91,7 +134,7 @@ private static JsonNode fromJsonWithFallbackCharsetDecoder(byte[] bytes, Map responseHeaders, + DefaultJsonParser defaultParser, + OperationType operationType, + ResourceType resourceType + ) throws IOException; + + /** + * Functional interface for the default JSON parsing logic. + */ + @FunctionalInterface + interface DefaultJsonParser { + JsonNode parse(byte[] bytes, Map responseHeaders) throws IOException; + } + } + + /** + * Sets a test-only interceptor for JSON parsing globally. + * WARNING: This is intended for testing purposes only and should not be used in production code. + * + *

This sets a GLOBAL interceptor that affects all threads. Tests using this interceptor + * should NOT run in parallel with other tests to avoid interference. Use TestNG's + * singleThreaded = true or similar mechanisms to ensure test isolation.

+ * + *

The interceptor will be active across all threads including thread pool workers, + * making it suitable for testing multi-threaded components like ChangeFeedProcessor.

+ * + * @param interceptor the interceptor to set (null to clear) + */ + public static void setTestOnlyJsonParseInterceptor(TestOnlyJsonParseInterceptor interceptor) { + globalTestInterceptor.set(interceptor); + if (interceptor != null) { + logger.warn("GLOBAL test-only JSON parse interceptor has been set on thread {}. " + + "This affects ALL threads and should only be used in isolated test scenarios. " + + "Ensure tests using this do NOT run in parallel.", + Thread.currentThread().getName()); + } + } + + /** + * Clears the test-only interceptor. + */ + public static void clearTestOnlyJsonParseInterceptor() { + globalTestInterceptor.set(null); + } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ResponseUtils.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ResponseUtils.java index bd5bf896eac4..e508e0e446cf 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ResponseUtils.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ResponseUtils.java @@ -4,7 +4,6 @@ package com.azure.cosmos.implementation.directconnectivity; import com.azure.cosmos.implementation.http.HttpHeaders; -import com.azure.cosmos.implementation.http.HttpRequest; import com.azure.cosmos.implementation.http.HttpResponse; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufInputStream; @@ -29,7 +28,9 @@ static Mono toStoreResponse(HttpResponse httpClientResponse, Stri httpClientResponse.statusCode(), HttpUtils.unescape(httpResponseHeaders.toMap()), null, - 0); + 0, + null, + null); } return new StoreResponse( @@ -37,7 +38,9 @@ static Mono toStoreResponse(HttpResponse httpClientResponse, Stri httpClientResponse.statusCode(), HttpUtils.unescape(httpResponseHeaders.toMap()), new ByteBufInputStream(byteBufContent, true), - size); + size, + null, + null); }); } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResponse.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResponse.java index a4bab8f3edbe..13ad4007d9bd 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResponse.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResponse.java @@ -4,7 +4,9 @@ package com.azure.cosmos.implementation.directconnectivity; import com.azure.cosmos.implementation.HttpConstants; +import com.azure.cosmos.implementation.OperationType; import com.azure.cosmos.implementation.RequestTimeline; +import com.azure.cosmos.implementation.ResourceType; import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdChannelAcquisitionTimeline; import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdChannelStatistics; @@ -15,7 +17,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -47,11 +48,13 @@ public class StoreResponse { private final String endpoint; public StoreResponse( - String endpoint, - int status, - Map headerMap, - ByteBufInputStream contentStream, - int responsePayloadLength) { + String endpoint, + int status, + Map headerMap, + ByteBufInputStream contentStream, + int responsePayloadLength, + OperationType operationType, + ResourceType resourceType) { checkArgument((contentStream == null) == (responsePayloadLength == 0), "Parameter 'contentStream' must be consistent with 'responsePayloadLength'."); @@ -71,7 +74,7 @@ public StoreResponse( replicaStatusList = new HashMap<>(); if (contentStream != null) { try { - this.responsePayload = new JsonNodeStorePayload(contentStream, responsePayloadLength, headerMap); + this.responsePayload = new JsonNodeStorePayload(contentStream, responsePayloadLength, headerMap, operationType, resourceType); } finally { try { contentStream.close(); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestManager.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestManager.java index d7e8ccb56421..8c3c0fe48a84 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestManager.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdRequestManager.java @@ -16,6 +16,7 @@ import com.azure.cosmos.implementation.LockedException; import com.azure.cosmos.implementation.MethodNotAllowedException; import com.azure.cosmos.implementation.NotFoundException; +import com.azure.cosmos.implementation.OperationType; import com.azure.cosmos.implementation.PartitionIsMigratingException; import com.azure.cosmos.implementation.PartitionKeyRangeGoneException; import com.azure.cosmos.implementation.PartitionKeyRangeIsSplittingException; @@ -23,6 +24,7 @@ import com.azure.cosmos.implementation.RequestEntityTooLargeException; import com.azure.cosmos.implementation.RequestRateTooLargeException; import com.azure.cosmos.implementation.RequestTimeoutException; +import com.azure.cosmos.implementation.ResourceType; import com.azure.cosmos.implementation.RetryWithException; import com.azure.cosmos.implementation.RxDocumentServiceRequest; import com.azure.cosmos.implementation.ServiceUnavailableException; @@ -1001,6 +1003,9 @@ private void messageReceived(final ChannelHandlerContext context, final RntbdRes final String requestUriAsString = requestRecord.args().physicalAddressUri() != null ? requestRecord.args().physicalAddressUri().getURI().toString() : null; + OperationType operationType = requestRecord.args().serviceRequest().getOperationType(); + ResourceType resourceType = requestRecord.args().serviceRequest().getResourceType(); + if ((HttpResponseStatus.OK.code() <= statusCode && statusCode < HttpResponseStatus.MULTIPLE_CHOICES.code()) || statusCode == HttpResponseStatus.NOT_MODIFIED.code()) { @@ -1008,7 +1013,7 @@ private void messageReceived(final ChannelHandlerContext context, final RntbdRes if (rntbdCtx == null) { throw new IllegalStateException("Expecting non-null rntbd context."); } - final StoreResponse storeResponse = response.toStoreResponse(rntbdCtx.serverVersion(), requestUriAsString); + final StoreResponse storeResponse = response.toStoreResponse(rntbdCtx.serverVersion(), requestUriAsString, operationType, resourceType); if (this.serverErrorInjector != null) { Consumer completeWithInjectedDelayConsumer = diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdResponse.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdResponse.java index 1fb8b43d3fe8..d3976381cc8e 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdResponse.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/rntbd/RntbdResponse.java @@ -3,6 +3,8 @@ package com.azure.cosmos.implementation.directconnectivity.rntbd; +import com.azure.cosmos.implementation.OperationType; +import com.azure.cosmos.implementation.ResourceType; import com.azure.cosmos.implementation.directconnectivity.StoreResponse; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; @@ -347,7 +349,7 @@ public static RntbdResponse decode(final ByteBuf in) { return new RntbdResponse(in.readSlice(end - start), frame, headers, content); } - StoreResponse toStoreResponse(final String serverVersion, final String endpoint) { + StoreResponse toStoreResponse(final String serverVersion, final String endpoint, OperationType operationType, ResourceType resourceType) { checkNotNull(serverVersion, "Argument 'serverVersion' must not be null."); @@ -359,7 +361,9 @@ StoreResponse toStoreResponse(final String serverVersion, final String endpoint) this.getStatus().code(), this.headers.asMap(serverVersion, this.getActivityId()), null, - 0); + 0, + operationType, + resourceType); } return new StoreResponse( @@ -367,7 +371,9 @@ StoreResponse toStoreResponse(final String serverVersion, final String endpoint) this.getStatus().code(), this.headers.asMap(serverVersion, this.getActivityId()), new ByteBufInputStream(this.content.retain(), true), - length); + length, + operationType, + resourceType); } // endregion diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ServerSideOnlyContinuationNonDocumentFetcherImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ServerSideOnlyContinuationNonDocumentFetcherImpl.java index 9efe5a2dae66..863bdaa2b4ac 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ServerSideOnlyContinuationNonDocumentFetcherImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ServerSideOnlyContinuationNonDocumentFetcherImpl.java @@ -53,7 +53,7 @@ public ServerSideOnlyContinuationNonDocumentFetcherImpl( checkNotNull(client, "Argument 'client' must not be null."); checkNotNull(createRequestFunc, "Argument 'createRequestFunc' must not be null."); - + this.createRequestFunc = createRequestFunc; this.continuationToken = continuationToken; this.retryPolicySupplier = () -> client.getResetSessionTokenRetryPolicy().getRequestPolicy(null); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/ChangeFeedProcessorOptions.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/ChangeFeedProcessorOptions.java index 6e35e910f3b9..7e3c0d6e2f50 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/ChangeFeedProcessorOptions.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/ChangeFeedProcessorOptions.java @@ -386,7 +386,7 @@ public ChangeFeedProcessorOptions setScheduler(Scheduler scheduler) { * Please use this config with caution. By default, CFP will try to process the changes as fast as possible, * only use this config if you want to limit the RU that can be used for your change feed processing. * By using this config, it can slow down the process and cause the lag. - * + * * For direct mode, please configure the throughput control group with the total RU you would allow for changeFeed processing. * For gateway mode, please configure the throughput control group with the total RU you would allow for changeFeed processing / total CFP Instances. *