@@ -409,7 +409,7 @@ def test_quorum_loss(self):
409409 self .call ("send_message" , "terminal" )
410410
411411
412- class EnableReplication (ReplicationTest ):
412+ class EnableReplicationTest (ReplicationTest ):
413413 AUTOPUBLISH = False
414414
415415 def __init__ (self , * args , ** kwargs ):
@@ -423,32 +423,41 @@ def run_counter(self, id, n = 100):
423423 self .expected_counter_rows .append ({"id" : id , "value" : n })
424424 self .assertEqual (self .collect_counter_rows (), self .expected_counter_rows )
425425
426- def test_enable_replication (self ):
427- """Tests enabling and disabling replication"""
426+ def subscribe_to_enable_replication_events (self ):
427+ id = self .cluster .get_db_id ()
428+ return self .subscribe (
429+ f"select * from staged_enable_replication_event where database_id={ id } " ,
430+ n = 2 ,
431+ database = "spacetime-control"
432+ )
433+
434+ def assert_bootstrap_complete (self , sub ):
435+ events = sub ()
436+ self .assertEqual (
437+ events [- 1 ]['staged_enable_replication_event' ]['inserts' ][0 ]['message' ],
438+ 'bootstrap complete' ,
439+ )
440+
441+ def enable_replication (self , database_name ):
442+ sub = self .subscribe_to_enable_replication_events ()
443+ self .call_control ("enable_replication" , {"Name" : database_name }, 3 )
444+ self .assert_bootstrap_complete (sub )
445+
446+ class EnableReplicationUnsuspended (EnableReplicationTest ):
447+ def test_enable_replication_fails_if_not_suspended (self ):
448+ """Tests that the database to enable replication on must be suspended"""
428449
429450 self .add_me_as_admin ()
430451 name = random_string ()
431- n = 100
432452
433453 self .publish_module (name , num_replicas = 1 )
434454 self .cluster .wait_for_leader_change (None )
435455
436- # start un-replicated
437- self .run_counter (1 , n )
438- # enable replication
439- self .call_control ("enable_replication" , {"Name" : name }, 3 )
440- self .run_counter (2 , n )
441- # disable replication
442- self .call_control ("disable_replication" , {"Name" : name })
443- self .run_counter (3 , n )
444- # enable it one more time
445- self .call_control ("enable_replication" , {"Name" : name }, 3 )
446- self .run_counter (4 , n )
456+ with self .assertRaises (Exception ):
457+ self .call_control ("enable_replication" , {"Name" : name }, 3 )
447458
448459
449- class EnableReplicationSuspended (ReplicationTest ):
450- AUTOPUBLISH = False
451-
460+ class EnableReplicationSuspended (EnableReplicationTest ):
452461 def test_enable_replication_on_suspended_database (self ):
453462 """Tests that we can enable replication on a suspended database"""
454463
@@ -459,14 +468,12 @@ def test_enable_replication_on_suspended_database(self):
459468 self .cluster .wait_for_leader_change (None )
460469 self .cluster .ensure_leader_health (1 )
461470
462- id = self .cluster .get_db_id ()
463-
464471 self .call_control ("suspend_database" , {"Name" : name })
465472 # Database is now unreachable.
466473 with self .assertRaises (Exception ):
467474 self .call ("send_message" , "hi" )
468475
469- self .call_control ( " enable_replication" , { "Name" : name }, 3 )
476+ self .enable_replication ( name )
470477 # Still unreachable until we call unsuspend.
471478 with self .assertRaises (Exception ):
472479 self .call ("send_message" , "hi" )
@@ -475,11 +482,25 @@ def test_enable_replication_on_suspended_database(self):
475482 self .cluster .wait_for_leader_change (None )
476483 self .cluster .ensure_leader_health (2 )
477484
478- # We can't direcly observe that there are indeed three replicas running,
479- # so as a sanity check inspect the event log.
480- rows = self .cluster .read_controldb (
481- f"select message from staged_enable_replication_event where database_id={ id } " )
482- self .assertEqual (rows , [
483- {'message' : '"bootstrap requested"' },
484- {'message' : '"bootstrap complete"' },
485- ])
485+ class EnableDisableReplication (EnableReplicationTest ):
486+ def test_enable_disable_replication (self ):
487+ """Tests that we can enable then disable replication"""
488+
489+ self .add_me_as_admin ()
490+ name = random_string ()
491+
492+ self .publish_module (name , num_replicas = 1 )
493+ self .cluster .wait_for_leader_change (None )
494+
495+ # suspend first
496+ self .call_control ("suspend_database" , {"Name" : name })
497+ # enable replication and wait for it to complete
498+ self .enable_replication (name )
499+ # unsuspend
500+ self .call_control ("unsuspend_database" , {"Name" : name })
501+
502+ self .cluster .wait_for_leader_change (None )
503+ self .run_counter (1 , 100 )
504+
505+ self .call_control ("disable_replication" , {"Name" : name })
506+ self .run_counter (2 , 100 )
0 commit comments