2 changes: 1 addition & 1 deletion quixstreams/__init__.py
@@ -5,4 +5,4 @@

__all__ = ["Application", "message_context", "MessageContext", "State"]

__version__ = "4.0.0a2"
__version__ = "4.0.0a3"
8 changes: 4 additions & 4 deletions quixstreams/app.py
@@ -1111,12 +1111,12 @@ def _on_assign(self, _, topic_partitions: List[TopicPartition]):
]
# TODO: The set is used because the watermark tp can already be present in the "topic_partitions"
# because we use `subscribe()` earlier. Fix the mess later.
# TODO: Also, how to avoid reading the whole WM topic on each restart?
# We really need only the most recent data
# Is it fine to read it from the end? The active partitions must still publish something.
# Or should we commit it?
self._consumer.assign(list(set(topic_partitions + watermarks_partitions)))

# Bootstrap watermarks by progressively reading the watermarks topic
# This uses an exponentially growing lookback window to minimize startup time
self._watermark_manager.bootstrap_watermarks(self._consumer)
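# If the watermarks topic cannot be read yet, or some topic-partitions have
# no watermark message, their entries remain at -1 and are updated as new
# watermark messages arrive (see bootstrap_watermarks below).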

# Pause changelog topic+partitions immediately after assignment
changelog_topics = {t.name for t in self._topic_manager.changelog_topics_list}
changelog_tps = [tp for tp in topic_partitions if tp.topic in changelog_topics]
180 changes: 179 additions & 1 deletion quixstreams/processing/watermarking.py
@@ -1,13 +1,19 @@
import logging
from time import monotonic
from typing import Optional, TypedDict
from typing import TYPE_CHECKING, Optional, TypedDict

from confluent_kafka import TopicPartition

from quixstreams.internal_producer import InternalProducer
from quixstreams.kafka.consumer import raise_for_msg_error
from quixstreams.models import Topic
from quixstreams.models.topics.manager import TopicManager
from quixstreams.utils.format import format_timestamp
from quixstreams.utils.json import dumps

if TYPE_CHECKING:
from quixstreams.kafka import BaseConsumer

logger = logging.getLogger(__name__)

__all__ = ("WatermarkManager", "WatermarkMessage")
@@ -155,3 +161,175 @@ def _get_watermark(self) -> int:
if watermarks := self._watermarks.values():
watermark = min(watermarks)
return watermark

def bootstrap_watermarks(self, consumer: "BaseConsumer") -> None:
"""
Bootstrap watermarks by reading the watermarks topic progressively.

This method uses an exponentially growing lookback:
1. Try to read N messages from the end of the topic
2. If not all topic-partitions are found, seek further back exponentially
3. Continue until all TPs have watermarks or the beginning is reached

:param consumer: The Kafka consumer to use for reading watermarks
"""
watermarks_topic_name = self.watermarks_topic.name
watermarks_partition = 0 # Watermarks topic always has 1 partition

# Get the expected topic-partitions that need watermarks
expected_tps = set(self._watermarks.keys())
if not expected_tps:
logger.info("No topic-partitions to bootstrap watermarks for")
return

logger.info(
f"Bootstrapping watermarks for {len(expected_tps)} topic-partitions "
f"from topic '{watermarks_topic_name}'. Expected TPs: {expected_tps}"
)

# Get the high watermark (end offset) of the watermarks topic
tp = TopicPartition(watermarks_topic_name, watermarks_partition)
logger.debug(f"Getting watermark offsets for {watermarks_topic_name}...")
try:
_, high_offset = consumer.get_watermark_offsets(tp, timeout=5.0)
logger.debug(f"Watermarks topic high offset: {high_offset}")
except Exception as e:
# If we can't get watermark offsets, the topic might not be ready yet
# Log a warning but allow the application to start with -1 watermarks
logger.warning(
f"Failed to get watermark offsets for topic {watermarks_topic_name}: {e}. "
f"Watermarks will start at -1 and be updated as messages arrive."
)
return

if high_offset == 0:
logger.info("Watermarks topic is empty, no bootstrapping needed")
return

# Progressive search parameters
initial_lookback = 100 # Start by looking at last 100 messages
lookback_step = min(initial_lookback, high_offset)
found_tps: set[tuple[str, int]] = set()
seek_offset = max(0, high_offset - lookback_step)
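# Illustrative walk-through (assumed values, not from a real topic):
# with high_offset=1000 and initial_lookback=100, the seek positions are
# 900 (read 100 messages), then 700 (lookback doubled to 200), then 300
# (lookback 400), then 0 (lookback capped at the remaining 300), so the
# whole topic is eventually covered.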

iteration_count = 0
max_iterations = 20 # Safety limit to prevent infinite loops
while found_tps != expected_tps:
iteration_count += 1
if iteration_count > max_iterations:
missing_tps = expected_tps - found_tps
raise RuntimeError(
f"Bootstrap failed: exceeded {max_iterations} iterations. "
f"Found {len(found_tps)}/{len(expected_tps)} topic-partitions. "
f"Missing: {missing_tps}. This suggests a bug in the bootstrap logic."
)
logger.info(
f"Bootstrap iteration {iteration_count}: seeking to offset {seek_offset} "
f"(lookback_step={lookback_step}, found {len(found_tps)}/{len(expected_tps)} TPs)"
)

# Seek to the calculated position
tp_with_offset = TopicPartition(
watermarks_topic_name, watermarks_partition, seek_offset
)
try:
consumer.seek(tp_with_offset)
logger.debug(f"Seeked to offset {seek_offset}")
except Exception as e:
logger.error(f"Failed to seek to offset {seek_offset}: {e}")
raise

# Read up to lookback_step messages starting at seek_offset,
# stopping early once all TPs are found
messages_read = 0
max_messages_to_read = lookback_step

# Timeout for this specific seek iteration (30 seconds)
iteration_timeout = 30.0
iteration_start_time = monotonic()
consecutive_poll_timeouts = 0
max_consecutive_poll_timeouts = 5 # Stop after 5 consecutive empty polls

while messages_read < max_messages_to_read:
# Check if this iteration has timed out
if monotonic() - iteration_start_time > iteration_timeout:
missing_tps = expected_tps - found_tps
raise TimeoutError(
f"Bootstrap failed: polling timeout after {iteration_timeout}s for seek offset {seek_offset}. "
f"Found {len(found_tps)}/{len(expected_tps)} topic-partitions. "
f"Missing: {missing_tps}. Cannot start application without complete watermark state."
)

msg = consumer.poll(timeout=1.0)
if msg is None:
consecutive_poll_timeouts += 1
# If we've had many consecutive timeouts, assume we've read all available messages
# in this range and move to the next iteration
if consecutive_poll_timeouts >= max_consecutive_poll_timeouts:
logger.info(
f"No more messages available after {consecutive_poll_timeouts} empty polls at offset {seek_offset}, "
f"moving to next iteration (read {messages_read}/{max_messages_to_read} messages)"
)
break
continue

# Reset consecutive timeout counter when we get a message
consecutive_poll_timeouts = 0

# Skip messages from other topics (shouldn't happen but be safe)
if msg.topic() != watermarks_topic_name:
continue

messages_read += 1

# Deserialize and process the watermark message
try:
# Raise if message has an error
msg = raise_for_msg_error(msg)
watermark_msg = self.watermarks_topic.deserialize(msg).value
tp_key = (watermark_msg["topic"], watermark_msg["partition"])

# Only track if it's an expected TP
if tp_key in expected_tps:
timestamp = watermark_msg["timestamp"]
# Update the watermark (use max to handle out-of-order reads)
current = self._watermarks.get(tp_key, -1)
self._watermarks[tp_key] = max(current, timestamp)
found_tps.add(tp_key)

logger.debug(
f"Bootstrapped watermark for {watermark_msg['topic']}[{watermark_msg['partition']}]: "
f"{format_timestamp(timestamp)}"
)

# Stop if we've found all TPs
if found_tps == expected_tps:
logger.info(
f"Successfully bootstrapped all {len(expected_tps)} topic-partitions "
f"after reading {messages_read} messages"
)
return

except Exception as e:
logger.warning(f"Failed to deserialize watermark message: {e}")
continue

# If we've read this range and some TPs are still missing, expand the lookback exponentially
if found_tps != expected_tps:
if seek_offset == 0:
# We've read the entire topic from the beginning
missing_tps = expected_tps - found_tps
logger.warning(
f"Reached beginning of watermarks topic but {len(missing_tps)} "
f"topic-partitions still have no watermarks: {missing_tps}. "
f"They will remain at -1 until new watermarks arrive."
)
return

# Double the step and seek further back from current position
lookback_step = min(lookback_step * 2, seek_offset)
seek_offset = max(0, seek_offset - lookback_step)
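# The next range, [seek_offset, previous seek_offset), lies entirely before
# the offsets already read, so expanding the lookback never re-reads messages.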

logger.info(
f"Finished bootstrapping watermarks: found {len(found_tps)}/{len(expected_tps)} topic-partitions"
)