Skip to content

Commit c36f1cb

Browse files
iit2009060lucasbru
authored andcommitted
KAFKA-19930: Adding process handler null check for global threads (#21016)
1. Added process handler null check for global threads. 2. Added unit test to verify it. 3. Updated the documentation. Reviewers: Matthias J. Sax <matthias@confluent.io>, Lucas Brutschy <lbrutschy@confluent.io>
1 parent 7caa4ad commit c36f1cb

File tree

4 files changed

+36
-3
lines changed

4 files changed

+36
-3
lines changed

docs/streams/developer-guide/config-streams.html

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1036,7 +1036,10 @@ <h4><a class="toc-backref" href="#id30">probing.rebalance.interval.ms</a><a clas
10361036
<div><p>The processing exception handler allows you to manage exceptions triggered during the processing of a record. The implemented exception
10371037
handler needs to return a <code>FAIL</code> or <code>CONTINUE</code> depending on the record and the exception thrown. Returning
10381038
<code>FAIL</code> will signal that Streams should shut down and <code>CONTINUE</code> will signal that Streams should ignore the issue
1039-
and continue processing. The following library built-in exception handlers are available:</p>
1039+
and continue processing.</p>
1040+
<p><strong>Note:</strong> This handler applies only to regular stream processing tasks. It does not apply to global state store updates
1041+
(global threads). Exceptions occurring in global threads will bubble up to the configured uncaught exception handler.</p>
1042+
<p>The following library built-in exception handlers are available:</p>
10401043
<ul class="simple">
10411044
<li><a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/errors/LogAndContinueProcessingExceptionHandler.html">LogAndContinueProcessingExceptionHandler</a>:
10421045
This handler logs the processing exception and then signals the processing pipeline to continue processing more records.

streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -574,7 +574,8 @@ public class StreamsConfig extends AbstractConfig {
574574
public static final String ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG = "errors.dead.letter.queue.topic.name";
575575

576576
private static final String ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_DOC = "If not null, the default exception handler will build and send a Dead Letter Queue record to the topic with the provided name if an error occurs.\n" +
577-
"If a custom deserialization/production or processing exception handler is set, this parameter is ignored for this handler.";
577+
"If a custom deserialization/production or processing exception handler is set, this parameter is ignored for this handler.\n" +
578+
"Note: This configuration applies only to regular stream processing tasks. It does not apply to global state store updates (global threads).";
578579

579580
/** {@code log.summary.interval.ms} */
580581
public static final String LOG_SUMMARY_INTERVAL_MS_CONFIG = "log.summary.interval.ms";
@@ -652,7 +653,9 @@ public class StreamsConfig extends AbstractConfig {
652653
@SuppressWarnings("WeakerAccess")
653654
public static final String PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG = "processing.exception.handler";
654655
@Deprecated
655-
public static final String PROCESSING_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the <code>org.apache.kafka.streams.errors.ProcessingExceptionHandler</code> interface.";
656+
public static final String PROCESSING_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the <code>org.apache.kafka.streams.errors.ProcessingExceptionHandler</code> interface. " +
657+
"Note: This handler applies only to regular stream processing tasks. It does not apply to global state store updates (global threads). " +
658+
"Exceptions occurring in global threads will bubble up to the configured uncaught exception handler.";
656659

657660
/** {@code processing.guarantee} */
658661
@SuppressWarnings("WeakerAccess")

streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,15 @@ public void process(final Record<KIn, VIn> record) {
208208
// while Java distinguishes checked vs unchecked exceptions, other languages
209209
// like Scala or Kotlin do not, and thus we need to catch `Exception`
210210
// (instead of `RuntimeException`) to work well with those languages
211+
212+
// If the processing exception handler is not set (e.g., for global threads),
213+
// rethrow the exception to let it bubble up to the uncaught exception handler.
214+
// The processing exception handler is only set for regular stream tasks, not for
215+
// global state update tasks which use a different error handling mechanism.
216+
if (processingExceptionHandler == null) {
217+
throw processingException;
218+
}
219+
211220
final ErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext(
212221
null, // only required to pass for DeserializationExceptionHandler
213222
internalProcessorContext.recordContext().topic(),

streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,24 @@ public void shouldNotThrowFailedProcessingExceptionWhenProcessingExceptionHandle
133133
assertDoesNotThrow(() -> node.process(new Record<>(KEY, VALUE, TIMESTAMP)));
134134
}
135135

136+
@Test
137+
public void shouldRethrowExceptionWhenProcessingExceptionHandlerIsNull() {
138+
// This simulates the global thread case where no ProcessingExceptionHandler is set
139+
final ProcessorNode<Object, Object, Object, Object> node =
140+
new ProcessorNode<>(NAME, new IgnoredInternalExceptionsProcessor(), Collections.emptySet());
141+
142+
final InternalProcessorContext<Object, Object> internalProcessorContext = mockInternalProcessorContext();
143+
// Initialize without a ProcessingExceptionHandler (simulates global thread initialization)
144+
node.init(internalProcessorContext);
145+
146+
// The exception should be rethrown since there's no handler to process it
147+
final RuntimeException exception = assertThrows(RuntimeException.class,
148+
() -> node.process(new Record<>(KEY, VALUE, TIMESTAMP)));
149+
150+
assertEquals("Processing exception should be caught and handled by the processing exception handler.",
151+
exception.getMessage());
152+
}
153+
136154
@ParameterizedTest
137155
@CsvSource({
138156
"FailedProcessingException,java.lang.RuntimeException,Fail processing",

0 commit comments

Comments
 (0)