diff --git a/doc/source/serve/api/index.md b/doc/source/serve/api/index.md index af6540bd5af5..2e30d20d4b57 100644 --- a/doc/source/serve/api/index.md +++ b/doc/source/serve/api/index.md @@ -108,6 +108,7 @@ See the [model composition guide](serve-model-composition) for how to update cod serve.schema.AutoscalingStatus serve.schema.ScalingDecision serve.schema.DeploymentAutoscalingDetail + serve.schema.ReplicaRank ``` ### Request Router diff --git a/python/ray/serve/_private/deployment_state.py b/python/ray/serve/_private/deployment_state.py index 7c5e27e4e164..9460147e791a 100644 --- a/python/ray/serve/_private/deployment_state.py +++ b/python/ray/serve/_private/deployment_state.py @@ -70,6 +70,7 @@ from ray.serve.schema import ( DeploymentDetails, ReplicaDetails, + ReplicaRank, _deployment_info_to_schema, ) from ray.util.placement_group import PlacementGroup @@ -1458,248 +1459,180 @@ def __repr__(self): return repr(self._replicas) -class DeploymentRankManager: - """Manages replica ranks for a deployment. - This class handles rank assignment, release, consistency checking, and reassignment. - It maintains the rank system invariants and provides a clean interface for rank operations. - """ +class RankManager: + """Manages ranks for a single node.""" - def __init__(self, _fail_on_error: Optional[bool] = None): - # Maps replica_id to assigned rank - self._replica_ranks: Dict[str, int] = {} - # Set of available ranks (initially empty, grows as target replicas change) + def __init__(self): + self._ranks: Dict[str, int] = {} self._released_ranks: Set[int] = set() - # Next rank to assign (increments as new replicas are created) self._next_rank: int = 0 - # Whether to fail on rank errors (for testing control) - self._fail_on_error = ( - _fail_on_error - if _fail_on_error is not None - else RAY_SERVE_FAIL_ON_RANK_ERROR - ) - def assign_rank(self, replica_id: str) -> int: - """Assign a rank to a new replica. - Args: - replica_id: The unique ID of the replica - Returns: - The assigned rank - Raises: - RuntimeError: If the replica already has a rank assigned - """ - if replica_id in self._replica_ranks: - raise RuntimeError( - f"Replica {replica_id} already has a rank assigned: {self._replica_ranks[replica_id]}" - ) + def assign_rank(self, key: str) -> int: + if key in self._ranks: + raise RuntimeError(f"Rank for {key} already assigned: {self._ranks[key]}") - # First try to reuse an available rank if self._released_ranks: + # Reuse the smallest released rank rank = min(self._released_ranks) self._released_ranks.remove(rank) else: - # Otherwise use the next available rank + # Assign the next available rank + # This is the first time we're assigning a rank to this replica rank = self._next_rank self._next_rank += 1 - self._replica_ranks[replica_id] = rank + self._ranks[key] = rank return rank - def release_rank(self, replica_id: str) -> None: - """Release a rank when a replica is stopped. - Args: - replica_id: The unique ID of the replica whose rank should be released - """ - if replica_id not in self._replica_ranks: - raise RuntimeError(f"Replica {replica_id} has no rank assigned") - - rank = self._replica_ranks.pop(replica_id) + def release_rank(self, key: str) -> None: + if key not in self._ranks: + raise RuntimeError(f"Rank for {key} not assigned") + rank = self._ranks.pop(key) + # Add the released rank to the set of released ranks + # This rank can be reused for a new replica self._released_ranks.add(rank) - def recover_rank(self, replica_id: str, rank: int) -> None: - """Recover a rank from a live replica during controller restart. - Args: - replica_id: The unique ID of the replica - rank: The rank to recover - Raises: - RuntimeError: If the replica already has a rank or the rank is invalid - ValueError: If the rank is invalid (negative) - """ - if replica_id in self._replica_ranks: - raise RuntimeError(f"Replica {replica_id} already has a rank assigned") - - self._replica_ranks[replica_id] = rank - - # Update available ranks tracking - if rank in self._released_ranks: - self._released_ranks.remove(rank) - - # Update next_rank to ensure we don't assign duplicates + def recover_rank(self, key: str, rank: int) -> None: + if key in self._ranks: + raise RuntimeError(f"Rank for {key} already assigned: {self._ranks[key]}") + self._ranks[key] = rank + self._released_ranks.discard(rank) if rank >= self._next_rank: self._next_rank = rank + 1 - def get_replica_rank(self, replica_id: str) -> Optional[int]: - """Get the rank assigned to a replica. - Args: - replica_id: The unique ID of the replica - Returns: - The assigned rank, or None if no rank is assigned - """ - if replica_id not in self._replica_ranks: - raise RuntimeError(f"Replica {replica_id} has no rank assigned") - return self._replica_ranks.get(replica_id) + def get_rank(self, key: str) -> int: + if key not in self._ranks: + raise RuntimeError(f"Rank for {key} not assigned") + return self._ranks[key] - def get_replica_ranks_mapping(self) -> Dict[str, int]: - """Get a copy of the current replica ranks mapping. - Returns: - A copy of the replica_id to rank mapping - """ - return self._replica_ranks.copy() + def has_rank(self, key: str) -> bool: + return key in self._ranks + + def get_ranks_mapping(self) -> Dict[str, int]: + return self._ranks.copy() + + def clear(self) -> None: + self._ranks.clear() + self._released_ranks.clear() + self._next_rank = 0 def check_rank_consistency_and_reassign_minimally( self, - active_replicas: List["DeploymentReplica"], - ) -> List["DeploymentReplica"]: + active_keys: List[str], + ) -> List[str]: """Verify rank system invariants and reassign ranks when needed. + This method ensures: - 1. All active replicas have ranks + 1. All active keys have ranks 2. No duplicate ranks exist - 3. Ranks are contiguous when at target replica count + 3. Ranks are contiguous when at target count + Args: - active_replicas: List of currently active replicas + active_keys: List of currently active keys + Returns: - List of replicas that need to be reconfigured with new ranks + List of keys that need to be reconfigured with new ranks + Raises: - RuntimeError: If rank system invariants are violated + RuntimeError: If rank system invariants are violated and fail_on_error=True """ - if not active_replicas: + if not active_keys: return [] - active_replica_ids = { - replica.replica_id.unique_id for replica in active_replicas - } - replica_ids_needs_reconfiguration = set() + active_keys_set = set(active_keys) # Check for stale ranks - this should never happen - stale_replica_ids = set(self._replica_ranks.keys()) - active_replica_ids - if stale_replica_ids: + stale_keys = set(self._ranks.keys()) - active_keys_set + if stale_keys: logger.error( - f"Found stale ranks for replicas: {stale_replica_ids}. " + f"Found stale ranks for keys: {stale_keys}. " "This should never happen. Please report this as a bug." ) - if self._fail_on_error: - raise RuntimeError("Controller rank system is in an invalid state.") - # TODO (abrar): handle this case by removing the stale ranks, but remove this when - # RAY_SERVE_FAIL_ON_RANK_ERROR is set to 1 in the future - for replica_id in stale_replica_ids: - self.release_rank(replica_id) - replica_ids_needs_reconfiguration.add(replica_id) - - # Verify system invariants - all active replicas must have ranks - unranked_replica_ids = active_replica_ids - set(self._replica_ranks.keys()) - if unranked_replica_ids: + raise RuntimeError("Rank system is in an invalid state.") + + # Verify system invariants - all active keys must have ranks + unranked_keys = active_keys_set - set(self._ranks.keys()) + if unranked_keys: logger.error( - f"Found active replicas without ranks: {unranked_replica_ids}. " + f"Found active keys without ranks: {unranked_keys}. " "This should never happen. Please report this as a bug." ) - if self._fail_on_error: - raise RuntimeError("Controller rank system is in an invalid state.") - # TODO (abrar): handle this case by assigning new ranks to the unranked replicas - # but remove this when RAY_SERVE_FAIL_ON_RANK_ERROR is set to 1 in the future - for replica_id in unranked_replica_ids: - self.assign_rank(replica_id) - replica_ids_needs_reconfiguration.add(replica_id) + raise RuntimeError("Rank system is in an invalid state.") # Check for duplicate ranks - this should never happen rank_counts = {} - for replica_id, rank in self._replica_ranks.copy().items(): - if replica_id in active_replica_ids: # Only check active replicas + for key, rank in self._ranks.copy().items(): + if key in active_keys_set: # Only check active keys rank_counts[rank] = rank_counts.get(rank, 0) + 1 if rank_counts[rank] > 1: logger.error( - f"Found duplicate rank {rank} assigned to multiple replicas. " + f"Found duplicate rank {rank} assigned to multiple keys. " "This should never happen. Please report this as a bug." ) - if self._fail_on_error: - raise RuntimeError( - "Controller rank system is in an invalid state." - ) - # TODO (abrar): handle this case by releasing the rank of the replica with the duplicate rank - # and assigning a new rank to the replica with the duplicate rank - # but remove this when RAY_SERVE_FAIL_ON_RANK_ERROR is set to 1 in the future - self._replica_ranks.pop(replica_id) - self.assign_rank(replica_id) - replica_ids_needs_reconfiguration.add(replica_id) + raise RuntimeError("Rank system is in an invalid state.") # Check if we need to reassign ranks for contiguity - # Only force contiguity when at target replica count (e.g., after autoscaling down) - current_ranks = sorted(self._replica_ranks.values()) - expected_ranks = list(range(len(active_replicas))) + # Only force contiguity when at target count (e.g., after autoscaling down) + current_ranks = sorted(self._ranks.values()) + expected_ranks = list(range(len(active_keys))) - replicas_needing_reconfiguration = [] + keys_needing_reconfiguration_from_reassignment = [] if current_ranks != expected_ranks: logger.debug( - f"Deployment at target replica count but ranks are not contiguous. " + f"At target count but ranks are not contiguous. " f"Current: {current_ranks}, Expected: {expected_ranks}. " "Performing minimal reassignment." ) - replicas_needing_reconfiguration.extend( - self._perform_minimal_rank_reassignment(active_replicas) + keys_needing_reconfiguration_from_reassignment = ( + self._perform_minimal_rank_reassignment(active_keys) ) - # TODO (abrar): remove this when RAY_SERVE_FAIL_ON_RANK_ERROR is set to 1 in the future - for replica in active_replicas: - if replica.replica_id.unique_id in replica_ids_needs_reconfiguration: - replicas_needing_reconfiguration.append(replica) + return keys_needing_reconfiguration_from_reassignment - return replicas_needing_reconfiguration - - def _perform_minimal_rank_reassignment( - self, active_replicas: List["DeploymentReplica"] - ) -> List["DeploymentReplica"]: + def _perform_minimal_rank_reassignment(self, active_keys: List[str]) -> List[str]: """Perform minimal rank reassignment to achieve contiguity. - This method reassigns ranks while minimizing the number of replicas that need + + This method reassigns ranks while minimizing the number of keys that need to be reconfigured. It prioritizes keeping existing ranks when possible. + Args: - active_replicas: List of currently active replicas + active_keys: List of currently active keys + Returns: - List of replicas that need to be reconfigured with new ranks + List of keys that need to be reconfigured with new ranks """ - target_ranks_set = set(range(len(active_replicas))) + target_ranks_set = set(range(len(active_keys))) - # Find which replicas need new ranks - replicas_needing_ranks = [] - replicas_keeping_ranks = [] + # Find which keys need new ranks + keys_needing_ranks = [] + keys_keeping_ranks = [] - for replica in active_replicas: - replica_id = replica.replica_id.unique_id - current_rank = self.get_replica_rank(replica_id) + for key in active_keys: + current_rank = self.get_rank(key) if current_rank in target_ranks_set: - # This replica can keep its rank + # This key can keep its rank target_ranks_set.remove(current_rank) # O(1) operation - replicas_keeping_ranks.append(replica) + keys_keeping_ranks.append(key) else: - # This replica needs a new rank - replicas_needing_ranks.append(replica) + # This key needs a new rank + keys_needing_ranks.append(key) # Convert remaining target ranks to sorted list for deterministic assignment available_ranks = sorted(target_ranks_set) - # Assign new ranks to replicas that need them - for i, replica in enumerate(replicas_needing_ranks): - replica_id = replica.replica_id.unique_id + # Assign new ranks to keys that need them + for i, key in enumerate(keys_needing_ranks): new_rank = available_ranks[i] # O(1) operation # Store the old rank before updating - old_rank = self._replica_ranks[replica_id] + old_rank = self._ranks[key] - logger.debug( - f"Reassigning replica {replica_id}: rank {old_rank} -> {new_rank}" - ) + logger.debug(f"Reassigning key {key}: rank {old_rank} -> {new_rank}") # Update the rank mapping - self._replica_ranks[replica_id] = new_rank + self._ranks[key] = new_rank # Remove the newly assigned rank from available ranks self._released_ranks.discard(new_rank) # Add the old rank back to available ranks for reuse @@ -1707,17 +1640,204 @@ def _perform_minimal_rank_reassignment( # Log the reassignment summary logger.debug( - f"Minimal reassignment complete: {len(replicas_keeping_ranks)} replicas kept ranks, " - f"{len(replicas_needing_ranks)} replicas reassigned" + f"Minimal reassignment complete: {len(keys_keeping_ranks)} keys kept ranks, " + f"{len(keys_needing_ranks)} keys reassigned" + ) + + return keys_needing_ranks + + +class DeploymentRankManager: + """Manages replica ranks for a deployment. + This class handles rank assignment, release, consistency checking, and reassignment. + It maintains the rank system invariants and provides a clean interface for rank operations. + + Maintains one level of rank tracking: + - Global rank: Replica-level rank across all nodes (0, 1, 2, ...) + """ + + def __init__(self, fail_on_rank_error: bool = True): + # Global rank manager (existing replica-level rank) + self._replica_rank_manager = RankManager() + self._fail_on_rank_error = fail_on_rank_error + + def _execute_with_error_handling(self, func, safe_default, *args, **kwargs): + if self._fail_on_rank_error: + # Let exceptions propagate + return func(*args, **kwargs) + else: + # Catch exceptions and return safe default + try: + return func(*args, **kwargs) + except Exception as e: + logger.error(f"Error executing function {func.__name__}: {e}") + return safe_default + + def assign_rank(self, replica_id: str) -> ReplicaRank: + """Assign a rank to a new replica. + + Args: + replica_id: The unique ID of the replica + + Returns: + ReplicaRank object with the assigned rank + + Raises: + RuntimeError: If the replica already has a rank assigned + """ + + def _assign_rank_impl(): + if self.has_replica_rank(replica_id): + raise RuntimeError( + f"Rank for {replica_id} already assigned: {self._replica_rank_manager.get_rank(replica_id)}" + ) + + # Assign global rank + rank = self._replica_rank_manager.assign_rank(replica_id) + + return ReplicaRank(rank=rank) + + return self._execute_with_error_handling(_assign_rank_impl, ReplicaRank(rank=0)) + + def release_rank(self, replica_id: str) -> None: + """Release rank for a replica. + + Args: + replica_id: ID of the replica + + Raises: + RuntimeError: If replica doesn't have ranks + """ + + def _release_rank_impl(): + if not self.has_replica_rank(replica_id): + raise RuntimeError(f"Rank for {replica_id} not assigned") + + # Release global rank + self._replica_rank_manager.release_rank(replica_id) + + return self._execute_with_error_handling(_release_rank_impl, None) + + def recover_rank( + self, + replica_id: str, + rank: ReplicaRank, + ) -> None: + """Recover rank for a replica (e.g., after controller restart). + + Args: + replica_id: ID of the replica + rank: The rank to recover + + Raises: + RuntimeError: If replica already has ranks assigned + """ + + def _recover_rank_impl(): + if self.has_replica_rank(replica_id): + raise RuntimeError( + f"Rank for {replica_id} already assigned: {self._replica_rank_manager.get_rank(replica_id)}" + ) + + # Recover global rank + self._replica_rank_manager.recover_rank(replica_id, rank.rank) + + return self._execute_with_error_handling(_recover_rank_impl, None) + + def has_replica_rank(self, replica_id: str) -> bool: + """Check if replica has a rank assigned. + + Args: + replica_id: The unique ID of the replica + + Returns: + True if the replica has a rank assigned, False otherwise + + Raises: + RuntimeError: If the replica doesn't have ranks assigned + """ + return self._replica_rank_manager.has_rank(replica_id) + + def get_replica_rank(self, replica_id: str) -> ReplicaRank: + """Get the rank for a replica. + + Args: + replica_id: ID of the replica + + Returns: + ReplicaRank object + + Raises: + RuntimeError: If replica doesn't have ranks assigned + """ + + def _get_replica_rank_impl(): + if not self.has_replica_rank(replica_id): + raise RuntimeError(f"Rank for {replica_id} not assigned") + + global_rank = self._replica_rank_manager.get_rank(replica_id) + return ReplicaRank(rank=global_rank) + + return self._execute_with_error_handling( + _get_replica_rank_impl, ReplicaRank(rank=0) ) - return replicas_needing_ranks + def check_rank_consistency_and_reassign_minimally( + self, + active_replicas: List["DeploymentReplica"], + ) -> List["DeploymentReplica"]: + """Verify rank system invariants and reassign ranks when needed across all three levels. + + This method ensures: + 1. Global ranks are contiguous [0, N-1] for N replicas + + Args: + active_replicas: List of currently active replicas + + Returns: + List of replicas that need to be reconfigured with new ranks + """ + + def _check_rank_consistency_impl(): + if not active_replicas: + return [] + + # Extract replica IDs from replicas + active_replica_ids = [ + replica.replica_id.unique_id for replica in active_replicas + ] + + # Create a mapping from replica ID to replica object for quick lookup + replica_id_to_replica = { + replica.replica_id.unique_id: replica for replica in active_replicas + } + + # Track all replicas needing reconfiguration from any rank system + all_replica_ids_needing_reconfiguration = set() + + # STEP 1: Check global rank consistency + replica_ids_from_global = self._replica_rank_manager.check_rank_consistency_and_reassign_minimally( + active_replica_ids + ) + all_replica_ids_needing_reconfiguration.update(replica_ids_from_global) + + # Convert replica IDs back to replica objects + # Filter out stale replicas that are not in the active set + replicas_needing_reconfiguration = [ + replica_id_to_replica[replica_id] + for replica_id in all_replica_ids_needing_reconfiguration + if replica_id in replica_id_to_replica + ] + + return replicas_needing_reconfiguration + + return self._execute_with_error_handling(_check_rank_consistency_impl, []) def clear(self) -> None: - """Clear all rank data. Used for testing and reset.""" - self._replica_ranks.clear() - self._released_ranks.clear() - self._next_rank = 0 + self._replica_rank_manager.clear() + + def get_replica_ranks_mapping(self) -> Dict[str, int]: + return self._replica_rank_manager.get_ranks_mapping() class DeploymentState: @@ -1761,7 +1881,9 @@ def __init__( DeploymentStatusTrigger.CONFIG_UPDATE_STARTED, ) - self._rank_manager = DeploymentRankManager() + self._rank_manager = DeploymentRankManager( + fail_on_rank_error=RAY_SERVE_FAIL_ON_RANK_ERROR + ) self.replica_average_ongoing_requests: Dict[str, float] = {} @@ -2297,7 +2419,7 @@ def _stop_or_update_outdated_version_replicas(self, max_to_stop=math.inf) -> boo replica.replica_id.unique_id ) actor_updating = replica.reconfigure( - self._target_state.version, rank=current_rank + self._target_state.version, rank=current_rank.rank ) if actor_updating: self._replicas.add(ReplicaState.UPDATING, replica) @@ -2418,14 +2540,14 @@ def scale_deployment_replicas( assigned_rank = self._rank_manager.assign_rank(replica_id.unique_id) logger.debug( - f"Assigned rank {assigned_rank} to new replica {replica_id.unique_id} during startup" + f"Assigned rank {assigned_rank.rank} to new replica {replica_id.unique_id} during startup" ) new_deployment_replica = DeploymentReplica( replica_id, self._target_state.version, ) scheduling_request = new_deployment_replica.start( - self._target_state.info, rank=assigned_rank + self._target_state.info, rank=assigned_rank.rank ) upscale.append(scheduling_request) @@ -2544,7 +2666,9 @@ def _check_startup_replicas( # Recover rank from the replica actor during controller restart replica_id = replica.replica_id.unique_id recovered_rank = replica.rank - self._rank_manager.recover_rank(replica_id, recovered_rank) + self._rank_manager.recover_rank( + replica_id, ReplicaRank(rank=recovered_rank) + ) # This replica should be now be added to handle's replica # set. self._replicas.add(ReplicaState.RUNNING, replica) @@ -2827,7 +2951,7 @@ def _reconfigure_replicas_with_new_ranks( # World size is calculated automatically from deployment config _ = replica.reconfigure( self._target_state.version, - rank=new_rank, + rank=new_rank.rank, ) updated_count += 1 diff --git a/python/ray/serve/schema.py b/python/ray/serve/schema.py index 9d5d9176259d..f7638815443b 100644 --- a/python/ray/serve/schema.py +++ b/python/ray/serve/schema.py @@ -1591,3 +1591,12 @@ class ScaleDeploymentRequest(BaseModel): target_num_replicas: NonNegativeInt = Field( description="The target number of replicas for the deployment." ) + + +@PublicAPI(stability="alpha") +class ReplicaRank(BaseModel): + """Replica rank model.""" + + rank: int = Field( + description="Global rank of the replica across all nodes scoped to the deployment." + ) diff --git a/python/ray/serve/tests/unit/test_deployment_rank_manager.py b/python/ray/serve/tests/unit/test_deployment_rank_manager.py index 211ce31b9471..e7be0e515308 100644 --- a/python/ray/serve/tests/unit/test_deployment_rank_manager.py +++ b/python/ray/serve/tests/unit/test_deployment_rank_manager.py @@ -2,10 +2,11 @@ from ray.serve._private.common import DeploymentID, ReplicaID from ray.serve._private.deployment_state import DeploymentRankManager +from ray.serve.schema import ReplicaRank @pytest.fixture -def rank_manager(): +def rank_manager() -> DeploymentRankManager: """Fixture providing a fresh DeploymentRankManager instance for each test.""" return DeploymentRankManager() @@ -33,17 +34,14 @@ class TestDeploymentRankManager: def test_init(self, rank_manager): """Test initialization creates empty state.""" - assert rank_manager._replica_ranks == {} - assert rank_manager._released_ranks == set() - assert rank_manager._next_rank == 0 + assert rank_manager.get_replica_ranks_mapping() == {} def test_assign_rank_first_replica(self, rank_manager): """Test assigning rank to first replica.""" rank = rank_manager.assign_rank("replica_1") - assert rank == 0 - assert rank_manager._replica_ranks["replica_1"] == 0 - assert rank_manager._next_rank == 1 - assert rank_manager._released_ranks == set() + assert rank.rank == 0 + assert rank_manager.has_replica_rank("replica_1") + assert rank_manager.get_replica_rank("replica_1").rank == 0 def test_assign_rank_multiple_replicas(self, rank_manager): """Test assigning ranks to multiple replicas.""" @@ -51,11 +49,13 @@ def test_assign_rank_multiple_replicas(self, rank_manager): rank2 = rank_manager.assign_rank("replica_2") rank3 = rank_manager.assign_rank("replica_3") - assert rank1 == 0 - assert rank2 == 1 - assert rank3 == 2 - assert rank_manager._next_rank == 3 - assert len(rank_manager._replica_ranks) == 3 + assert rank1.rank == 0 + assert rank2.rank == 1 + assert rank3.rank == 2 + + mapping = rank_manager.get_replica_ranks_mapping() + assert len(mapping) == 3 + assert mapping == {"replica_1": 0, "replica_2": 1, "replica_3": 2} def test_assign_rank_reuses_released_ranks(self, rank_manager): """Test that released ranks are reused before assigning new ones.""" @@ -66,19 +66,19 @@ def test_assign_rank_reuses_released_ranks(self, rank_manager): # Release middle rank rank_manager.release_rank("replica_2") - assert 1 in rank_manager._released_ranks + assert not rank_manager.has_replica_rank("replica_2") - # New replica should get the released rank + # New replica should get the released rank (1) rank = rank_manager.assign_rank("replica_4") - assert rank == 1 - assert 1 not in rank_manager._released_ranks + assert rank.rank == 1 + assert rank_manager.get_replica_rank("replica_4").rank == 1 def test_assign_rank_duplicate_fails(self): - """Test assigning rank to replica that already has one fails when flag is enabled.""" + """Test assigning rank to replica that already has one fails.""" rank_manager = DeploymentRankManager() rank_manager.assign_rank("replica_1") - with pytest.raises(RuntimeError, match="already has a rank assigned"): + with pytest.raises(RuntimeError, match="already assigned"): rank_manager.assign_rank("replica_1") def test_release_rank(self, rank_manager): @@ -88,33 +88,34 @@ def test_release_rank(self, rank_manager): rank_manager.release_rank("replica_1") - assert "replica_1" not in rank_manager._replica_ranks - assert 0 in rank_manager._released_ranks - assert "replica_2" in rank_manager._replica_ranks + assert not rank_manager.has_replica_rank("replica_1") + assert rank_manager.has_replica_rank("replica_2") + assert rank_manager.get_replica_rank("replica_2").rank == 1 def test_release_rank_nonexistent_replica(self): - """Test releasing rank for non-existent replica is safe.""" + """Test releasing rank for non-existent replica fails.""" rank_manager = DeploymentRankManager() - with pytest.raises(RuntimeError, match="has no rank assigned"): + with pytest.raises(RuntimeError, match="not assigned"): rank_manager.release_rank("nonexistent") def test_recover_rank_basic(self, rank_manager): """Test basic rank recovery.""" - rank_manager.recover_rank("replica_1", 5) + rank_manager.recover_rank("replica_1", ReplicaRank(rank=5)) - assert rank_manager._replica_ranks["replica_1"] == 5 - assert rank_manager._next_rank == 6 + assert rank_manager.has_replica_rank("replica_1") + assert rank_manager.get_replica_rank("replica_1").rank == 5 def test_recover_rank_updates_next_rank(self, rank_manager): """Test that recovering a high rank updates next_rank appropriately.""" rank_manager.assign_rank("replica_1") # Gets rank 0 - rank_manager.recover_rank("replica_2", 10) - - assert rank_manager._next_rank == 11 + rank_manager.recover_rank("replica_2", ReplicaRank(rank=10)) - # New replica should get rank 11 + # New replica should get rank 11 (next available after 10) rank = rank_manager.assign_rank("replica_3") - assert rank == 11 + assert rank.rank == 11 + + mapping = rank_manager.get_replica_ranks_mapping() + assert mapping == {"replica_1": 0, "replica_2": 10, "replica_3": 11} def test_recover_rank_removes_from_available(self, rank_manager): """Test that recovering a rank removes it from available ranks.""" @@ -122,32 +123,35 @@ def test_recover_rank_removes_from_available(self, rank_manager): rank_manager.assign_rank("replica_2") rank_manager.release_rank("replica_1") # Rank 0 becomes available - assert 0 in rank_manager._released_ranks + # Recover rank 0 for a new replica + rank_manager.recover_rank("replica_3", ReplicaRank(rank=0)) - # Recover rank 0 - rank_manager.recover_rank("replica_3", 0) + # Verify replica_3 has rank 0 + assert rank_manager.has_replica_rank("replica_3") + assert rank_manager.get_replica_rank("replica_3").rank == 0 - assert 0 not in rank_manager._released_ranks - assert rank_manager._replica_ranks["replica_3"] == 0 + # Next assigned replica should get rank 2 (not 0, which is now taken) + rank = rank_manager.assign_rank("replica_4") + assert rank.rank == 2 def test_recover_rank_duplicate_fails(self): - """Test recovering rank for replica that already has one fails when flag is enabled.""" + """Test recovering rank for replica that already has one fails.""" rank_manager = DeploymentRankManager() rank_manager.assign_rank("replica_1") - with pytest.raises(RuntimeError, match="already has a rank assigned"): - rank_manager.recover_rank("replica_1", 5) + with pytest.raises(RuntimeError, match="already assigned"): + rank_manager.recover_rank("replica_1", ReplicaRank(rank=5)) def test_get_replica_rank_existing(self, rank_manager): """Test getting rank for existing replica.""" rank_manager.assign_rank("replica_1") rank = rank_manager.get_replica_rank("replica_1") - assert rank == 0 + assert rank.rank == 0 def test_get_replica_rank_nonexistent_fails(self): - """Test getting rank for non-existent replica fails when flag is enabled.""" + """Test getting rank for non-existent replica fails.""" rank_manager = DeploymentRankManager() - with pytest.raises(RuntimeError, match="has no rank assigned"): + with pytest.raises(RuntimeError, match="not assigned"): rank_manager.get_replica_rank("nonexistent") def test_get_replica_ranks_mapping(self, rank_manager): @@ -160,9 +164,12 @@ def test_get_replica_ranks_mapping(self, rank_manager): assert mapping == expected - # Verify it's a copy + # Verify it's a copy by modifying it mapping["replica_3"] = 2 - assert "replica_3" not in rank_manager._replica_ranks + # Get a fresh mapping to verify the original wasn't changed + fresh_mapping = rank_manager.get_replica_ranks_mapping() + assert "replica_3" not in fresh_mapping + assert fresh_mapping == expected def test_clear(self, rank_manager): """Test clearing all rank data.""" @@ -172,9 +179,14 @@ def test_clear(self, rank_manager): rank_manager.clear() - assert rank_manager._replica_ranks == {} - assert rank_manager._released_ranks == set() - assert rank_manager._next_rank == 0 + # After clearing, should have no ranks + assert rank_manager.get_replica_ranks_mapping() == {} + assert not rank_manager.has_replica_rank("replica_1") + assert not rank_manager.has_replica_rank("replica_2") + + # Should be able to assign from 0 again + rank = rank_manager.assign_rank("replica_3") + assert rank.rank == 0 def test_check_rank_consistency_empty_replicas(self, rank_manager): """Test consistency check with no active replicas.""" @@ -205,12 +217,10 @@ def test_check_rank_consistency_non_contiguous_ranks(self, rank_manager): replica2 = MockDeploymentReplica("replica_2") replica3 = MockDeploymentReplica("replica_3") - # Manually set up non-contiguous ranks - rank_manager._replica_ranks = { - "replica_1": 0, - "replica_2": 2, # Gap at rank 1 - "replica_3": 3, - } + # Manually assign non-contiguous ranks using recover_rank + rank_manager.recover_rank("replica_1", ReplicaRank(rank=0)) + rank_manager.recover_rank("replica_2", ReplicaRank(rank=2)) # Gap at rank 1 + rank_manager.recover_rank("replica_3", ReplicaRank(rank=3)) result = rank_manager.check_rank_consistency_and_reassign_minimally( [replica1, replica2, replica3] @@ -219,8 +229,9 @@ def test_check_rank_consistency_non_contiguous_ranks(self, rank_manager): # Should reassign some replicas to make ranks contiguous assert len(result) > 0 - # After reassignment, ranks should be contiguous - final_ranks = sorted(rank_manager._replica_ranks.values()) + # After reassignment, ranks should be contiguous [0, 1, 2] + mapping = rank_manager.get_replica_ranks_mapping() + final_ranks = sorted(mapping.values()) expected_ranks = [0, 1, 2] assert final_ranks == expected_ranks @@ -231,13 +242,15 @@ def test_minimal_reassignment_keeps_existing_when_possible(self, rank_manager): replica3 = MockDeploymentReplica("replica_3") replica4 = MockDeploymentReplica("replica_4") - # Set up ranks: 0, 2, 5, 7 (non-contiguous) - rank_manager._replica_ranks = { - "replica_1": 0, # Should keep this - "replica_2": 2, # Should keep this - "replica_3": 5, # Should be reassigned to 1 - "replica_4": 7, # Should be reassigned to 3 - } + # Set up ranks: 0, 2, 5, 7 (non-contiguous) using recover_rank + rank_manager.recover_rank("replica_1", ReplicaRank(rank=0)) # Should keep this + rank_manager.recover_rank("replica_2", ReplicaRank(rank=2)) # Should keep this + rank_manager.recover_rank( + "replica_3", ReplicaRank(rank=5) + ) # Should be reassigned to 1 + rank_manager.recover_rank( + "replica_4", ReplicaRank(rank=7) + ) # Should be reassigned to 3 result = rank_manager.check_rank_consistency_and_reassign_minimally( [replica1, replica2, replica3, replica4] @@ -249,92 +262,223 @@ def test_minimal_reassignment_keeps_existing_when_possible(self, rank_manager): assert reassigned_ids == {"replica_3", "replica_4"} # Verify final ranks are contiguous - final_ranks = sorted(rank_manager._replica_ranks.values()) + mapping = rank_manager.get_replica_ranks_mapping() + final_ranks = sorted(mapping.values()) assert final_ranks == [0, 1, 2, 3] # Verify that replica_1 and replica_2 kept their original ranks - assert rank_manager._replica_ranks["replica_1"] == 0 - assert rank_manager._replica_ranks["replica_2"] == 2 + assert rank_manager.get_replica_rank("replica_1").rank == 0 + assert rank_manager.get_replica_rank("replica_2").rank == 2 - def test_check_rank_consistency_unranked_replicas_fails_when_flag_enabled(self): - """Test consistency check fails when active replicas have no ranks and flag is enabled.""" - rank_manager = DeploymentRankManager(_fail_on_error=True) + def test_check_rank_consistency_unranked_replicas_fails(self): + """Test consistency check fails when active replicas have no ranks.""" + rank_manager = DeploymentRankManager() replica1 = MockDeploymentReplica("replica_1") - with pytest.raises( - RuntimeError, match="Controller rank system is in an invalid state" - ): + with pytest.raises(RuntimeError, match="Rank system is in an invalid state"): rank_manager.check_rank_consistency_and_reassign_minimally([replica1]) - def test_check_rank_consistency_unranked_replicas_logs_when_flag_disabled(self): - """Test consistency check only logs when active replicas have no ranks and flag is disabled.""" - rank_manager = DeploymentRankManager(_fail_on_error=False) + def test_check_rank_consistency_stale_ranks_fails(self): + """Test consistency check fails when there are stale ranks.""" + rank_manager = DeploymentRankManager() replica1 = MockDeploymentReplica("replica_1") - # When flag is disabled, it logs error but still tries to proceed with reassignment - # However, the reassignment will fail when trying to access ranks that don't exist - result = rank_manager.check_rank_consistency_and_reassign_minimally([replica1]) - assert result == [replica1] + # Set up stale rank (replica not in active list) + rank_manager.assign_rank("replica_1") + rank_manager.assign_rank("stale_replica") - def test_check_rank_consistency_stale_ranks_fails_when_flag_enabled(self): - """Test consistency check fails when there are stale ranks and flag is enabled.""" - rank_manager = DeploymentRankManager(_fail_on_error=True) + with pytest.raises(RuntimeError, match="Rank system is in an invalid state"): + rank_manager.check_rank_consistency_and_reassign_minimally([replica1]) + + def test_check_rank_consistency_duplicate_ranks_fails(self): + """Test consistency check fails when there are duplicate ranks.""" + rank_manager = DeploymentRankManager() replica1 = MockDeploymentReplica("replica_1") + replica2 = MockDeploymentReplica("replica_2") - # Set up stale rank (replica not in active list) + # Manually create duplicate ranks using recover_rank (this should never happen in normal operation) + rank_manager.recover_rank("replica_1", ReplicaRank(rank=0)) + rank_manager.recover_rank("replica_2", ReplicaRank(rank=0)) # Duplicate! + + with pytest.raises(RuntimeError, match="Rank system is in an invalid state"): + rank_manager.check_rank_consistency_and_reassign_minimally( + [replica1, replica2] + ) + + +class TestDeploymentRankManagerErrorHandling: + """Test cases for DeploymentRankManager error handling with fail_on_rank_error flag. + + This test class can be easily removed in the future when the error handling + feature is no longer needed. + """ + + def test_assign_rank_error_with_fail_on_rank_error_true(self): + """Test that assign_rank raises exception when fail_on_rank_error=True.""" + rank_manager = DeploymentRankManager(fail_on_rank_error=True) + rank_manager.assign_rank("replica_1") + + # Should raise RuntimeError for duplicate assignment + with pytest.raises(RuntimeError, match="already assigned"): + rank_manager.assign_rank("replica_1") + + def test_assign_rank_error_with_fail_on_rank_error_false(self): + """Test that assign_rank returns safe default when fail_on_rank_error=False.""" + rank_manager = DeploymentRankManager(fail_on_rank_error=False) + rank_manager.assign_rank("replica_1") + + # Should return safe default (ReplicaRank(rank=0)) instead of raising + result = rank_manager.assign_rank("replica_1") + assert result is not None + assert isinstance(result, ReplicaRank) + assert result.rank == 0 + + def test_release_rank_error_with_fail_on_rank_error_true(self): + """Test that release_rank raises exception when fail_on_rank_error=True.""" + rank_manager = DeploymentRankManager(fail_on_rank_error=True) + + # Should raise RuntimeError for non-existent replica + with pytest.raises(RuntimeError, match="not assigned"): + rank_manager.release_rank("nonexistent") + + def test_release_rank_error_with_fail_on_rank_error_false(self): + """Test that release_rank returns safe default when fail_on_rank_error=False.""" + rank_manager = DeploymentRankManager(fail_on_rank_error=False) + + # Should return None instead of raising + result = rank_manager.release_rank("nonexistent") + assert result is None + + def test_recover_rank_error_with_fail_on_rank_error_true(self): + """Test that recover_rank raises exception when fail_on_rank_error=True.""" + rank_manager = DeploymentRankManager(fail_on_rank_error=True) rank_manager.assign_rank("replica_1") - rank_manager.assign_rank("stale_replica") - with pytest.raises( - RuntimeError, match="Controller rank system is in an invalid state" - ): + # Should raise RuntimeError for duplicate recovery + with pytest.raises(RuntimeError, match="already assigned"): + rank_manager.recover_rank("replica_1", ReplicaRank(rank=5)) + + def test_recover_rank_error_with_fail_on_rank_error_false(self): + """Test that recover_rank returns safe default when fail_on_rank_error=False.""" + rank_manager = DeploymentRankManager(fail_on_rank_error=False) + rank_manager.assign_rank("replica_1") + + # Should return None instead of raising + result = rank_manager.recover_rank("replica_1", ReplicaRank(rank=5)) + assert result is None + + def test_get_replica_rank_error_with_fail_on_rank_error_true(self): + """Test that get_replica_rank raises exception when fail_on_rank_error=True.""" + rank_manager = DeploymentRankManager(fail_on_rank_error=True) + + # Should raise RuntimeError for non-existent replica + with pytest.raises(RuntimeError, match="not assigned"): + rank_manager.get_replica_rank("nonexistent") + + def test_get_replica_rank_error_with_fail_on_rank_error_false(self): + """Test that get_replica_rank returns safe default when fail_on_rank_error=False.""" + rank_manager = DeploymentRankManager(fail_on_rank_error=False) + + # Should return safe default (ReplicaRank(rank=0)) instead of raising + result = rank_manager.get_replica_rank("nonexistent") + assert result is not None + assert isinstance(result, ReplicaRank) + assert result.rank == 0 + + def test_check_rank_consistency_error_with_fail_on_rank_error_true(self): + """Test that check_rank_consistency raises exception when fail_on_rank_error=True.""" + rank_manager = DeploymentRankManager(fail_on_rank_error=True) + replica1 = MockDeploymentReplica("replica_1") + + # Set up invalid state: active replica without rank + with pytest.raises(RuntimeError, match="Rank system is in an invalid state"): rank_manager.check_rank_consistency_and_reassign_minimally([replica1]) - def test_check_rank_consistency_stale_ranks_logs_when_flag_disabled(self): - """Test consistency check only logs when there are stale ranks and flag is disabled.""" - rank_manager = DeploymentRankManager(_fail_on_error=False) + def test_check_rank_consistency_error_with_fail_on_rank_error_false(self): + """Test that check_rank_consistency returns safe default when fail_on_rank_error=False.""" + rank_manager = DeploymentRankManager(fail_on_rank_error=False) + replica1 = MockDeploymentReplica("replica_1") + + # Should return empty list instead of raising + result = rank_manager.check_rank_consistency_and_reassign_minimally([replica1]) + assert result == [] + + def test_check_rank_consistency_with_stale_ranks_error_handling(self): + """Test check_rank_consistency with stale ranks and fail_on_rank_error=False.""" + rank_manager = DeploymentRankManager(fail_on_rank_error=False) replica1 = MockDeploymentReplica("replica_1") # Set up stale rank (replica not in active list) rank_manager.assign_rank("replica_1") rank_manager.assign_rank("stale_replica") - # When flag is disabled, it logs error but continues with reassignment - # Since only replica_1 is active and has rank 0, no reassignment needed + # Should return empty list instead of raising result = rank_manager.check_rank_consistency_and_reassign_minimally([replica1]) assert result == [] - def test_check_rank_consistency_duplicate_ranks_fails_when_flag_enabled(self): - """Test consistency check fails when there are duplicate ranks and flag is enabled.""" - rank_manager = DeploymentRankManager(_fail_on_error=True) + def test_check_rank_consistency_with_duplicate_ranks_error_handling(self): + """Test check_rank_consistency with duplicate ranks and fail_on_rank_error=False.""" + rank_manager = DeploymentRankManager(fail_on_rank_error=False) replica1 = MockDeploymentReplica("replica_1") replica2 = MockDeploymentReplica("replica_2") - # Manually create duplicate ranks (this should never happen in normal operation) - rank_manager._replica_ranks = {"replica_1": 0, "replica_2": 0} # Duplicate! + # Manually create duplicate ranks + rank_manager.recover_rank("replica_1", ReplicaRank(rank=0)) + rank_manager.recover_rank("replica_2", ReplicaRank(rank=0)) - with pytest.raises( - RuntimeError, match="Controller rank system is in an invalid state" - ): - rank_manager.check_rank_consistency_and_reassign_minimally( - [replica1, replica2] - ) + # Should return empty list instead of raising + result = rank_manager.check_rank_consistency_and_reassign_minimally( + [replica1, replica2] + ) + assert result == [] - def test_check_rank_consistency_duplicate_ranks_logs_when_flag_disabled(self): - """Test consistency check only logs when there are duplicate ranks and flag is disabled.""" - rank_manager = DeploymentRankManager(_fail_on_error=False) - replica1 = MockDeploymentReplica("replica_1") - replica2 = MockDeploymentReplica("replica_2") + def test_normal_operations_work_with_fail_on_rank_error_false(self): + """Test that normal operations still work correctly with fail_on_rank_error=False.""" + rank_manager = DeploymentRankManager(fail_on_rank_error=False) - # Manually create duplicate ranks (this should never happen in normal operation) - rank_manager._replica_ranks = {"replica_1": 0, "replica_2": 0} # Duplicate! - rank_manager._next_rank = 1 + # Test normal assign + rank1 = rank_manager.assign_rank("replica_1") + assert rank1.rank == 0 + + # Test normal get + rank = rank_manager.get_replica_rank("replica_1") + assert rank.rank == 0 + + # Test normal release + rank_manager.release_rank("replica_1") + assert not rank_manager.has_replica_rank("replica_1") + + # Test normal recover + rank_manager.recover_rank("replica_2", ReplicaRank(rank=5)) + assert rank_manager.get_replica_rank("replica_2").rank == 5 + + # Test normal consistency check + replica2 = MockDeploymentReplica("replica_2") + replica3 = MockDeploymentReplica("replica_3") + rank_manager.assign_rank("replica_3") - # When flag is disabled, it logs error but still performs reassignment to fix the issue result = rank_manager.check_rank_consistency_and_reassign_minimally( - [replica1, replica2] + [replica2, replica3] ) - assert result == [replica2] or result == [replica1] + # Should reassign to make ranks contiguous + assert len(result) > 0 + + def test_multiple_errors_do_not_crash_with_fail_on_rank_error_false(self): + """Test that multiple consecutive errors don't crash when fail_on_rank_error=False.""" + rank_manager = DeploymentRankManager(fail_on_rank_error=False) + + # Multiple errors in a row should all return safe defaults + result1 = rank_manager.get_replica_rank("nonexistent1") + result2 = rank_manager.get_replica_rank("nonexistent2") + result3 = rank_manager.release_rank("nonexistent3") + + assert result1 is not None + assert result2 is not None + assert result3 is None + + # And normal operations should still work after errors + rank = rank_manager.assign_rank("replica_1") + assert rank.rank == 0 if __name__ == "__main__":