@@ -179,15 +179,48 @@ func TestCoordinatorFrontier(t *testing.T) {
179179}
180180
181181type frontier interface {
182- AddSpansAt (startAt hlc.Timestamp , spans ... roachpb.Span ) error
182+ AddSpansAt (hlc.Timestamp , ... roachpb.Span ) error
183183 Frontier () hlc.Timestamp
184+ Forward (roachpb.Span , hlc.Timestamp ) (bool , error )
184185 ForwardResolvedSpan (jobspb.ResolvedSpan ) (bool , error )
185186 InBackfill (jobspb.ResolvedSpan ) bool
186187 AtBoundary () (bool , jobspb.ResolvedSpan_BoundaryType , hlc.Timestamp )
187188 All () iter.Seq [jobspb.ResolvedSpan ]
188189 Frontiers () iter.Seq2 [descpb.ID , span.ReadOnlyFrontier ]
189190}
190191
192+ func newFrontier (
193+ t testing.TB ,
194+ frontierType string ,
195+ statementTime hlc.Timestamp ,
196+ initialHighwater hlc.Timestamp ,
197+ codec resolvedspan.TableCodec ,
198+ perTableTracking bool ,
199+ spans ... roachpb.Span ,
200+ ) (frontier , error ) {
201+ switch frontierType {
202+ case "aggregator" :
203+ return resolvedspan .NewAggregatorFrontier (
204+ statementTime ,
205+ initialHighwater ,
206+ codec ,
207+ perTableTracking ,
208+ spans ... ,
209+ )
210+ case "coordinator" :
211+ return resolvedspan .NewCoordinatorFrontier (
212+ statementTime ,
213+ initialHighwater ,
214+ codec ,
215+ perTableTracking ,
216+ spans ... ,
217+ )
218+ default :
219+ t .Fatalf ("unknown frontier type: %s" , frontierType )
220+ }
221+ panic ("unreachable" )
222+ }
223+
191224func testBackfillSpan (
192225 t * testing.T , f frontier , start , end string , ts hlc.Timestamp , frontierAfterSpan hlc.Timestamp ,
193226) {
@@ -379,29 +412,15 @@ func TestFrontierPerTableResolvedTimestamps(t *testing.T) {
379412 initialHighWater := hlc.Timestamp {}
380413
381414 // Create frontier with multiple table spans.
382- f , err := func () (frontier , error ) {
383- switch frontierType {
384- case "aggregator" :
385- return resolvedspan .NewAggregatorFrontier (
386- statementTime ,
387- initialHighWater ,
388- codec ,
389- true , /* perTableTracking */
390- tableSpans ... ,
391- )
392- case "coordinator" :
393- return resolvedspan .NewCoordinatorFrontier (
394- statementTime ,
395- initialHighWater ,
396- codec ,
397- true , /* perTableTracking */
398- tableSpans ... ,
399- )
400- default :
401- t .Fatalf ("unknown frontier type: %s" , frontierType )
402- }
403- panic ("unreachable" )
404- }()
415+ f , err := newFrontier (
416+ t ,
417+ frontierType ,
418+ statementTime ,
419+ initialHighWater ,
420+ codec ,
421+ true , /* perTableTracking */
422+ tableSpans ... ,
423+ )
405424 require .NoError (t , err )
406425 require .Equal (t , initialHighWater , f .Frontier ())
407426
@@ -503,29 +522,15 @@ func TestFrontierForwardFullTableSpan(t *testing.T) {
503522 statementTime := makeTS (5 )
504523 var initialHighWater hlc.Timestamp
505524
506- f , err := func () (span.Frontier , error ) {
507- switch frontierType {
508- case "aggregator" :
509- return resolvedspan .NewAggregatorFrontier (
510- statementTime ,
511- initialHighWater ,
512- codec ,
513- true , /* perTableTracking */
514- tableSpans ... ,
515- )
516- case "coordinator" :
517- return resolvedspan .NewCoordinatorFrontier (
518- statementTime ,
519- initialHighWater ,
520- codec ,
521- true , /* perTableTracking */
522- tableSpans ... ,
523- )
524- default :
525- t .Fatalf ("unknown frontier type: %s" , frontierType )
526- }
527- panic ("unreachable" )
528- }()
525+ f , err := newFrontier (
526+ t ,
527+ frontierType ,
528+ statementTime ,
529+ initialHighWater ,
530+ codec ,
531+ true , /* perTableTracking */
532+ tableSpans ... ,
533+ )
529534 require .NoError (t , err )
530535 require .Equal (t , initialHighWater , f .Frontier ())
531536
@@ -610,26 +615,14 @@ FROM [SHOW RANGES FROM TABLE foo WITH KEYS]`)
610615 now := makeTS (timeutil .Now ().Unix ())
611616
612617 // Create the frontier and add all the spans.
613- f , err := func () (frontier , error ) {
614- switch frontierType {
615- case "aggregator" :
616- return resolvedspan .NewAggregatorFrontier (
617- now ,
618- now ,
619- codec ,
620- perTableTracking ,
621- )
622- case "coordinator" :
623- return resolvedspan .NewCoordinatorFrontier (
624- now ,
625- now ,
626- codec ,
627- perTableTracking ,
628- )
629- default :
630- panic ("unreachable" )
631- }
632- }()
618+ f , err := newFrontier (
619+ b ,
620+ frontierType ,
621+ now ,
622+ now ,
623+ codec ,
624+ perTableTracking ,
625+ )
633626 require .NoError (b , err )
634627 require .NoError (b , f .AddSpansAt (now , spans ... ))
635628
@@ -655,3 +648,63 @@ FROM [SHOW RANGES FROM TABLE foo WITH KEYS]`)
655648 }
656649 }
657650}
651+
652+ func TestFrontierAtBoundary (t * testing.T ) {
653+ defer leaktest .AfterTest (t )()
654+ defer log .Scope (t ).Close (t )
655+
656+ rnd , _ := randutil .NewTestRand ()
657+ perTableTracking := rnd .Intn (2 ) == 0
658+ t .Logf ("per-table tracking: %t" , perTableTracking )
659+
660+ testutils .RunValues (t , "frontier type" , []string {"aggregator" , "coordinator" },
661+ func (t * testing.T , frontierType string ) {
662+ statementTime := makeTS (timeutil .Now ().Unix ())
663+ var initialHighWater hlc.Timestamp
664+ f , err := newFrontier (
665+ t ,
666+ frontierType ,
667+ statementTime ,
668+ initialHighWater ,
669+ mockCodec {},
670+ perTableTracking ,
671+ makeSpan ("a" , "f" ),
672+ )
673+ require .NoError (t , err )
674+
675+ // We can't be at boundary until a boundary is set.
676+ atBoundary , _ , _ := f .AtBoundary ()
677+ require .False (t , atBoundary )
678+ _ , err = f .ForwardResolvedSpan (jobspb.ResolvedSpan {
679+ Span : makeSpan ("a" , "f" ),
680+ Timestamp : statementTime ,
681+ })
682+ require .NoError (t , err )
683+ atBoundary , _ , _ = f .AtBoundary ()
684+ require .False (t , atBoundary )
685+
686+ // Set a boundary by forwarding part of the span space.
687+ ts := statementTime .AddDuration (3 * time .Second )
688+ _ , err = f .ForwardResolvedSpan (jobspb.ResolvedSpan {
689+ Span : makeSpan ("a" , "c" ),
690+ Timestamp : ts ,
691+ BoundaryType : jobspb .ResolvedSpan_BACKFILL ,
692+ })
693+ require .NoError (t , err )
694+ atBoundary , _ , _ = f .AtBoundary ()
695+ require .False (t , atBoundary )
696+
697+ // Verify the boundary is reached after forwarding
698+ // the rest of the span space.
699+ _ , err = f .ForwardResolvedSpan (jobspb.ResolvedSpan {
700+ Span : makeSpan ("c" , "f" ),
701+ Timestamp : ts ,
702+ BoundaryType : jobspb .ResolvedSpan_BACKFILL ,
703+ })
704+ require .NoError (t , err )
705+ atBoundary , boundaryType , boundaryTS := f .AtBoundary ()
706+ require .True (t , atBoundary )
707+ require .Equal (t , jobspb .ResolvedSpan_BACKFILL , boundaryType )
708+ require .Equal (t , ts , boundaryTS )
709+ })
710+ }
0 commit comments