Skip to content

Commit 04da7ef

Browse files
committed
Add watermarks bootstrapping
1 parent 4914ef1 commit 04da7ef

File tree

3 files changed

+364
-5
lines changed

3 files changed

+364
-5
lines changed

quixstreams/app.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1111,12 +1111,12 @@ def _on_assign(self, _, topic_partitions: List[TopicPartition]):
11111111
]
11121112
# TODO: The set is used because the watermark tp can already be present in the "topic_partitions"
11131113
# because we use `subscribe()` earlier. Fix the mess later.
1114-
# TODO: Also, how to avoid reading the whole WM topic on each restart?
1115-
# We really need only the most recent data
1116-
# Is it fine to read it from the end? The active partitions must still publish something.
1117-
# Or should we commit it?
11181114
self._consumer.assign(list(set(topic_partitions + watermarks_partitions)))
11191115

1116+
# Bootstrap watermarks by progressively reading the watermarks topic
1117+
# This uses an exponential backoff strategy to minimize startup time
1118+
self._watermark_manager.bootstrap_watermarks(self._consumer)
1119+
11201120
# Pause changelog topic+partitions immediately after assignment
11211121
changelog_topics = {t.name for t in self._topic_manager.changelog_topics_list}
11221122
changelog_tps = [tp for tp in topic_partitions if tp.topic in changelog_topics]

quixstreams/processing/watermarking.py

Lines changed: 179 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,19 @@
11
import logging
22
from time import monotonic
3-
from typing import Optional, TypedDict
3+
from typing import TYPE_CHECKING, Optional, TypedDict
4+
5+
from confluent_kafka import TopicPartition
46

57
from quixstreams.internal_producer import InternalProducer
8+
from quixstreams.kafka.consumer import raise_for_msg_error
69
from quixstreams.models import Topic
710
from quixstreams.models.topics.manager import TopicManager
811
from quixstreams.utils.format import format_timestamp
912
from quixstreams.utils.json import dumps
1013

14+
if TYPE_CHECKING:
15+
from quixstreams.kafka import BaseConsumer
16+
1117
logger = logging.getLogger(__name__)
1218

1319
__all__ = ("WatermarkManager", "WatermarkMessage")
@@ -155,3 +161,175 @@ def _get_watermark(self) -> int:
155161
if watermarks := self._watermarks.values():
156162
watermark = min(watermarks)
157163
return watermark
164+
165+
def bootstrap_watermarks(self, consumer: "BaseConsumer") -> None:
166+
"""
167+
Bootstrap watermarks by reading the watermarks topic progressively.
168+
169+
This method uses an exponential backoff strategy:
170+
1. Try to read N messages from the end of the topic
171+
2. If not all topic-partitions are found, seek further back exponentially
172+
3. Continue until all TPs have watermarks or the beginning is reached
173+
174+
:param consumer: The Kafka consumer to use for reading watermarks
175+
"""
176+
watermarks_topic_name = self.watermarks_topic.name
177+
watermarks_partition = 0 # Watermarks topic always has 1 partition
178+
179+
# Get the expected topic-partitions that need watermarks
180+
expected_tps = set(self._watermarks.keys())
181+
if not expected_tps:
182+
logger.info("No topic-partitions to bootstrap watermarks for")
183+
return
184+
185+
logger.info(
186+
f"Bootstrapping watermarks for {len(expected_tps)} topic-partitions "
187+
f"from topic '{watermarks_topic_name}'. Expected TPs: {expected_tps}"
188+
)
189+
190+
# Get the high watermark (end offset) of the watermarks topic
191+
tp = TopicPartition(watermarks_topic_name, watermarks_partition)
192+
logger.debug(f"Getting watermark offsets for {watermarks_topic_name}...")
193+
try:
194+
_, high_offset = consumer.get_watermark_offsets(tp, timeout=5.0)
195+
logger.debug(f"Watermarks topic high offset: {high_offset}")
196+
except Exception as e:
197+
# If we can't get watermark offsets, the topic might not be ready yet
198+
# Log a warning but allow the application to start with -1 watermarks
199+
logger.warning(
200+
f"Failed to get watermark offsets for topic {watermarks_topic_name}: {e}. "
201+
f"Watermarks will start at -1 and be updated as messages arrive."
202+
)
203+
return
204+
205+
if high_offset == 0:
206+
logger.info("Watermarks topic is empty, no bootstrapping needed")
207+
return
208+
209+
# Progressive search parameters
210+
initial_lookback = 100 # Start by looking at last 100 messages
211+
lookback_step = min(initial_lookback, high_offset)
212+
found_tps: set[tuple[str, int]] = set()
213+
seek_offset = max(0, high_offset - lookback_step)
214+
215+
iteration_count = 0
216+
max_iterations = 20 # Safety limit to prevent infinite loops
217+
while found_tps != expected_tps:
218+
iteration_count += 1
219+
if iteration_count > max_iterations:
220+
missing_tps = expected_tps - found_tps
221+
raise RuntimeError(
222+
f"Bootstrap failed: exceeded {max_iterations} iterations. "
223+
f"Found {len(found_tps)}/{len(expected_tps)} topic-partitions. "
224+
f"Missing: {missing_tps}. This suggests a bug in the bootstrap logic."
225+
)
226+
logger.info(
227+
f"Bootstrap iteration {iteration_count}: seeking to offset {seek_offset} "
228+
f"(lookback_step={lookback_step}, found {len(found_tps)}/{len(expected_tps)} TPs)"
229+
)
230+
231+
# Seek to the calculated position
232+
tp_with_offset = TopicPartition(
233+
watermarks_topic_name, watermarks_partition, seek_offset
234+
)
235+
try:
236+
consumer.seek(tp_with_offset)
237+
logger.debug(f"Seeked to offset {seek_offset}")
238+
except Exception as e:
239+
logger.error(f"Failed to seek to offset {seek_offset}: {e}")
240+
raise
241+
242+
# Read messages from seek_offset towards previous seek_offset
243+
# or until all TPs are found
244+
messages_read = 0
245+
max_messages_to_read = lookback_step
246+
247+
# Timeout for this specific seek iteration (30 seconds)
248+
iteration_timeout = 30.0
249+
iteration_start_time = monotonic()
250+
consecutive_poll_timeouts = 0
251+
max_consecutive_poll_timeouts = 5 # Stop after 5 consecutive empty polls
252+
253+
while messages_read < max_messages_to_read:
254+
# Check if this iteration has timed out
255+
if monotonic() - iteration_start_time > iteration_timeout:
256+
missing_tps = expected_tps - found_tps
257+
raise TimeoutError(
258+
f"Bootstrap failed: polling timeout after {iteration_timeout}s for seek offset {seek_offset}. "
259+
f"Found {len(found_tps)}/{len(expected_tps)} topic-partitions. "
260+
f"Missing: {missing_tps}. Cannot start application without complete watermark state."
261+
)
262+
263+
msg = consumer.poll(timeout=1.0)
264+
if msg is None:
265+
consecutive_poll_timeouts += 1
266+
# If we've had many consecutive timeouts, assume we've read all available messages
267+
# in this range and move to the next iteration
268+
if consecutive_poll_timeouts >= max_consecutive_poll_timeouts:
269+
logger.info(
270+
f"No more messages available after {consecutive_poll_timeouts} empty polls at offset {seek_offset}, "
271+
f"moving to next iteration (read {messages_read}/{max_messages_to_read} messages)"
272+
)
273+
break
274+
continue
275+
276+
# Reset consecutive timeout counter when we get a message
277+
consecutive_poll_timeouts = 0
278+
279+
# Skip messages from other topics (shouldn't happen but be safe)
280+
if msg.topic() != watermarks_topic_name:
281+
continue
282+
283+
messages_read += 1
284+
285+
# Deserialize and process the watermark message
286+
try:
287+
# Raise if message has an error
288+
msg = raise_for_msg_error(msg)
289+
watermark_msg = self.watermarks_topic.deserialize(msg).value
290+
tp_key = (watermark_msg["topic"], watermark_msg["partition"])
291+
292+
# Only track if it's an expected TP
293+
if tp_key in expected_tps:
294+
timestamp = watermark_msg["timestamp"]
295+
# Update the watermark (use max to handle out-of-order reads)
296+
current = self._watermarks.get(tp_key, -1)
297+
self._watermarks[tp_key] = max(current, timestamp)
298+
found_tps.add(tp_key)
299+
300+
logger.debug(
301+
f"Bootstrapped watermark for {watermark_msg['topic']}[{watermark_msg['partition']}]: "
302+
f"{format_timestamp(timestamp)}"
303+
)
304+
305+
# Stop if we've found all TPs
306+
if found_tps == expected_tps:
307+
logger.info(
308+
f"Successfully bootstrapped all {len(expected_tps)} topic-partitions "
309+
f"after reading {messages_read} messages"
310+
)
311+
return
312+
313+
except Exception as e:
314+
logger.warning(f"Failed to deserialize watermark message: {e}")
315+
continue
316+
317+
# If we've read everything and still missing TPs, expand lookback exponentially
318+
if found_tps != expected_tps:
319+
if seek_offset == 0:
320+
# We've read the entire topic from the beginning
321+
missing_tps = expected_tps - found_tps
322+
logger.warning(
323+
f"Reached beginning of watermarks topic but {len(missing_tps)} "
324+
f"topic-partitions still have no watermarks: {missing_tps}. "
325+
f"They will remain at -1 until new watermarks arrive."
326+
)
327+
return
328+
329+
# Double the step and seek further back from current position
330+
lookback_step = min(lookback_step * 2, seek_offset)
331+
seek_offset = max(0, seek_offset - lookback_step)
332+
333+
logger.info(
334+
f"Finished bootstrapping watermarks: found {len(found_tps)}/{len(expected_tps)} topic-partitions"
335+
)
Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
from unittest.mock import Mock
2+
3+
from quixstreams.models import Topic, TopicConfig
4+
from quixstreams.processing.watermarking import WatermarkManager
5+
6+
7+
class TestWatermarkBootstrap:
8+
"""
9+
Basic tests for watermark bootstrapping.
10+
11+
Note: Full testing of the progressive search algorithm requires integration
12+
tests with a real Kafka broker, as mocking the complex message polling and
13+
deserialization flow is error-prone and doesn't provide meaningful coverage.
14+
"""
15+
16+
def test_bootstrap_watermarks_empty_topic(self, topic_manager_factory):
17+
"""
18+
Test that bootstrap handles an empty watermarks topic gracefully.
19+
"""
20+
topic_manager = topic_manager_factory()
21+
producer = Mock()
22+
wm_manager = WatermarkManager(
23+
producer=producer, topic_manager=topic_manager, interval=1.0
24+
)
25+
26+
test_topic = Topic(
27+
name="topic1",
28+
value_deserializer="json",
29+
create_config=TopicConfig(num_partitions=1, replication_factor=1),
30+
)
31+
test_topic.broker_config = test_topic.create_config
32+
wm_manager.set_topics([test_topic])
33+
34+
consumer = Mock()
35+
consumer.get_watermark_offsets.return_value = (0, 0) # Empty topic
36+
37+
wm_manager.bootstrap_watermarks(consumer)
38+
39+
# Watermark should remain at -1
40+
assert wm_manager._watermarks[("topic1", 0)] == -1
41+
42+
# No seek should be called for empty topic
43+
consumer.seek.assert_not_called()
44+
45+
def test_bootstrap_watermarks_no_expected_tps(self, topic_manager_factory):
46+
"""
47+
Test that bootstrap handles the case where no topic-partitions are expected.
48+
"""
49+
topic_manager = topic_manager_factory()
50+
producer = Mock()
51+
wm_manager = WatermarkManager(
52+
producer=producer, topic_manager=topic_manager, interval=1.0
53+
)
54+
55+
# Don't set any topics - no expected TPs
56+
consumer = Mock()
57+
58+
wm_manager.bootstrap_watermarks(consumer)
59+
60+
# Should exit early without calling get_watermark_offsets
61+
consumer.get_watermark_offsets.assert_not_called()
62+
63+
def test_bootstrap_watermarks_exponential_backoff(self, topic_manager_factory):
64+
"""
65+
Test that bootstrap uses exponential backoff when not all TPs are found.
66+
67+
This test verifies true exponential backoff WITHOUT re-reading:
68+
1. Initial seek: offset 900 (1000 - 100), read 100 messages to offset 1000
69+
2. If not all TPs found, seek back: offset 700 (900 - 200), read 200 messages to offset 900
70+
3. If still not found, seek back: offset 300 (700 - 400), read 400 messages to offset 700
71+
4. Continues until all TPs found or offset 0 is reached
72+
73+
Key: Each iteration seeks BACK from the previous position, not from high_offset.
74+
This avoids re-reading the same messages multiple times.
75+
"""
76+
topic_manager = topic_manager_factory()
77+
producer = Mock()
78+
wm_manager = WatermarkManager(
79+
producer=producer, topic_manager=topic_manager, interval=1.0
80+
)
81+
82+
# Set up 3 topic-partitions
83+
test_topic = Topic(
84+
name="topic1",
85+
value_deserializer="json",
86+
create_config=TopicConfig(num_partitions=3, replication_factor=1),
87+
)
88+
test_topic.broker_config = test_topic.create_config
89+
wm_manager.set_topics([test_topic])
90+
91+
consumer = Mock()
92+
consumer.get_watermark_offsets.return_value = (0, 1000)
93+
94+
# Track which iteration we're in
95+
poll_count = [0]
96+
found_partitions = set()
97+
98+
def mock_poll(timeout):
99+
poll_count[0] += 1
100+
101+
# First iteration: seeking to 900, reading toward 1000
102+
# Return messages for partitions 0 and 1 only
103+
if consumer.seek.call_count == 1:
104+
if poll_count[0] <= 100:
105+
# Return watermarks for partitions 0 and 1
106+
partition = 0 if poll_count[0] % 2 == 0 else 1
107+
if partition not in found_partitions:
108+
found_partitions.add(partition)
109+
return create_mock_watermark_message(
110+
wm_manager.watermarks_topic,
111+
"topic1",
112+
partition,
113+
1000 + poll_count[0],
114+
)
115+
else:
116+
return None # Timeout to trigger next iteration
117+
118+
# Second iteration: seeking to 700 (900 - 200), reading toward 900
119+
# Now include partition 2
120+
elif consumer.seek.call_count == 2:
121+
# Return watermark for partition 2
122+
if poll_count[0] % 3 == 0:
123+
found_partitions.add(2)
124+
return create_mock_watermark_message(
125+
wm_manager.watermarks_topic, "topic1", 2, 3000
126+
)
127+
return None # Return None to speed up the loop
128+
129+
return None
130+
131+
consumer.poll.side_effect = mock_poll
132+
133+
# Run bootstrap
134+
wm_manager.bootstrap_watermarks(consumer)
135+
136+
# Verify exponential backoff happened
137+
assert consumer.seek.call_count >= 2, "Should have seeked at least twice"
138+
139+
# Verify seek offsets show TRUE exponential backoff (seeking backwards, not from high_offset)
140+
seek_calls = consumer.seek.call_args_list
141+
first_seek = seek_calls[0][0][0]
142+
second_seek = seek_calls[1][0][0]
143+
144+
# First seek: offset = 1000 - 100 = 900
145+
assert (
146+
first_seek.offset == 900
147+
), f"First seek should be at 900, got {first_seek.offset}"
148+
149+
# Second seek: offset = 900 - 200 = 700 (seeking back from previous position)
150+
assert (
151+
second_seek.offset == 700
152+
), f"Second seek should be at 700 (900 - 200), got {second_seek.offset}"
153+
154+
# All watermarks should be set
155+
assert wm_manager._watermarks[("topic1", 0)] > -1
156+
assert wm_manager._watermarks[("topic1", 1)] > -1
157+
assert wm_manager._watermarks[("topic1", 2)] > -1
158+
159+
160+
def create_mock_watermark_message(watermarks_topic, topic, partition, timestamp):
161+
"""
162+
Helper to create a properly mocked Kafka message.
163+
"""
164+
msg = Mock()
165+
msg.error.return_value = None
166+
msg.topic.return_value = watermarks_topic.name
167+
msg.partition.return_value = 0
168+
msg.offset.return_value = 1
169+
170+
# Create a mock Row that will be returned by deserialize
171+
mock_row = Mock()
172+
mock_row.value = {
173+
"topic": topic,
174+
"partition": partition,
175+
"timestamp": timestamp,
176+
}
177+
178+
# Mock the deserialize method to return our row
179+
watermarks_topic.deserialize = Mock(return_value=mock_row)
180+
181+
return msg

0 commit comments

Comments
 (0)