diff --git a/CHANGELOG.md b/CHANGELOG.md index ce2b6a63b..4005b52e7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Confluent Python Client for Apache Kafka - CHANGELOG + +### Fixes + +- Fixed `Consumer.poll()` and `Consumer.consume()` blocking indefinitely and not responding to Ctrl+C (KeyboardInterrupt) signals. The implementation now uses a "wakeable poll" pattern that breaks long blocking calls into smaller chunks (200ms) and periodically re-acquires the Python GIL to check for pending signals. This allows Ctrl+C to properly interrupt blocking consumer operations. Fixes Issues [#209](https://github.com/confluentinc/confluent-kafka-python/issues/209) and [#807](https://github.com/confluentinc/confluent-kafka-python/issues/807). + + ## v2.12.1 - 2025-10-21 v2.12.1 is a maintenance release with the following fixes: diff --git a/src/confluent_kafka/src/Consumer.c b/src/confluent_kafka/src/Consumer.c index 9a747862d..9a2fcb7ca 100644 --- a/src/confluent_kafka/src/Consumer.c +++ b/src/confluent_kafka/src/Consumer.c @@ -106,9 +106,81 @@ static int Consumer_traverse (Handle *self, } +/**************************************************************************** + * + * Helper functions for implementing interruptible poll/consume operations + * that allow Ctrl+C to terminate blocking calls. See Issues #209 and #807. + * + * + ****************************************************************************/ + +/** + * @brief Calculate the timeout for the current chunk in wakeable poll pattern. 
+ * + * @param total_timeout_ms Total timeout in milliseconds (-1 for infinite) + * @param chunk_count Current chunk iteration count (0-based) + * @param chunk_timeout_ms Chunk size in milliseconds (typically 200ms) + * @return int Chunk timeout in milliseconds, or 0 if total timeout expired. + * Elapsed time is not measured: it is taken to be + * chunk_count * chunk_timeout_ms, so a total_timeout_ms of 0 yields 0 + * on the very first call. + */ +static int calculate_chunk_timeout(int total_timeout_ms, int chunk_count, + int chunk_timeout_ms) { + if (total_timeout_ms < 0) { + /* Infinite timeout - use chunk size */ + return chunk_timeout_ms; + } else { + /* Finite timeout - calculate remaining */ + int remaining_ms = total_timeout_ms - (chunk_count * chunk_timeout_ms); + if (remaining_ms <= 0) { + /* Timeout expired */ + return 0; + } + return (remaining_ms < chunk_timeout_ms) ? remaining_ms : chunk_timeout_ms; + } +} + +/** + * @brief Check for pending signals between poll chunks. + * + * Re-acquires GIL, checks for signals, and handles cleanup if signal detected. + * This allows Ctrl+C to interrupt blocking poll/consume operations. + * + * @param self Consumer handle + * @param cs CallState structure (thread state will be updated) + * @return int 0 if no signal detected (GIL released again, continue polling), + * 1 if signal detected: the exception is set, CallState_end() has + * already been called here, and the caller should return NULL + * without calling CallState_end() again + */ +static int check_signals_between_chunks(Handle *self, CallState *cs) { + /* Re-acquire GIL to check for signals */ + PyEval_RestoreThread(cs->thread_state); + + /* Check for pending signals (KeyboardInterrupt, etc.)
*/ + /* When it returns -1, PyErr_CheckSignals() has already set the exception */ + if (PyErr_CheckSignals() == -1) { + /* Note: GIL is already held, but CallState_end expects to restore it */ + /* Save thread state again so CallState_end can restore it properly */ + cs->thread_state = PyEval_SaveThread(); + if (!CallState_end(self, cs)) { + /* CallState_end detected signal and cleaned up */ + return 1; /* Signal detected */ + } + return 1; /* Exception was set above, so report the signal whatever CallState_end returned */ + } + /* Re-release GIL for next iteration */ + cs->thread_state = PyEval_SaveThread(); + return 0; /* No signal, continue */ +} +/**************************************************************************** + * + * + * Consumer Methods + * + * + * + * + ****************************************************************************/ + static PyObject *Consumer_subscribe (Handle *self, PyObject *args, PyObject *kwargs) { @@ -984,14 +1056,37 @@ static PyObject *Consumer_offsets_for_times (Handle *self, PyObject *args, #endif } - +/** + * @brief Poll for a single message from the subscribed topics. + * + * Instead of a single blocking call to rd_kafka_consumer_poll() with the + * full timeout, this function: + * 1. Splits the timeout into 200ms chunks + * 2. Calls rd_kafka_consumer_poll() with chunk timeout + * 3. Between chunks, re-acquires GIL and calls PyErr_CheckSignals() + * 4. If signal detected, returns NULL (raises KeyboardInterrupt) + * 5. Continues until message received, timeout expired, or signal detected + * + * + * @param self Consumer handle + * @param args Positional arguments (unused) + * @param kwargs Keyword arguments: + * - timeout (float, optional): Timeout in seconds.
+ * Default: -1.0 (infinite timeout) + * A timeout of 0 performs a single non-blocking poll. + * @return PyObject* Message object, None if timeout, or NULL on error + * (raises KeyboardInterrupt if signal detected) + */ static PyObject *Consumer_poll (Handle *self, PyObject *args, PyObject *kwargs) { double tmout = -1.0f; static char *kws[] = { "timeout", NULL }; - rd_kafka_message_t *rkm; + rd_kafka_message_t *rkm = NULL; PyObject *msgobj; CallState cs; + const int CHUNK_TIMEOUT_MS = 200; /* 200ms chunks for signal checking */ + int total_timeout_ms; + int chunk_timeout_ms; + int chunk_count = 0; if (!self->rk) { PyErr_SetString(PyExc_RuntimeError, @@ -1002,16 +1097,43 @@ static PyObject *Consumer_poll (Handle *self, PyObject *args, if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|d", kws, &tmout)) return NULL; + total_timeout_ms = cfl_timeout_ms(tmout); + CallState_begin(self, &cs); - rkm = rd_kafka_consumer_poll(self->rk, cfl_timeout_ms(tmout)); + while (1) { + /* Calculate timeout for this chunk */ + chunk_timeout_ms = calculate_chunk_timeout(total_timeout_ms, chunk_count, + CHUNK_TIMEOUT_MS); + if (chunk_timeout_ms == 0 && chunk_count > 0) { + /* Timeout expired. A total timeout of 0 also computes 0 on the first pass, so only stop once at least one (non-blocking) poll was issued */ + break; + } + + /* Poll with chunk timeout */ + rkm = rd_kafka_consumer_poll(self->rk, chunk_timeout_ms); + + /* If we got a message, exit the loop */ + if (rkm) { + break; + } + + chunk_count++; + + /* Check for signals between chunks */ + if (check_signals_between_chunks(self, &cs)) { + return NULL; + } + } + /* Final GIL restore and signal check */ if (!CallState_end(self, &cs)) { if (rkm) rd_kafka_message_destroy(rkm); return NULL; } + /* Handle the message */ if (!rkm) Py_RETURN_NONE; @@ -1053,7 +1175,27 @@ static PyObject *Consumer_memberid (Handle *self, PyObject *args, return memberidobj; } - +/** + * @brief Consume a batch of messages from the subscribed topics. + * + * Instead of a single blocking call to rd_kafka_consume_batch_queue() with the + * full timeout, this function: + * 1. Splits the timeout into 200ms chunks + * 2.
Calls rd_kafka_consume_batch_queue() with chunk timeout + * 3. Between chunks, re-acquires GIL and calls PyErr_CheckSignals() + * 4. If signal detected, returns NULL (raises KeyboardInterrupt) + * 5. Continues until messages received, timeout expired, or signal detected. + * + * @param self Consumer handle + * @param args Positional arguments (unused) + * @param kwargs Keyword arguments: + * - num_messages (int, optional): Maximum number of messages to + * consume per call. Default: 1. Maximum: 1000000. + * - timeout (float, optional): Timeout in seconds. + * Default: -1.0 (infinite timeout) + * A timeout of 0 performs a single non-blocking fetch. + * @return PyObject* List of Message objects, empty list if timeout, or NULL on error + * (raises KeyboardInterrupt if signal detected) + */ static PyObject *Consumer_consume (Handle *self, PyObject *args, PyObject *kwargs) { unsigned int num_messages = 1; @@ -1063,7 +1205,11 @@ static PyObject *Consumer_consume (Handle *self, PyObject *args, PyObject *msglist; rd_kafka_queue_t *rkqu = self->u.Consumer.rkqu; CallState cs; - Py_ssize_t i, n; + Py_ssize_t i, n = 0; + const int CHUNK_TIMEOUT_MS = 200; /* 200ms chunks for signal checking */ + int total_timeout_ms; + int chunk_timeout_ms; + int chunk_count = 0; if (!self->rk) { PyErr_SetString(PyExc_RuntimeError, @@ -1081,14 +1227,53 @@ static PyObject *Consumer_consume (Handle *self, PyObject *args, return NULL; } - CallState_begin(self, &cs); + total_timeout_ms = cfl_timeout_ms(tmout); rkmessages = malloc(num_messages * sizeof(rd_kafka_message_t *)); + /* malloc(0) may legitimately return NULL, so only treat NULL as out-of-memory when a non-empty array was requested */ + if (!rkmessages && num_messages > 0) { + PyErr_NoMemory(); + return NULL; + } + + CallState_begin(self, &cs); + + while (1) { + /* Calculate timeout for this chunk */ + chunk_timeout_ms = calculate_chunk_timeout(total_timeout_ms, chunk_count, + CHUNK_TIMEOUT_MS); + if (chunk_timeout_ms == 0 && chunk_count > 0) { + /* Timeout expired. A total timeout of 0 also computes 0 on the first pass, so only stop once at least one (non-blocking) fetch was issued */ + break; + } + + /* Consume with chunk timeout */ + n = (Py_ssize_t)rd_kafka_consume_batch_queue(rkqu, chunk_timeout_ms, + rkmessages, num_messages); + + if (n < 0) { + /* Error - need to
end the call state (which restores the GIL) before setting error */ + if (!CallState_end(self, &cs)) { + /* CallState_end reported a pending signal: keep that exception */ + free(rkmessages); + return NULL; + } + free(rkmessages); + cfl_PyErr_Format(rd_kafka_last_error(), + "%s", rd_kafka_err2str(rd_kafka_last_error())); + return NULL; + } + + /* If we got messages, exit the loop */ + if (n > 0) { + break; + } - n = (Py_ssize_t)rd_kafka_consume_batch_queue(rkqu, - cfl_timeout_ms(tmout), - rkmessages, num_messages); + chunk_count++; + /* Check for signals between chunks */ + if (check_signals_between_chunks(self, &cs)) { + free(rkmessages); + return NULL; + } + } + + /* Final GIL restore and signal check */ if (!CallState_end(self, &cs)) { for (i = 0; i < n; i++) { rd_kafka_message_destroy(rkmessages[i]); @@ -1097,13 +1282,7 @@ static PyObject *Consumer_consume (Handle *self, PyObject *args, return NULL; } - if (n < 0) { - free(rkmessages); - cfl_PyErr_Format(rd_kafka_last_error(), - "%s", rd_kafka_err2str(rd_kafka_last_error())); - return NULL; - } - + /* Create Python list from messages */ msglist = PyList_New(n); for (i = 0; i < n; i++) { diff --git a/tests/test_Consumer.py b/tests/test_Consumer.py index 0b212d497..639470418 100644 --- a/tests/test_Consumer.py +++ b/tests/test_Consumer.py @@ -1,14 +1,35 @@ #!/usr/bin/env python +import os +import signal +import threading +import time + import pytest -from confluent_kafka import (Consumer, TopicPartition, KafkaError, +from confluent_kafka import (Consumer, Producer, TopicPartition, KafkaError, KafkaException, TIMESTAMP_NOT_AVAILABLE, OFFSET_INVALID) from tests.common import TestConsumer +def send_sigint_after_delay(delay_seconds): + """Send SIGINT to current process after delay. + + Utility function for testing interruptible poll/consume operations. + Used to simulate Ctrl+C in automated tests.
+ + Args: + delay_seconds: Delay in seconds before sending SIGINT + """ + time.sleep(delay_seconds) + try: + os.kill(os.getpid(), signal.SIGINT) + except Exception: + pass + + def test_basic_api(): """ Basic API tests, these wont really do anything since there is no broker configured. """ @@ -676,3 +697,740 @@ def __init__(self, config): with pytest.raises(RuntimeError, match="Consumer closed"): consumer.consumer_group_metadata() + + +def test_calculate_chunk_timeout_utility_function(): + """Test calculate_chunk_timeout() utility function through poll() API. + """ + # Assertion 1: Infinite timeout chunks forever with 200ms intervals + consumer1 = TestConsumer({ + 'group.id': 'test-chunk-infinite', + 'socket.timeout.ms': 100, + 'session.timeout.ms': 1000, + 'auto.offset.reset': 'latest' + }) + consumer1.subscribe(['test-topic']) + + interrupt_thread = threading.Thread(target=lambda: send_sigint_after_delay(0.3)) + interrupt_thread.daemon = True + interrupt_thread.start() + + start = time.time() + try: + consumer1.poll() # Infinite timeout - should chunk every 200ms + except KeyboardInterrupt: + elapsed = time.time() - start + # Should interrupt within ~0.5s (200ms chunk + overhead) + assert elapsed < 1.0, f"Assertion 1 failed: Infinite timeout chunking took {elapsed:.2f}s" + consumer1.close() + + # Assertion 2: Finite timeout exact multiple (1.0s = 5 chunks of 200ms) + consumer2 = TestConsumer({ + 'group.id': 'test-chunk-exact-multiple', + 'socket.timeout.ms': 100, + 'session.timeout.ms': 1000, + 'auto.offset.reset': 'latest' + }) + consumer2.subscribe(['test-topic']) + + start = time.time() + msg = consumer2.poll(timeout=1.0) # Exactly 1000ms (5 chunks) + elapsed = time.time() - start + + assert msg is None, "Assertion 2 failed: Expected None (timeout)" + assert 0.8 <= elapsed <= 1.2, f"Assertion 2 failed: Timeout took {elapsed:.2f}s, expected ~1.0s" + consumer2.close() + + # Assertion 3: Finite timeout not multiple (0.35s = 1 chunk + 150ms partial) + consumer3 = 
TestConsumer({ + 'group.id': 'test-chunk-not-multiple', + 'socket.timeout.ms': 100, + 'session.timeout.ms': 1000, + 'auto.offset.reset': 'latest' + }) + consumer3.subscribe(['test-topic']) + + start = time.time() + msg = consumer3.poll(timeout=0.35) # 350ms (1 full chunk + 150ms partial) + elapsed = time.time() - start + + assert msg is None, "Assertion 3 failed: Expected None (timeout)" + assert 0.25 <= elapsed <= 0.45, f"Assertion 3 failed: Timeout took {elapsed:.2f}s, expected ~0.35s" + consumer3.close() + + # Assertion 4: Very short timeout (< 200ms chunk size) + consumer4 = TestConsumer({ + 'group.id': 'test-chunk-very-short', + 'socket.timeout.ms': 100, + 'session.timeout.ms': 1000, + 'auto.offset.reset': 'latest' + }) + consumer4.subscribe(['test-topic']) + + start = time.time() + msg = consumer4.poll(timeout=0.05) # 50ms (less than 200ms chunk) + elapsed = time.time() - start + + assert msg is None, "Assertion 4 failed: Expected None (timeout)" + assert 0.03 <= elapsed <= 0.15, f"Assertion 4 failed: Timeout took {elapsed:.2f}s, expected ~0.05s (not 0.2s)" + consumer4.close() + + # Assertion 5: Zero timeout (non-blocking) + consumer5 = TestConsumer({ + 'group.id': 'test-chunk-zero', + 'socket.timeout.ms': 100, + 'session.timeout.ms': 1000, + 'auto.offset.reset': 'latest' + }) + consumer5.subscribe(['test-topic']) + + start = time.time() + msg = consumer5.poll(timeout=0.0) # Non-blocking + elapsed = time.time() - start + + assert elapsed < 0.1, f"Assertion 5 failed: Zero timeout took {elapsed:.2f}s, expected immediate return" + consumer5.close() + + # Assertion 6: Large finite timeout (10s = 50 chunks) + consumer6 = TestConsumer({ + 'group.id': 'test-chunk-large', + 'socket.timeout.ms': 100, + 'session.timeout.ms': 1000, + 'auto.offset.reset': 'latest' + }) + consumer6.subscribe(['test-topic']) + + start = time.time() + msg = consumer6.poll(timeout=10.0) # 10 seconds (50 chunks) + elapsed = time.time() - start + + assert msg is None, "Assertion 6 failed: 
Expected None (timeout)" + assert 9.5 <= elapsed <= 10.5, f"Assertion 6 failed: Timeout took {elapsed:.2f}s, expected ~10.0s" + consumer6.close() + + # Assertion 7: Finite timeout with interruption (chunk calculation continues correctly) + consumer7 = TestConsumer({ + 'group.id': 'test-chunk-finite-interrupt', + 'socket.timeout.ms': 100, + 'session.timeout.ms': 1000, + 'auto.offset.reset': 'latest' + }) + consumer7.subscribe(['test-topic']) + + interrupt_thread = threading.Thread(target=lambda: send_sigint_after_delay(0.4)) + interrupt_thread.daemon = True + interrupt_thread.start() + + start = time.time() + try: + consumer7.poll(timeout=1.0) # 1 second, but interrupt after 0.4s + except KeyboardInterrupt: + elapsed = time.time() - start + # Should interrupt quickly, not wait for full 1 second + assert elapsed < 1.0, f"Assertion 7 failed: Interruption took {elapsed:.2f}s, expected < 1.0s" + consumer7.close() + + +def test_check_signals_between_chunks_utility_function(): + """Test check_signals_between_chunks() utility function through poll() API. 
+ """ + # Assertion 1: Signal detected on first chunk check + consumer1 = TestConsumer({ + 'group.id': 'test-signal-first-chunk', + 'socket.timeout.ms': 100, + 'session.timeout.ms': 1000, + 'auto.offset.reset': 'latest' + }) + consumer1.subscribe(['test-topic']) + + interrupt_thread = threading.Thread(target=lambda: send_sigint_after_delay(0.05)) + interrupt_thread.daemon = True + interrupt_thread.start() + + start = time.time() + try: + consumer1.poll() # Infinite timeout + except KeyboardInterrupt: + elapsed = time.time() - start + # Should interrupt within ~200ms (first chunk check) + assert elapsed < 0.5, f"Assertion 1 failed: First chunk signal check took {elapsed:.2f}s, expected < 0.5s" + consumer1.close() + + # Assertion 2: Signal detected on later chunk check + consumer2 = TestConsumer({ + 'group.id': 'test-signal-later-chunk', + 'socket.timeout.ms': 100, + 'session.timeout.ms': 1000, + 'auto.offset.reset': 'latest' + }) + consumer2.subscribe(['test-topic']) + + interrupt_thread = threading.Thread(target=lambda: send_sigint_after_delay(0.5)) + interrupt_thread.daemon = True + interrupt_thread.start() + + start = time.time() + try: + consumer2.poll() # Infinite timeout + except KeyboardInterrupt: + elapsed = time.time() - start + # Should interrupt within ~200ms of signal being sent (0.5s + 0.2s = 0.7s max) + assert elapsed < 0.8, f"Assertion 2 failed: Later chunk signal check took {elapsed:.2f}s, expected < 0.8s" + consumer2.close() + + # Assertion 3: No signal - continues polling (returns 0) + consumer3 = TestConsumer({ + 'group.id': 'test-signal-no-signal', + 'socket.timeout.ms': 100, + 'session.timeout.ms': 1000, + 'auto.offset.reset': 'latest' + }) + consumer3.subscribe(['test-topic']) + + start = time.time() + msg = consumer3.poll(timeout=0.5) # 500ms, no signal + elapsed = time.time() - start + + assert msg is None, "Assertion 3 failed: Expected None (timeout), no signal should not interrupt" + assert 0.4 <= elapsed <= 0.6, f"Assertion 3 failed: No 
signal timeout took {elapsed:.2f}s, expected ~0.5s" + consumer3.close() + + # Assertion 4: Signal checked every chunk (not just once) + consumer4 = TestConsumer({ + 'group.id': 'test-signal-every-chunk', + 'socket.timeout.ms': 100, + 'session.timeout.ms': 1000, + 'auto.offset.reset': 'latest' + }) + consumer4.subscribe(['test-topic']) + + # Send signal after 0.6 seconds (3 chunks should have passed) + interrupt_thread = threading.Thread(target=lambda: send_sigint_after_delay(0.6)) + interrupt_thread.daemon = True + interrupt_thread.start() + + start = time.time() + try: + consumer4.poll() # Infinite timeout + except KeyboardInterrupt: + elapsed = time.time() - start + # Should interrupt quickly after signal (within one chunk period) + # Signal sent at 0.6s, should interrupt by ~0.8s (0.6 + 0.2) + assert 0.6 <= elapsed <= 0.9, f"Assertion 4 failed: Every chunk check took {elapsed:.2f}s, expected 0.6-0.9s" + consumer4.close() + + # Assertion 5: Signal check works during finite timeout + consumer5 = TestConsumer({ + 'group.id': 'test-signal-finite-timeout', + 'socket.timeout.ms': 100, + 'session.timeout.ms': 1000, + 'auto.offset.reset': 'latest' + }) + consumer5.subscribe(['test-topic']) + + interrupt_thread = threading.Thread(target=lambda: send_sigint_after_delay(0.3)) + interrupt_thread.daemon = True + interrupt_thread.start() + + start = time.time() + try: + consumer5.poll(timeout=2.0) # 2 seconds, but interrupt after 0.3s + except KeyboardInterrupt: + elapsed = time.time() - start + # Should interrupt quickly, not wait for full 2 seconds + assert elapsed < 1.0, f"Assertion 5 failed: Signal during finite timeout took {elapsed:.2f}s, expected < 1.0s" + consumer5.close() + + +def test_wakeable_poll_utility_functions_interaction(): + """Test interaction between calculate_chunk_timeout() and check_signals_between_chunks(). 
+ """ + # Assertion 1: Both functions work together - chunk calculation + signal check + consumer1 = TestConsumer({ + 'group.id': 'test-interaction-chunk-signal', + 'socket.timeout.ms': 100, + 'session.timeout.ms': 1000, + 'auto.offset.reset': 'latest' + }) + consumer1.subscribe(['test-topic']) + + interrupt_thread = threading.Thread(target=lambda: send_sigint_after_delay(0.4)) + interrupt_thread.daemon = True + interrupt_thread.start() + + start = time.time() + try: + consumer1.poll(timeout=1.0) # 1 second timeout, interrupt after 0.4s + except KeyboardInterrupt: + elapsed = time.time() - start + # Chunk calculation should work (200ms chunks), signal check should detect signal + # Should interrupt within ~0.6s (0.4s signal + 0.2s chunk) + assert elapsed < 0.8, f"Assertion 1 failed: Interaction test took {elapsed:.2f}s, expected < 0.8s" + # Verify it didn't wait for full 1 second timeout + assert elapsed < 1.0, f"Assertion 1 failed: Should interrupt before timeout, took {elapsed:.2f}s" + consumer1.close() + + # Assertion 2: Multiple chunks before signal - both functions work over multiple iterations + consumer2 = TestConsumer({ + 'group.id': 'test-interaction-multiple-chunks', + 'socket.timeout.ms': 100, + 'session.timeout.ms': 1000, + 'auto.offset.reset': 'latest' + }) + consumer2.subscribe(['test-topic']) + + # Send signal after 0.6 seconds (3 chunks should have passed: 0.2s, 0.4s, 0.6s) + interrupt_thread = threading.Thread(target=lambda: send_sigint_after_delay(0.6)) + interrupt_thread.daemon = True + interrupt_thread.start() + + start = time.time() + try: + consumer2.poll() # Infinite timeout + except KeyboardInterrupt: + elapsed = time.time() - start + # Chunk calculation should continue correctly (200ms each) + # Signal check should happen every chunk + # Should interrupt within ~0.8s (0.6s signal + 0.2s chunk) + assert 0.6 <= elapsed <= 0.9, f"Assertion 2 failed: Multiple chunks interaction took {elapsed:.2f}s, expected 0.6-0.9s" + # Verify chunking was 
happening (elapsed should be close to signal time + one chunk) + assert elapsed >= 0.6, f"Assertion 2 failed: Should wait for signal at 0.6s, but interrupted at {elapsed:.2f}s" + consumer2.close() + + +def test_poll_interruptibility_and_messages(): + """Test poll() interruptibility (main fix) and message handling. + """ + topic = 'test-poll-interrupt-topic' + + # Assertion 1: Infinite timeout can be interrupted immediately + consumer1 = TestConsumer({ + 'group.id': 'test-poll-infinite-immediate', + 'socket.timeout.ms': 100, + 'session.timeout.ms': 1000, + 'auto.offset.reset': 'latest' + }) + consumer1.subscribe([topic]) + + interrupt_thread = threading.Thread(target=lambda: send_sigint_after_delay(0.1)) + interrupt_thread.daemon = True + interrupt_thread.start() + + start = time.time() + try: + consumer1.poll() # Infinite timeout + assert False, "Assertion 1 failed: Should have raised KeyboardInterrupt" + except KeyboardInterrupt: + elapsed = time.time() - start + # Should interrupt within first chunk (~200ms) + assert elapsed < 0.5, f"Assertion 1 failed: Immediate interrupt took {elapsed:.2f}s, expected < 0.5s" + consumer1.close() + + # Assertion 2: Finite timeout can be interrupted before timeout expires + consumer2 = TestConsumer({ + 'group.id': 'test-poll-finite-interrupt', + 'socket.timeout.ms': 100, + 'session.timeout.ms': 1000, + 'auto.offset.reset': 'latest' + }) + consumer2.subscribe([topic]) + + interrupt_thread = threading.Thread(target=lambda: send_sigint_after_delay(0.3)) + interrupt_thread.daemon = True + interrupt_thread.start() + + start = time.time() + try: + consumer2.poll(timeout=2.0) # 2 seconds, but interrupt after 0.3s + assert False, "Assertion 2 failed: Should have raised KeyboardInterrupt" + except KeyboardInterrupt: + elapsed = time.time() - start + # Should interrupt quickly, not wait for full 2 seconds + assert elapsed < 1.0, f"Assertion 2 failed: Finite timeout interrupt took {elapsed:.2f}s, expected < 1.0s" + assert elapsed < 2.0, 
f"Assertion 2 failed: Should interrupt before timeout, took {elapsed:.2f}s" + consumer2.close() + + # Assertion 3: Signal sent after multiple chunks still interrupts quickly + consumer3 = TestConsumer({ + 'group.id': 'test-poll-multiple-chunks', + 'socket.timeout.ms': 100, + 'session.timeout.ms': 1000, + 'auto.offset.reset': 'latest' + }) + consumer3.subscribe([topic]) + + interrupt_thread = threading.Thread(target=lambda: send_sigint_after_delay(0.6)) + interrupt_thread.daemon = True + interrupt_thread.start() + + start = time.time() + try: + consumer3.poll() # Infinite timeout + assert False, "Assertion 3 failed: Should have raised KeyboardInterrupt" + except KeyboardInterrupt: + elapsed = time.time() - start + # Should interrupt within one chunk period after signal (0.6s + 0.2s = 0.8s max) + assert 0.6 <= elapsed <= 0.9, f"Assertion 3 failed: Multiple chunks interrupt took {elapsed:.2f}s, expected 0.6-0.9s" + consumer3.close() + + # Assertion 4: No signal - timeout works normally + consumer4 = TestConsumer({ + 'group.id': 'test-poll-timeout-normal', + 'socket.timeout.ms': 100, + 'session.timeout.ms': 1000, + 'auto.offset.reset': 'latest' + }) + consumer4.subscribe([topic]) + + start = time.time() + msg = consumer4.poll(timeout=0.5) # 500ms, no signal + elapsed = time.time() - start + + assert msg is None, "Assertion 4 failed: Expected None (timeout), no signal should not interrupt" + assert 0.4 <= elapsed <= 0.6, f"Assertion 4 failed: Normal timeout took {elapsed:.2f}s, expected ~0.5s" + consumer4.close() + + # Assertion 5: Message available - returns immediately + producer = Producer({'bootstrap.servers': 'localhost:9092'}) + producer.produce(topic, value=b'test-message') + producer.flush(timeout=1.0) + producer = None + + consumer5 = TestConsumer({ + 'bootstrap.servers': 'localhost:9092', + 'group.id': 'test-poll-message-available', + 'socket.timeout.ms': 100, + 'session.timeout.ms': 6000, + 'auto.offset.reset': 'earliest' + }) + consumer5.subscribe([topic]) + 
+ # Wait for subscription and message availability + time.sleep(2.0) + + start = time.time() + msg = consumer5.poll(timeout=2.0) + elapsed = time.time() - start + + # Message should be available and return quickly (after consumer is ready) + assert msg is not None, "Assertion 5 failed: Expected message, got None" + assert not msg.error(), f"Assertion 5 failed: Message has error: {msg.error()}" + # Allow more time for initial consumer setup, but once ready, should return quickly + assert elapsed < 2.5, f"Assertion 5 failed: Message available but took {elapsed:.2f}s, expected < 2.5s" + assert msg.value() == b'test-message', "Assertion 5 failed: Message value mismatch" + consumer5.close() + + +def test_poll_edge_cases(): + """Test poll() edge cases. + """ + topic = 'test-poll-edge-topic' + + # Assertion 1: Zero timeout returns immediately (non-blocking) + consumer1 = TestConsumer({ + 'group.id': 'test-poll-zero-timeout', + 'socket.timeout.ms': 100, + 'session.timeout.ms': 1000, + 'auto.offset.reset': 'latest' + }) + consumer1.subscribe([topic]) + + start = time.time() + msg = consumer1.poll(timeout=0.0) # Zero timeout + elapsed = time.time() - start + + assert elapsed < 0.1, f"Assertion 1 failed: Zero timeout took {elapsed:.2f}s, expected < 0.1s" + assert msg is None, "Assertion 1 failed: Zero timeout with no messages should return None" + consumer1.close() + + # Assertion 2: Closed consumer raises RuntimeError + consumer2 = TestConsumer({ + 'group.id': 'test-poll-closed', + 'socket.timeout.ms': 100, + 'session.timeout.ms': 1000 + }) + consumer2.close() + + with pytest.raises(RuntimeError) as exc_info: + consumer2.poll(timeout=0.1) + assert 'Consumer closed' in str(exc_info.value), f"Assertion 2 failed: Expected 'Consumer closed' error, got: {exc_info.value}" + + # Assertion 3: Short timeout works correctly (no signal) + consumer3 = TestConsumer({ + 'group.id': 'test-poll-short-timeout', + 'socket.timeout.ms': 100, + 'session.timeout.ms': 1000, + 'auto.offset.reset': 
'latest' + }) + consumer3.subscribe([topic]) + + start = time.time() + msg = consumer3.poll(timeout=0.1) # 100ms timeout + elapsed = time.time() - start + + assert msg is None, "Assertion 3 failed: Short timeout with no messages should return None" + assert 0.05 <= elapsed <= 0.2, f"Assertion 3 failed: Short timeout took {elapsed:.2f}s, expected ~0.1s" + consumer3.close() + + # Assertion 4: Very short timeout (less than chunk size) works + consumer4 = TestConsumer({ + 'group.id': 'test-poll-very-short', + 'socket.timeout.ms': 100, + 'session.timeout.ms': 1000, + 'auto.offset.reset': 'latest' + }) + consumer4.subscribe([topic]) + + start = time.time() + msg = consumer4.poll(timeout=0.05) # 50ms timeout (less than 200ms chunk) + elapsed = time.time() - start + + assert msg is None, "Assertion 4 failed: Very short timeout should return None" + assert elapsed < 0.2, f"Assertion 4 failed: Very short timeout took {elapsed:.2f}s, expected < 0.2s" + consumer4.close() + + +def test_consume_interruptibility_and_messages(): + """Test consume() interruptibility (main fix) and message handling. 
+ """ + topic = 'test-consume-interrupt-topic' + + # Assertion 1: Infinite timeout can be interrupted immediately + consumer1 = TestConsumer({ + 'group.id': 'test-consume-infinite-immediate', + 'socket.timeout.ms': 100, + 'session.timeout.ms': 1000, + 'auto.offset.reset': 'latest' + }) + consumer1.subscribe([topic]) + + interrupt_thread = threading.Thread(target=lambda: send_sigint_after_delay(0.1)) + interrupt_thread.daemon = True + interrupt_thread.start() + + start = time.time() + try: + consumer1.consume() # Infinite timeout, default num_messages=1 + assert False, "Assertion 1 failed: Should have raised KeyboardInterrupt" + except KeyboardInterrupt: + elapsed = time.time() - start + # Should interrupt within first chunk (~200ms) + assert elapsed < 0.5, f"Assertion 1 failed: Immediate interrupt took {elapsed:.2f}s, expected < 0.5s" + consumer1.close() + + # Assertion 2: Finite timeout can be interrupted before timeout expires + consumer2 = TestConsumer({ + 'group.id': 'test-consume-finite-interrupt', + 'socket.timeout.ms': 100, + 'session.timeout.ms': 1000, + 'auto.offset.reset': 'latest' + }) + consumer2.subscribe([topic]) + + interrupt_thread = threading.Thread(target=lambda: send_sigint_after_delay(0.3)) + interrupt_thread.daemon = True + interrupt_thread.start() + + start = time.time() + try: + consumer2.consume(num_messages=10, timeout=2.0) # 2 seconds, but interrupt after 0.3s + assert False, "Assertion 2 failed: Should have raised KeyboardInterrupt" + except KeyboardInterrupt: + elapsed = time.time() - start + # Should interrupt quickly, not wait for full 2 seconds + assert elapsed < 1.0, f"Assertion 2 failed: Finite timeout interrupt took {elapsed:.2f}s, expected < 1.0s" + assert elapsed < 2.0, f"Assertion 2 failed: Should interrupt before timeout, took {elapsed:.2f}s" + consumer2.close() + + # Assertion 3: Signal sent after multiple chunks still interrupts quickly + consumer3 = TestConsumer({ + 'group.id': 'test-consume-multiple-chunks', + 
'socket.timeout.ms': 100, + 'session.timeout.ms': 1000, + 'auto.offset.reset': 'latest' + }) + consumer3.subscribe([topic]) + + interrupt_thread = threading.Thread(target=lambda: send_sigint_after_delay(0.6)) + interrupt_thread.daemon = True + interrupt_thread.start() + + start = time.time() + try: + consumer3.consume(num_messages=5) # Infinite timeout + assert False, "Assertion 3 failed: Should have raised KeyboardInterrupt" + except KeyboardInterrupt: + elapsed = time.time() - start + # Should interrupt within one chunk period after signal (0.6s + 0.2s = 0.8s max) + assert 0.6 <= elapsed <= 0.9, f"Assertion 3 failed: Multiple chunks interrupt took {elapsed:.2f}s, expected 0.6-0.9s" + consumer3.close() + + # Assertion 4: No signal - timeout works normally, returns empty list + consumer4 = TestConsumer({ + 'group.id': 'test-consume-timeout-normal', + 'socket.timeout.ms': 100, + 'session.timeout.ms': 1000, + 'auto.offset.reset': 'latest' + }) + consumer4.subscribe([topic]) + + start = time.time() + msglist = consumer4.consume(num_messages=10, timeout=0.5) # 500ms, no signal + elapsed = time.time() - start + + assert isinstance(msglist, list), "Assertion 4 failed: consume() should return a list" + assert len(msglist) == 0, f"Assertion 4 failed: Expected empty list (timeout), got {len(msglist)} messages" + assert 0.4 <= elapsed <= 0.6, f"Assertion 4 failed: Normal timeout took {elapsed:.2f}s, expected ~0.5s" + consumer4.close() + + # Assertion 5: num_messages=0 returns empty list immediately + consumer5 = TestConsumer({ + 'group.id': 'test-consume-zero-messages', + 'socket.timeout.ms': 100, + 'session.timeout.ms': 1000, + 'auto.offset.reset': 'latest' + }) + consumer5.subscribe([topic]) + + start = time.time() + msglist = consumer5.consume(num_messages=0, timeout=1.0) + elapsed = time.time() - start + + assert isinstance(msglist, list), "Assertion 5 failed: consume() should return a list" + assert len(msglist) == 0, "Assertion 5 failed: num_messages=0 should return 
empty list" + assert elapsed < 0.1, f"Assertion 5 failed: num_messages=0 took {elapsed:.2f}s, expected < 0.1s" + consumer5.close() + + # Assertion 6: Message available - returns messages + producer = Producer({'bootstrap.servers': 'localhost:9092'}) + for i in range(3): + producer.produce(topic, value=f'test-message-{i}'.encode()) + producer.flush(timeout=1.0) + producer = None + + consumer6 = TestConsumer({ + 'bootstrap.servers': 'localhost:9092', + 'group.id': 'test-consume-messages-available', + 'socket.timeout.ms': 100, + 'session.timeout.ms': 6000, + 'auto.offset.reset': 'earliest' + }) + consumer6.subscribe([topic]) + + # Wait for subscription and message availability + time.sleep(2.0) + + start = time.time() + msglist = consumer6.consume(num_messages=5, timeout=2.0) + elapsed = time.time() - start + + # Messages should be available and return quickly (after consumer is ready) + assert len(msglist) > 0, f"Assertion 6 failed: Expected messages, got empty list" + assert len(msglist) <= 5, f"Assertion 6 failed: Should return at most 5 messages, got {len(msglist)}" + # Allow more time for initial consumer setup, but once ready, should return quickly + assert elapsed < 2.5, f"Assertion 6 failed: Messages available but took {elapsed:.2f}s, expected < 2.5s" + # Verify message values + for i, msg in enumerate(msglist): + assert not msg.error(), f"Assertion 6 failed: Message {i} has error: {msg.error()}" + assert msg.value() is not None, f"Assertion 6 failed: Message {i} has no value" + consumer6.close() + +
+def test_consume_edge_cases():
+    """Test consume() edge cases.
+
+    Exercises, in order: a zero timeout, a closed consumer, out-of-range
+    num_messages values (negative and too large), and timeouts at or below
+    the 200ms poll chunk size.
+
+    Elapsed time is measured with time.monotonic() so that wall-clock
+    adjustments cannot skew the timing assertions, and every consumer is
+    closed in a ``finally`` block so a failing assertion does not leave it
+    open.
+    """
+    topic = 'test-consume-edge-topic'
+
+    # Assertion 1: Zero timeout returns immediately (non-blocking)
+    consumer1 = TestConsumer({
+        'group.id': 'test-consume-zero-timeout',
+        'socket.timeout.ms': 100,
+        'session.timeout.ms': 1000,
+        'auto.offset.reset': 'latest'
+    })
+    try:
+        consumer1.subscribe([topic])
+
+        start = time.monotonic()
+        msglist = consumer1.consume(num_messages=10, timeout=0.0)  # Zero timeout
+        elapsed = time.monotonic() - start
+
+        assert elapsed < 0.1, f"Assertion 1 failed: Zero timeout took {elapsed:.2f}s, expected < 0.1s"
+        assert isinstance(msglist, list), "Assertion 1 failed: consume() should return a list"
+        assert len(msglist) == 0, "Assertion 1 failed: Zero timeout with no messages should return empty list"
+    finally:
+        consumer1.close()
+
+    # Assertion 2: Closed consumer raises RuntimeError
+    consumer2 = TestConsumer({
+        'group.id': 'test-consume-closed',
+        'socket.timeout.ms': 100,
+        'session.timeout.ms': 1000
+    })
+    consumer2.close()
+
+    with pytest.raises(RuntimeError) as exc_info:
+        consumer2.consume(num_messages=10, timeout=0.1)
+    # Checked after the `with` block: statements placed after the raising
+    # call inside the block would never execute.
+    assert 'Consumer closed' in str(exc_info.value), f"Assertion 2 failed: Expected 'Consumer closed' error, got: {exc_info.value}"
+
+    # Assertion 3: Invalid num_messages (negative) raises ValueError
+    consumer3 = TestConsumer({
+        'group.id': 'test-consume-invalid-negative',
+        'socket.timeout.ms': 100,
+        'session.timeout.ms': 1000,
+        'auto.offset.reset': 'latest'
+    })
+    try:
+        consumer3.subscribe([topic])
+
+        with pytest.raises(ValueError) as exc_info:
+            consumer3.consume(num_messages=-1, timeout=0.1)
+        assert 'num_messages must be between 0 and 1000000' in str(exc_info.value), f"Assertion 3 failed: Expected num_messages range error, got: {exc_info.value}"
+    finally:
+        consumer3.close()
+
+    # Assertion 4: Invalid num_messages (too large) raises ValueError
+    consumer4 = TestConsumer({
+        'group.id': 'test-consume-invalid-large',
+        'socket.timeout.ms': 100,
+        'session.timeout.ms': 1000,
+        'auto.offset.reset': 'latest'
+    })
+    try:
+        consumer4.subscribe([topic])
+
+        with pytest.raises(ValueError) as exc_info:
+            consumer4.consume(num_messages=1000001, timeout=0.1)
+        assert 'num_messages must be between 0 and 1000000' in str(exc_info.value), f"Assertion 4 failed: Expected num_messages range error, got: {exc_info.value}"
+    finally:
+        consumer4.close()
+
+    # Assertion 5: Short timeout works correctly (no signal)
+    consumer5 = TestConsumer({
+        'group.id': 'test-consume-short-timeout',
+        'socket.timeout.ms': 100,
+        'session.timeout.ms': 1000,
+        'auto.offset.reset': 'latest'
+    })
+    try:
+        consumer5.subscribe([topic])
+
+        start = time.monotonic()
+        msglist = consumer5.consume(num_messages=10, timeout=0.1)  # 100ms timeout
+        elapsed = time.monotonic() - start
+
+        assert isinstance(msglist, list), "Assertion 5 failed: consume() should return a list"
+        assert len(msglist) == 0, "Assertion 5 failed: Short timeout with no messages should return empty list"
+        assert 0.05 <= elapsed <= 0.2, f"Assertion 5 failed: Short timeout took {elapsed:.2f}s, expected ~0.1s"
+    finally:
+        consumer5.close()
+
+    # Assertion 6: Very short timeout (less than chunk size) works
+    consumer6 = TestConsumer({
+        'group.id': 'test-consume-very-short',
+        'socket.timeout.ms': 100,
+        'session.timeout.ms': 1000,
+        'auto.offset.reset': 'latest'
+    })
+    try:
+        consumer6.subscribe([topic])
+
+        start = time.monotonic()
+        msglist = consumer6.consume(num_messages=5, timeout=0.05)  # 50ms timeout (less than 200ms chunk)
+        elapsed = time.monotonic() - start
+
+        assert isinstance(msglist, list), "Assertion 6 failed: consume() should return a list"
+        assert len(msglist) == 0, "Assertion 6 failed: Very short timeout should return empty list"
+        assert elapsed < 0.2, f"Assertion 6 failed: Very short timeout took {elapsed:.2f}s, expected < 0.2s"
+    finally:
+        consumer6.close()