@@ -269,6 +269,74 @@ def test_trigger_backpressure(self, topic_manager_factory, internal_consumer):
269269 )
270270 )
271271
272+ def test_trigger_backpressure_pauses_and_resumes_watermarks_topic (
273+ self , topic_manager_factory , internal_consumer
274+ ):
275+ """
276+ Test that watermarks topics are paused during backpressure
277+ (along with data topics but not changelog topics),
278+ and properly resumed when backpressure is lifted.
279+ """
280+ topic_manager = topic_manager_factory ()
281+ data_topic = topic_manager .topic (
282+ name = str (uuid .uuid4 ()),
283+ create_config = TopicConfig (num_partitions = 1 , replication_factor = 1 ),
284+ )
285+ # Create a changelog topic
286+ changelog = topic_manager .changelog_topic (
287+ stream_id = data_topic .name ,
288+ store_name = "default" ,
289+ config = data_topic .broker_config ,
290+ )
291+ # Create a watermarks topic
292+ watermarks = topic_manager .watermarks_topic ()
293+ offset_to_seek = 999
294+
295+ internal_consumer .subscribe ([data_topic , changelog , watermarks ])
296+ while not internal_consumer .assignment ():
297+ internal_consumer .poll (0.1 )
298+
299+ # Trigger backpressure with immediate resume (resume_after=0)
300+ with patch .object (InternalConsumer , "pause" ) as pause_mock :
301+ internal_consumer .trigger_backpressure (
302+ resume_after = 0 , # Allow immediate resume for testing
303+ offsets_to_seek = {(data_topic .name , 0 ): offset_to_seek },
304+ )
305+
306+ # Verify data topic and watermarks topic are paused, but not changelog
307+ paused_topics = {
308+ call .kwargs ["partitions" ][0 ].topic for call in pause_mock .call_args_list
309+ }
310+ assert data_topic .name in paused_topics
311+ assert watermarks .name in paused_topics
312+ assert changelog .name not in paused_topics
313+
314+ # Verify they're marked as backpressured
315+ assert (
316+ TopicPartition (topic = data_topic .name , partition = 0 )
317+ in internal_consumer .backpressured_tps
318+ )
319+ assert (
320+ TopicPartition (topic = watermarks .name , partition = 0 )
321+ in internal_consumer .backpressured_tps
322+ )
323+
324+ # Test resuming
325+ with patch .object (InternalConsumer , "resume" ) as resume_mock :
326+ internal_consumer .resume_backpressured ()
327+
328+ # Verify both data topic and watermarks topic are resumed
329+ resumed_topics = {
330+ call .kwargs ["partitions" ][0 ].topic for call in resume_mock .call_args_list
331+ }
332+ assert data_topic .name in resumed_topics
333+ assert watermarks .name in resumed_topics
334+ # Ensure changelog was never resumed (it was never paused)
335+ assert changelog .name not in resumed_topics
336+
337+ # Verify backpressured set is cleared
338+ assert len (internal_consumer .backpressured_tps ) == 0
339+
272340 def test_resume_backpressured_nothing_paused (
273341 self , internal_consumer , topic_manager_factory
274342 ):
0 commit comments