Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion python/ray/data/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
)
from ray.data._internal.logging import configure_logging
from ray.data.context import DataContext, DatasetContext
from ray.data.dataset import Dataset, Schema, SinkMode, ClickHouseTableSettings
from ray.data.expectations import (
expect,
)
from ray.data.datasource import (
BlockBasedFileDatasink,
Datasink,
Expand Down Expand Up @@ -127,6 +129,7 @@
"Datasource",
"ExecutionOptions",
"ExecutionResources",
"expect",
"FileShuffleConfig",
"NodeIdStr",
"ReadTask",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
import math
import os
from typing import Any, Dict, List, Optional, Union
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union

from .common import NodeIdStr
from ray.data._internal.execution.util import memory_string
from ray.util.annotations import DeveloperAPI

if TYPE_CHECKING:
from ray.data._internal.execution.interfaces.execution_options import (
ExecutionResources,
)


class ExecutionResources:
"""Specifies resources usage or resource limits for execution.
Expand Down Expand Up @@ -346,6 +351,37 @@ def is_resource_limits_default(self):
"""Returns True if resource_limits is the default value."""
return self._resource_limits == ExecutionResources.for_limits()

def copy(
    self,
    resource_limits: Optional[ExecutionResources] = None,
    exclude_resources: Optional[ExecutionResources] = None,
    locality_with_output: Optional[Union[bool, List[NodeIdStr]]] = None,
    preserve_order: Optional[bool] = None,
    actor_locality_enabled: Optional[bool] = None,
    verbose_progress: Optional[bool] = None,
) -> "ExecutionOptions":
    """Build a new ExecutionOptions, replacing only the fields passed in.

    An argument left as ``None`` means "keep the current value", so a field
    cannot be reset to ``None`` through this method. ``False`` is treated as
    a real override because only ``None`` triggers the fallback.

    Args:
        resource_limits: Replacement resource limits, or None to keep.
        exclude_resources: Replacement excluded resources, or None to keep.
        locality_with_output: Replacement locality setting, or None to keep.
        preserve_order: Replacement preserve-order flag, or None to keep.
        actor_locality_enabled: Replacement actor-locality flag, or None to
            keep.
        verbose_progress: Replacement verbose-progress flag, or None to keep.

    Returns:
        A new ExecutionOptions carrying the overridden fields and the current
        values of all other fields.
    """
    requested = {
        "resource_limits": resource_limits,
        "exclude_resources": exclude_resources,
        "locality_with_output": locality_with_output,
        "preserve_order": preserve_order,
        "actor_locality_enabled": actor_locality_enabled,
        "verbose_progress": verbose_progress,
    }
    # Read the attribute from ``self`` only for fields that were not overridden.
    resolved = {
        field: getattr(self, field) if value is None else value
        for field, value in requested.items()
    }
    return ExecutionOptions(**resolved)

def validate(self) -> None:
"""Validate the options."""
for attr in ["cpu", "gpu", "object_store_memory"]:
Expand Down
13 changes: 9 additions & 4 deletions python/ray/data/_internal/execution/resource_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ def __init__(

self._op_resource_allocator: Optional["OpResourceAllocator"] = None

# Use reservation ratio from data context
reservation_ratio = data_context.op_resource_reservation_ratio

if data_context.op_resource_reservation_enabled:
# We'll enable memory reservation if all operators have
# implemented accurate memory accounting.
Expand All @@ -98,10 +101,11 @@ def __init__(
)
if should_enable:
self._op_resource_allocator = ReservationOpResourceAllocator(
self, data_context.op_resource_reservation_ratio
self, reservation_ratio
)

self._object_store_memory_limit_fraction = (
# Set object store memory limit fraction
base_memory_fraction = (
data_context.override_object_store_memory_limit_fraction
if data_context.override_object_store_memory_limit_fraction is not None
else (
Expand All @@ -110,6 +114,7 @@ def __init__(
else self.DEFAULT_OBJECT_STORE_MEMORY_LIMIT_FRACTION_NO_RESERVATION
)
)
self._object_store_memory_limit_fraction = base_memory_fraction

self._warn_about_object_store_memory_if_needed()

Expand All @@ -135,8 +140,8 @@ def _warn_about_object_store_memory_if_needed(self):
):
logger.warning(
f"{WARN_PREFIX} Ray's object store is configured to use only "
f"{object_store_fraction:.1%} of available memory ({object_store_memory/GiB:.1f}GiB "
f"out of {total_memory/GiB:.1f}GiB total). For optimal Ray Data performance, "
f"{object_store_fraction:.1%} of available memory ({object_store_memory / GiB:.1f}GiB "
f"out of {total_memory / GiB:.1f}GiB total). For optimal Ray Data performance, "
f"we recommend setting the object store to at least 50% of available memory. "
f"You can do this by setting the 'object_store_memory' parameter when calling "
f"ray.init() or by setting the RAY_DEFAULT_OBJECT_STORE_MEMORY_PROPORTION environment variable."
Expand Down
58 changes: 57 additions & 1 deletion python/ray/data/_internal/plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from ray.data.block import BlockMetadataWithSchema, _take_first_non_empty_schema
from ray.data.context import DataContext
from ray.data.exceptions import omit_traceback_stdout
from ray.data.expectations import ExecutionTimeExpectation
from ray.util.debug import log_once

if TYPE_CHECKING:
Expand Down Expand Up @@ -88,6 +89,57 @@ def __init__(

self._context = data_context

# Track execution time expectations for optimization hints
self._execution_time_expectations: List[ExecutionTimeExpectation] = []

def add_execution_time_expectation(
    self, expectation: ExecutionTimeExpectation
) -> None:
    """Record an execution time expectation on this execution plan.

    The expectation is appended to the plan's internal list of execution
    time expectations.

    Args:
        expectation: The execution time expectation to record.

    Raises:
        TypeError: If expectation is not an ExecutionTimeExpectation instance.
    """
    if isinstance(expectation, ExecutionTimeExpectation):
        self._execution_time_expectations.append(expectation)
        return

    # Anything else is rejected so the list only ever holds expectations.
    raise TypeError(
        f"Expected ExecutionTimeExpectation, got {type(expectation).__name__}"
    )

def get_execution_time_expectations(self) -> List[ExecutionTimeExpectation]:
    """Return the execution time expectations recorded on this plan.

    Returns:
        A new list holding the same expectation objects, so adding to or
        removing from the returned list does not change the plan's own list.
    """
    # Shallow copy: the list is fresh, the expectation objects are shared.
    return list(self._execution_time_expectations)

def get_max_execution_time_seconds(self) -> Optional[float]:
    """Return the tightest execution time limit among the plan's expectations.

    Each expectation is asked for its maximum execution time; an expectation
    that returns ``None`` sets no limit and is ignored. The smallest of the
    remaining values is returned, since it is the limit that binds first.

    Returns:
        The smallest non-``None`` maximum execution time, or ``None`` when
        the plan has no expectations or none of them sets a limit.
    """
    # Query each expectation exactly once; the value is both filtered and
    # compared, so calling the getter twice would be redundant work.
    limits = (
        exp.get_max_execution_time_seconds()
        for exp in self._execution_time_expectations
    )
    # ``default=None`` covers both "no expectations" and "no limits set".
    return min((limit for limit in limits if limit is not None), default=None)

def get_dataset_id(self) -> str:
"""Unique ID of the dataset, including the dataset name,
UUID, and current execution index.
Expand All @@ -101,6 +153,7 @@ def create_executor(self) -> "StreamingExecutor":
from ray.data._internal.execution.streaming_executor import StreamingExecutor

self._run_index += 1

executor = StreamingExecutor(self._context, self.get_dataset_id())
return executor

Expand Down Expand Up @@ -128,7 +181,6 @@ def explain(self) -> str:

sections = []
for title, convert_fn in zip(titles, convert_fns):

# 2. Convert plan to new plan
plan = convert_fn(plan)

Expand Down Expand Up @@ -359,6 +411,7 @@ def copy(self) -> "ExecutionPlan":
plan_copy._snapshot_operator = self._snapshot_operator
plan_copy._snapshot_stats = self._snapshot_stats
plan_copy._dataset_name = self._dataset_name
plan_copy._execution_time_expectations = self._execution_time_expectations.copy()
return plan_copy

def deep_copy(self) -> "ExecutionPlan":
Expand All @@ -379,6 +432,9 @@ def deep_copy(self) -> "ExecutionPlan":
plan_copy._snapshot_operator = copy.copy(self._snapshot_operator)
plan_copy._snapshot_stats = copy.copy(self._snapshot_stats)
plan_copy._dataset_name = self._dataset_name
plan_copy._execution_time_expectations = copy.deepcopy(
self._execution_time_expectations
)
return plan_copy

def initial_num_blocks(self) -> Optional[int]:
Expand Down
Loading