@@ -630,6 +630,62 @@ def on_watermark(value, key, timestamp, headers):
630630 assert ("branch2" , watermark_timestamp ) in watermark_calls
631631 assert ("main" , watermark_timestamp ) in watermark_calls
632632
633+ def test_watermarks_not_passed_to_apply_functions (self ):
634+ """
635+ Test that watermarks are not passed to apply/filter/update functions.
636+ User functions should only process actual data, not watermarks.
637+ """
638+ apply_calls = []
639+ filter_calls = []
640+ update_calls = []
641+
642+ def track_apply (v ):
643+ apply_calls .append (v )
644+ return v + 1
645+
646+ def track_filter (v ):
647+ filter_calls .append (v )
648+ return True
649+
650+ def track_update (v ):
651+ update_calls .append (v )
652+
653+ # Create a stream with various function types
654+ stream = Stream ()
655+ stream = stream .add_apply (track_apply )
656+ stream = stream .add_filter (track_filter )
657+ stream = stream .add_update (track_update )
658+
659+ sink = Sink ()
660+ key , timestamp , headers = "key" , 1000 , []
661+
662+ # Execute with regular messages
663+ executor = stream .compose_single (sink = sink .append_record )
664+ executor (10 , key , timestamp , headers , is_watermark = False )
665+ executor (20 , key , timestamp , headers , is_watermark = False )
666+
667+ # Verify regular messages were processed
668+ assert apply_calls == [10 , 20 ]
669+ assert filter_calls == [11 , 21 ]
670+ assert update_calls == [11 , 21 ]
671+ assert len (sink ) == 2
672+
673+ # Clear tracking
674+ apply_calls .clear ()
675+ filter_calls .clear ()
676+ update_calls .clear ()
677+ sink .clear ()
678+
679+ # Send watermarks
680+ executor (None , None , 2000 , [], is_watermark = True )
681+ executor (None , None , 3000 , [], is_watermark = True )
682+
683+ # Verify watermarks were NOT processed by user functions
684+ assert apply_calls == []
685+ assert filter_calls == []
686+ assert update_calls == []
687+ assert len (sink ) == 0 # Watermarks should not reach the sink
688+
633689
634690class TestStreamMerge :
635691 def test_merge_different_streams_success (self ):
0 commit comments