Skip to content

Conversation

@k-raina
Copy link
Member

@k-raina k-raina commented Nov 7, 2025

Summary

This PR fixes a critical usability issue where Consumer.poll() and Consumer.consume() would block indefinitely and not respond to Ctrl+C (KeyboardInterrupt) signals.

Fixes: #209 and #807

Problem

When calling Consumer.poll() or Consumer.consume() with an infinite timeout (or very long timeout), the operations would block indefinitely in the underlying librdkafka C library. Because the Python Global Interpreter Lock (GIL) was released during these blocking calls, Python's signal handling mechanism couldn't detect Ctrl+C signals, making it impossible to gracefully interrupt the consumer.

Solution

The fix implements a "wakeable poll" pattern that:

  1. Chunks long timeouts: Instead of making a single blocking call with the full timeout, the implementation breaks it into smaller 200ms chunks
  2. Periodic signal checking: Between chunks, the code re-acquires the Python GIL and calls PyErr_CheckSignals() to detect pending KeyboardInterrupt signals
  3. Immediate interruption: If a signal is detected, the operation returns immediately, allowing the KeyboardInterrupt to propagate to the Python code

Impact

This fix significantly improves the developer and operational experience for applications using the Confluent Python Kafka client.

Before this fix:

  • Developers had to forcefully kill processes during development and testing
  • No way to gracefully stop consumer loops during debugging
  • Frustrating workflow interruptions when testing consumer behavior

After this fix:

  • Developers can use standard Ctrl+C to interrupt consumer operations
  • Clean shutdown during development and testing
  • Improved debugging experience with proper signal handling

Testing (tests/test_Consumer.py)

  1. Utility function tests:

    • test_calculate_chunk_timeout_utility_function(): Tests chunk timeout calculation logic
    • test_check_signals_between_chunks_utility_function(): Tests signal detection between chunks
    • test_wakeable_poll_utility_functions_interaction(): Tests interaction between both utilities
  2. Poll interruptibility tests:

    • test_poll_interruptibility_and_messages(): Tests poll() can be interrupted and still handles messages correctly
    • test_poll_edge_cases(): Tests edge cases (zero timeout, closed consumer, short timeouts)
  3. Consume interruptibility tests:

    • test_consume_interruptibility_and_messages(): Tests consume() can be interrupted and still handles messages correctly
    • test_consume_edge_cases(): Tests edge cases (zero timeout, invalid parameters, short timeouts)

All tests use a helper function send_sigint_after_delay() to simulate Ctrl+C in automated tests.

Performance Impact

  • Minimal overhead: Only adds signal checks between 200ms chunks

Manual Testing

test_consumer_consume.py

[3.20s] Message #3435: Message #3457 at 1762530584.4208431
  Topic: test-topic, Partition: 0, Offset: 30833
[3.20s] Message #3436: Message #3458 at 1762530584.926145
  Topic: test-topic, Partition: 0, Offset: 30834
[3.20s] Message #3437: Message #3459 at 1762530585.431317
  Topic: test-topic, Partition: 0, Offset: 30835
[3.20s] Message #3438: Message #3460 at 1762530585.936573
  Topic: test-topic, Partition: 0, Offset: 30836
[3.20s] Message #3439: Message #3461 at 1762530586.441687
  Topic: test-topic, Partition: 0, Offset: 30837
[3.20s] Message #3440: Message #3462 at 1762530586.9468532
  Topic: test-topic, Partition: 0, Offset: 30838
^C
============================================================
KeyboardInterrupt caught successfully!
Total messages received: 3440
Total time elapsed: 4.02 seconds
============================================================

✓ SUCCESS: Ctrl+C interruption is working!

Closing consumer...
Consumer closed.
➜  confluent-kafka-python git:(fix/wakeable-poll-issues-209-807) ✗ 

Copilot AI review requested due to automatic review settings November 7, 2025 15:53
@k-raina k-raina requested review from a team and MSeal as code owners November 7, 2025 15:53
@confluent-cla-assistant
Copy link

🎉 All Contributor License Agreements have been signed. Ready to merge.
Please push an empty commit if you would like to re-run the checks to verify CLA status for all contributors.

Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR fixes critical usability issues where Consumer.poll() and Consumer.consume() would block indefinitely and not respond to Ctrl+C (KeyboardInterrupt) signals. The solution implements a "wakeable poll" pattern that chunks long timeouts into 200ms intervals and periodically checks for pending signals between chunks, allowing proper signal handling and graceful interruption.

Key Changes:

  • Implemented helper functions calculate_chunk_timeout() and check_signals_between_chunks() in C code to enable interruptible polling
  • Modified Consumer.poll() and Consumer.consume() to use chunked polling with periodic signal checks
  • Added comprehensive test coverage for utility functions, interruptibility, edge cases, and message handling

Reviewed Changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.

File Description
tests/test_Consumer.py Added utility helper send_sigint_after_delay() and 7 new test functions covering chunk timeout calculation, signal detection, utility function interaction, poll/consume interruptibility, and edge cases
src/confluent_kafka/src/Consumer.c Implemented wakeable poll pattern with helper functions and refactored Consumer_poll() and Consumer_consume() to use chunked polling with signal checking
CHANGELOG.md Documented the fix for blocking poll/consume operations

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

*
* Instead of a single blocking call to rd_kafka_consumer_poll() with the
* full timeout, this function:
* 1. Splits the timeout into 200ms chunks
Copy link

Copilot AI Nov 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The function documentation would benefit from documenting the CHUNK_TIMEOUT_MS constant value (200ms) in the description to match the implementation details mentioned in comment lines.

Suggested change
* 1. Splits the timeout into 200ms chunks
* 1. Splits the timeout into 200ms chunks (CHUNK_TIMEOUT_MS = 200ms)

Copilot uses AI. Check for mistakes.
return NULL;
}

/* Create Python list from messages */
Copy link

Copilot AI Nov 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extra whitespace before closing comment marker.

Suggested change
/* Create Python list from messages */
/* Create Python list from messages */

Copilot uses AI. Check for mistakes.
Copy link
Contributor

@MSeal MSeal left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One minor comment, tested out things locally with a 30s timeout with this vs master... much better experience of always being able to interrupt.

time.sleep(delay_seconds)
try:
os.kill(os.getpid(), signal.SIGINT)
except Exception:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should probably catch the KeyboardInterrupt here instead so you don't mask other errors being raised instead

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants