
Conversation

@soffer-anyscale
Contributor

Description

This PR introduces a new ray.data.expectations module that enables users to define and enforce both data quality rules and execution time constraints (enterprise SLA requirements) for Ray Data pipelines. The expectations API integrates seamlessly with Ray Data's expression system to provide declarative validation and execution time monitoring.

Key Features:

  1. Data Quality Validation

    • Expression-based validation using Ray Data's expression API (col("value") > 0)
    • Custom validator functions for complex validation logic
    • Automatic dataset splitting into passed/failed datasets for quarantine workflows
    • Batch-level validation with efficient Arrow-based evaluation
  2. Execution Time Constraints (SLA Requirements)

    • Execution time constraints (max_execution_time_seconds)
    • Automatic execution monitoring and timeout handling
  3. Pythonic API Design

    • Simple Dataset.expect() method that matches Ray Data patterns
    • Direct parameter passing for inline expectation creation
    • List support for multiple expectations
    • Expression-based validation directly integrated with Ray Data's expressions API

Why ds.expect() vs filter() or Expressions API?

While filter() and the expressions API are powerful for data transformations, ds.expect() serves a different purpose: data quality validation and enterprise SLA enforcement.

Key Differences:

  1. Quarantine Workflows: filter() only returns passing rows, while expect() returns both passed_ds and failed_ds, enabling automated quarantine workflows where invalid data is preserved for investigation rather than discarded.

  2. Validation Results: expect() provides rich validation metadata (ExpectationResult) including pass/fail status, failure counts, execution time, and descriptive messages—essential for monitoring, alerting, and compliance reporting.

  3. Enterprise SLAs: expect() uniquely supports execution time constraints, letting Ray Data monitor runtime and halt processing when an SLA is exceeded.

  4. Batch-Level Validation: Unlike filter(), which operates row by row, expect() validates entire batches and provides aggregate statistics, making it more efficient for data quality checks across large datasets.

  5. Declarative Validation: expect() provides a clean, declarative API for expressing data quality rules that integrates seamlessly with Ray Data's expression system.

Use Cases:

  • Data Quality Monitoring: Validate data pipelines and automatically route invalid records to quarantine for investigation
  • SLA Compliance: Ensure jobs complete within time constraints with automatic execution monitoring
  • Compliance & Auditing: Track validation results with detailed metadata for regulatory requirements
  • Pipeline Reliability: Catch data quality issues early with clear error messages and failure reporting

In essence, filter() transforms data, while expect() validates and monitors it—two complementary operations that serve different needs in production data pipelines.
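
To make the contrast concrete, here is a minimal side-by-side sketch (using a toy dataset in the style of the examples below):

import ray
from ray.data.expressions import col

ds = ray.data.from_items([{"value": 1}, {"value": -1}])

# filter() transforms: failing rows are simply dropped.
clean_ds = ds.filter(lambda row: row["value"] > 0)

# expect() validates: both sides survive, plus validation metadata.
passed_ds, failed_ds, result = ds.expect(expr=col("value") > 0)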

Usage Examples:

import ray
from ray.data.expectations import expect
from ray.data.expressions import col

# Simple expression-based validation (most common)
ds = ray.data.from_items([{"value": 1}, {"value": 2}, {"value": -1}])
passed_ds, failed_ds, result = ds.expect(expr=col("value") > 0)
print(result.passed)  # False
print(failed_ds.take_all())  # [{'value': -1}]

# Quarantine workflows
raw_ds = ray.data.from_items([{"user_id": 1, "score": 95}, {"user_id": 2, "score": -5}])
valid_ds, invalid_ds, result = raw_ds.expect(expr=col("score") >= 0)
valid_ds.write_parquet("s3://bucket/valid/")
invalid_ds.write_parquet("s3://bucket/quarantine/")

# Multiple expectations (pass as list)
expectations = [
    expect(expr=col("age") >= 0),
    expect(expr=col("email").is_not_null())
]
passed_ds, failed_ds, results = ds.expect(expectations)

# Enterprise SLA requirements
ds = ray.data.range(1000000)
processed_ds, remaining_ds, result = ds.expect(
    max_execution_time_seconds=60  # Monitor execution time
)

# Custom validator function
def validate_batch(batch):
    return batch["value"].min() > 0 and batch["value"].max() < 100

passed_ds, failed_ds, result = ds.expect(
    validator_fn=validate_batch,
    name="value_range_check"
)
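
For monitoring and alerting, the returned ExpectationResult can be inspected directly. A short sketch using the result fields named in this PR (passed, total_count, failure_count, message):

# Route validation metadata to monitoring or alerting.
passed_ds, failed_ds, result = ds.expect(expr=col("value") > 0)
if not result.passed:
    print(
        f"{result.failure_count}/{result.total_count} rows failed: "
        f"{result.message}"
    )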

Related issues

This is a new feature with no related issues.

Additional information

Implementation Details

New Files:

  • python/ray/data/expectations.py - Core expectations module (~540 lines)
  • python/ray/data/tests/test_expectations.py - Comprehensive test suite

Modified Files:

  • python/ray/data/dataset.py - Added Dataset.expect() method and helper functions (_count_rows_in_batch, _create_dataset_from_batches)
  • python/ray/data/__init__.py - Exported expect function
  • python/ray/data/_internal/plan.py - Integrated execution time expectations into execution plan for time tracking

API Surface:

Public Functions:

  • expect() - Factory function for creating expectations (@PublicAPI(stability="alpha"))

Internal Classes (marked as @DeveloperAPI):

  • Expectation - Base class for all expectations
  • DataQualityExpectation - For data quality validation
  • ExecutionTimeExpectation - For execution time constraints (SLA requirements)
  • ExpectationResult - Result of expectation validation

Dataset Method:

  • Dataset.expect() - Applies expectations and returns (passed_ds, failed_ds, result) or (passed_ds, failed_ds, List[ExpectationResult]) for lists

Architecture Integration

Expression API Integration:

  • Expression-based expectations leverage Ray Data's existing eval_expr() infrastructure
  • Uses with_column() + filter() pattern for efficient validation (avoids double evaluation)
  • Reuses BlockAccessor for consistent batch format handling (Arrow, pandas, dict)
  • Includes URLs to third-party library documentation (PyArrow, Pandas) in comments
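
A rough sketch of the with_column() + filter() split described above (illustrative only; the flag-column name and the row-lambda filters are simplifications, not the actual internals):

from ray.data.expressions import col

# Evaluate the predicate once into a boolean flag column...
flagged = ds.with_column("_validation_passed", col("value") > 0)

# ...then derive both splits from the flag, avoiding double evaluation.
passed_ds = flagged.filter(
    lambda row: row["_validation_passed"]
).drop_columns(["_validation_passed"])
failed_ds = flagged.filter(
    lambda row: not row["_validation_passed"]
).drop_columns(["_validation_passed"])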

Execution Plan Integration:

  • Execution time expectations are stored in ExecutionPlan._execution_time_expectations
  • Execution time is tracked and monitored during dataset operations
  • Timeout handling halts execution if max_execution_time_seconds is exceeded
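
In spirit, the timeout handling works like the minimal sketch below (names here are illustrative; the real tracking lives inside ExecutionPlan):

import time

class _ExecutionTimer:
    """Illustrative sketch: track elapsed wall-clock time and fail once
    the configured budget is exceeded."""

    def __init__(self, max_execution_time_seconds: float):
        self.max_seconds = max_execution_time_seconds
        self._start = time.monotonic()

    def check(self) -> None:
        elapsed = time.monotonic() - self._start
        if elapsed > self.max_seconds:
            raise TimeoutError(
                f"Execution exceeded {self.max_seconds}s limit "
                f"({elapsed:.1f}s elapsed)"
            )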

Design Principles:

  • Minimal API: Simple, intuitive methods matching Ray Data patterns (similar to filter())
  • Reuse Existing Code: Leverages filter(), map_batches(), BlockAccessor, eval_expr(), etc.
  • Pythonic: Direct parameter passing, supports expressions, follows Python conventions
  • Clean Separation: Internal classes are @DeveloperAPI, only expect() is @PublicAPI

Testing

The implementation includes comprehensive tests covering:

  • Core functionality (data quality and execution time expectations)
  • Expression-based validation
  • Validator function-based validation
  • Multiple expectations (lists)
  • Edge cases (empty datasets, empty batches, null values)
  • Integration with Dataset operations
  • Error handling and parameter validation
  • Execution time timeout handling
  • Execution plan integration

Performance Considerations

  • Expression-based validation uses Arrow-native evaluation (same as filter())
  • Validator functions use map_batches() for efficient batch processing
  • Validation results are aggregated using a Ray actor for distributed collection
  • No performance impact when expectations are not used
  • Expression evaluation is optimized to avoid double evaluation when splitting datasets
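
The actor-based aggregation mentioned above could look roughly like this (an illustrative sketch; the actual actor and method names are internal details):

import ray

@ray.remote
class ValidationStats:
    """Illustrative sketch: accumulate per-batch counts from distributed
    workers into a single result."""

    def __init__(self):
        self.total_count = 0
        self.failure_count = 0

    def record(self, total: int, failed: int) -> None:
        self.total_count += total
        self.failure_count += failed

    def summary(self) -> dict:
        return {
            "total_count": self.total_count,
            "failure_count": self.failure_count,
        }

stats = ValidationStats.remote()
stats.record.remote(1000, 3)
print(ray.get(stats.summary.remote()))  # {'total_count': 1000, 'failure_count': 3}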

- Add helper functions for common expectation patterns:
  - expect_column_values_to_be_between()
  - expect_column_values_to_not_be_null()
  - expect_column_values_to_be_unique()
  - expect_column_values_to_be_in_set()
  - expect_table_row_count_to_be_between()
- Add ExpectationSuite class for organizing multiple expectations
- Improve error messages with context and suggestions
- Enhance documentation with more examples
- Fix linting issues (whitespace, boolean comparisons)

Signed-off-by: soffer-anyscale <stephen.offer@anyscale.com>
Hide implementation detail from users - always use map_batches for validator
functions since they expect batch input. This simplifies the API and removes
unnecessary complexity.

Signed-off-by: soffer-anyscale <stephen.offer@anyscale.com>
Remove implementation details from public API:
- Remove name and description parameters (auto-generated for expressions)
- Remove batch_format parameter (use default internally)
- Make expectation optional to allow ds.expect(expr=...)
- Improve error messages and documentation
- Align API signature with filter() and other Dataset methods

Signed-off-by: soffer-anyscale <stephen.offer@anyscale.com>
- Accept expectations in constructor: ExpectationSuite(name, [exp1, exp2])
- Rename add_expectation() to append() for list-like API
- Rename add_expectations() to extend() for list-like API
- Add __getitem__ and __iter__ for list-like iteration
- Support single expectation or list/tuple in constructor
- Support extending from another ExpectationSuite
- Update documentation examples to show simpler usage patterns

Signed-off-by: soffer-anyscale <stephen.offer@anyscale.com>
- Make all classes (@DeveloperAPI): Expectation, DataQualityExpectation, SLAExpectation, ExpectationResult, ExpectationSuite
- Make all helper functions @DeveloperAPI (not public)
- Only export expect() from ray.data.expectations
- Update Dataset.expect() to accept lists directly instead of ExpectationSuite
- Remove ExpectationSuite from public API, use simple lists instead
- Update documentation to show only expect() usage patterns

Signed-off-by: soffer-anyscale <stephen.offer@anyscale.com>
- Accept name, description, validator_fn, max_execution_time_seconds, etc. directly in Dataset.expect()
- Support ds.expect(col('age') >= 0, name='check') pattern
- Remove need to create expectation objects separately
- Still supports passing expectation objects for advanced use cases
- Update examples to show simpler usage patterns

Signed-off-by: soffer-anyscale <stephen.offer@anyscale.com>
- Rename optimization_strategy parameter to optimize_for for clarity
- optimize_for='performance' (default) is more intuitive than optimization_strategy
- Still supports optimization_strategy for backward compatibility
- Update examples to show simpler usage: ds.expect(max_execution_time_seconds=60)
- Default to 'performance' when time constraint is set (most common use case)

Signed-off-by: soffer-anyscale <stephen.offer@anyscale.com>
- Add expect() factory function for creating expectation objects
- Add Dataset.expect() method for validating data quality and SLA requirements
- Support expression-based validation using Ray Data expressions API
- Support custom validator functions for complex validation logic
- Support SLA expectations with max_execution_time_seconds
- Return (passed_ds, failed_ds, result) tuples for quarantine workflows
- Extract helper functions for batch row counting and dataset creation
- Add comprehensive test coverage in test_expectations.py
- Follow Ray Data coding standards and architecture patterns

Signed-off-by: soffer-anyscale <stephen.offer@anyscale.com>
- Rename SLAExpectation to ExecutionTimeExpectation
- Rename ExpectationType.SLA to ExpectationType.EXECUTION_TIME
- Rename _expect_sla to _expect_execution_time
- Rename _sla_expectations to _execution_time_expectations
- Update all method names and variable names
- Update docstrings to use generic developer terminology
- Fix lint errors (lambda expressions, missing imports)
- Keep SLA terminology only in PR documentation

Signed-off-by: soffer-anyscale <stephen.offer@anyscale.com>
@soffer-anyscale soffer-anyscale requested a review from a team as a code owner November 6, 2025 18:21
Contributor

Copilot AI left a comment

Pull Request Overview

This PR introduces a new ray.data.expectations module that enables data quality validation and execution time constraints (SLA requirements) for Ray Data pipelines. The implementation provides a declarative API for validating datasets and monitoring execution times.

Key changes:

  • Expression-based validation using Ray Data's expression API
  • Dataset splitting into passed/failed datasets for quarantine workflows
  • Execution time monitoring and timeout handling
  • Pythonic API with Dataset.expect() method

Reviewed Changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated 5 comments.

Summary per file:

  • python/ray/data/expectations.py - Core expectations module with Expectation, DataQualityExpectation, and ExecutionTimeExpectation classes
  • python/ray/data/dataset.py - Added Dataset.expect() method and helper functions for validation
  • python/ray/data/_internal/plan.py - Integrated execution time expectations tracking into ExecutionPlan
  • python/ray/data/tests/test_expectations.py - Comprehensive test suite for expectations functionality
  • python/ray/data/__init__.py - Exported expect function
  • python/ray/data/_internal/execution/resource_manager.py - Minor refactoring of resource reservation logic
  • python/ray/data/_internal/execution/interfaces/execution_options.py - Added copy() method to ExecutionOptions


Comment on lines 15 to 20
else:
    # Import for runtime type checking
    try:
        from ray.data.expressions import Expr as _ExprType  # noqa: F401
    except ImportError:
        _ExprType = None  # type: ignore
Copilot AI Nov 6, 2025

The runtime type checking import pattern is unnecessarily complex. The _ExprType imported in the else block is never used. Consider simplifying this by either removing the runtime import or using it consistently throughout the file.

Suggested change
else:
    # Import for runtime type checking
    try:
        from ray.data.expressions import Expr as _ExprType  # noqa: F401
    except ImportError:
        _ExprType = None  # type: ignore

if self.max_execution_time_seconds is None and self.max_execution_time is None:
    if self.target_completion_time is None:
        raise ValueError(
            "Must specify either max_execution_time_seconds, "
Copilot AI Nov 6, 2025

Correct the error message to say 'at least one time constraint' for clarity and consistency with the method's purpose.

Suggested change
"Must specify either max_execution_time_seconds, "
"Must specify at least one time constraint: max_execution_time_seconds, "

Comment on lines 1968 to 1969
total_rows_validated=total_rows,
failed_rows=failed_rows,
Copilot AI Nov 6, 2025

ExpectationResult uses field names total_count and failure_count (as defined in expectations.py line 238-239), but this code passes total_rows_validated and failed_rows. This will cause a TypeError at runtime. Use the correct field names: total_count and failure_count.

Suggested change
total_rows_validated=total_rows,
failed_rows=failed_rows,
total_count=total_rows,
failure_count=failed_rows,

Comment on lines +2170 to +2176
result = ExpectationResult(
    expectation=expectation,
    passed=passed,
    message=message,
    failure_count=failure_count,
    total_count=total_count,
)
Copilot AI Nov 6, 2025

Same issue as Comment 4 - inconsistent field naming between the dataclass definition and usage. This instance correctly uses failure_count and total_count, but other usages in the same file use failed_rows and total_rows_validated. All usages should be consistent with the dataclass definition.

Comment on lines 2335 to 2341
result = ExpectationResult(
    expectation=expectation,
    passed=passed,
    message=message,
    execution_time_seconds=elapsed_time,
    total_count=processed_rows,
    failure_count=0 if passed else 1,  # 1 = timeout failure
Copilot AI Nov 6, 2025

The comment # 1 = timeout failure on line 2341 suggests that failure_count=1 represents a timeout failure, but this is semantically incorrect. The failure_count field should represent the number of failed rows/batches, not a boolean flag for timeout. Consider using a separate field or status code to indicate timeout failures rather than overloading failure_count.

Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request introduces a powerful and well-designed expect() API for data quality and SLA enforcement in Ray Data. The implementation is comprehensive, covering expression-based validation, custom validator functions, and execution time constraints, and it integrates nicely with the existing expression system. My review focuses on improving scalability, correctness in edge cases, and maintainability. I've identified a critical bug in how validation results are created, a couple of high-severity issues related to potential memory overload and schema preservation, and some medium-severity issues for code cleanup and maintainability. The tests are extensive, though I found one that needs to be corrected to align with the API. Overall, this is an excellent and valuable addition to the Ray Data library.

Comment on lines 1964 to 1970
result = ExpectationResult(
    expectation=expectation,
    passed=passed,
    message=message,
    total_rows_validated=total_rows,
    failed_rows=failed_rows,
)
Contributor

critical

There's a bug here where ExpectationResult is instantiated with incorrect keyword arguments. The ExpectationResult dataclass expects total_count and failure_count, but total_rows_validated and failed_rows are used instead. This will cause a TypeError at runtime.

Suggested change
result = ExpectationResult(
    expectation=expectation,
    passed=passed,
    message=message,
    total_rows_validated=total_rows,
    failed_rows=failed_rows,
)
result = ExpectationResult(
    expectation=expectation,
    passed=passed,
    message=message,
    total_count=total_rows,
    failure_count=failed_rows,
)

# Failed dataset = remaining unprocessed data
# For V1, we return empty dataset as we can't easily track unprocessed data
# This could be enhanced in future versions
failed_ds = ray.data.from_items([])
Contributor

high

When a timeout occurs, the failed_ds is created from an empty list using ray.data.from_items([]). This creates a new dataset without a schema. If the original dataset had a schema, this could lead to errors in downstream operations that expect the schema to be preserved.

To ensure schema preservation, it's better to create an empty dataset from the original dataset's context. A simple way to do this is self.limit(0).

Suggested change
failed_ds = ray.data.from_items([])
failed_ds = self.limit(0)

Comment on lines 2389 to 2391
if failed_ds.count() > 0:
    failed_rows = failed_ds.take_all()
    all_failed_rows.extend(failed_rows)
Contributor

high

The current implementation of _expect_list collects all failed rows from each expectation into a list on the driver node by calling failed_ds.take_all(). For large datasets with even a moderate failure rate, this can lead to an out-of-memory error on the driver, which undermines the benefits of distributed computation.

A more scalable approach would be to collect the failed_ds Dataset objects and then union them at the end. This would keep the data distributed in the object store.

Example of a more scalable approach:

all_failed_datasets = []
# ... in the loop ...
if failed_ds.count() > 0:
    all_failed_datasets.append(failed_ds)

# ... after the loop ...
if all_failed_datasets:
    failed_ds = all_failed_datasets[0].union(*all_failed_datasets[1:])
else:
    # create empty dataset with correct schema
    failed_ds = current_ds.limit(0)

Comment on lines 584 to 599
def test_expectation_result_with_all_fields():
    """Test ExpectationResult with all fields."""
    result = ExpectationResult(
        expectation_name="test_expectation",
        passed=True,
        total_rows_validated=1000,
        failed_rows=0,
        execution_time_seconds=5.2,
        message="All checks passed",
    )

    assert result.passed is True
    assert result.total_rows_validated == 1000
    assert result.failed_rows == 0
    assert result.execution_time_seconds == 5.2
    assert "test_expectation" in result.message
Contributor

high

This test case for ExpectationResult is incorrect and will fail. It attempts to initialize ExpectationResult with fields that do not exist (expectation_name, total_rows_validated, failed_rows) and misses the required expectation field.

The assertions also reference these non-existent fields. The test should be updated to correctly instantiate ExpectationResult and assert on its actual fields (expectation, total_count, failure_count).

Suggested change
def test_expectation_result_with_all_fields():
    """Test ExpectationResult with all fields."""
    result = ExpectationResult(
        expectation_name="test_expectation",
        passed=True,
        total_rows_validated=1000,
        failed_rows=0,
        execution_time_seconds=5.2,
        message="All checks passed",
    )
    assert result.passed is True
    assert result.total_rows_validated == 1000
    assert result.failed_rows == 0
    assert result.execution_time_seconds == 5.2
    assert "test_expectation" in result.message
def test_expectation_result_with_all_fields():
    """Test ExpectationResult with all fields."""
    exp = expect(expr=col("value") > 0, name="test_expectation")
    result = ExpectationResult(
        expectation=exp,
        passed=True,
        total_count=1000,
        failure_count=0,
        execution_time_seconds=5.2,
        message="All checks passed for test_expectation",
    )
    assert result.passed is True
    assert result.total_count == 1000
    assert result.failure_count == 0
    assert result.execution_time_seconds == 5.2
    assert "test_expectation" in result.message

Comment on lines 2020 to 2045
try:
    import pandas as pd

    if isinstance(batch, pd.DataFrame):
        batch["_validation_passed"] = True
        return batch
except Exception:
    pass
try:
    import pyarrow as pa

    if isinstance(batch, pa.Table):
        import pyarrow.compute as pc

        passed_col = pc.fill_null(pc.scalar(True), True)
        return batch.append_column("_validation_passed", passed_col)
except Exception:
    pass
# Fallback: add validation flag to dict batch
if isinstance(batch, dict):
    import numpy as np

    batch["_validation_passed"] = np.array(
        [True] * len(batch.get(list(batch.keys())[0], []))
    )
    return batch
Contributor

medium

There is significant code duplication within the validation_fn wrapper for handling different batch types (pandas, pyarrow, dict). This logic for adding the _validation_passed column is repeated three times: for empty batches, for passed batches, and for failed batches.

To improve maintainability and reduce redundancy, consider refactoring this logic into a single helper function, for example _add_validation_flag(batch, flag_value), that can be called from all three locations.
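
One possible shape for that helper, as a sketch of the reviewer's suggestion (not the code actually merged):

import numpy as np
import pandas as pd
import pyarrow as pa

def _add_validation_flag(batch, flag_value: bool):
    """Illustrative sketch: append a constant _validation_passed column
    to a pandas, pyarrow, or dict batch."""
    if isinstance(batch, pd.DataFrame):
        batch["_validation_passed"] = flag_value
        return batch
    if isinstance(batch, pa.Table):
        flags = pa.array([flag_value] * batch.num_rows, type=pa.bool_())
        return batch.append_column("_validation_passed", flags)
    if isinstance(batch, dict):
        n = len(next(iter(batch.values()), []))
        batch["_validation_passed"] = np.full(n, flag_value, dtype=bool)
        return batch
    raise TypeError(f"Unsupported batch type: {type(batch)}")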

Comment on lines 23 to 44
def _is_expr(obj: Any) -> bool:
    """Check if an object is a Ray Data expression.

    Uses isinstance if Expr is available, otherwise falls back to duck typing.
    This ensures we can detect expressions even if the import fails.
    """
    # Import here to avoid circular dependencies and handle lazy import
    try:
        from ray.data.expressions import Expr as ExprType

        return isinstance(obj, ExprType)
    except (ImportError, TypeError):
        pass

    # Duck typing fallback for Expr detection
    # Expectation is defined later in the file, but we can check by class name
    obj_type = type(obj).__name__
    if obj_type == "Expectation" or obj_type.endswith("Expectation"):
        return False
    if callable(obj):
        return False
    return hasattr(obj, "to_pyarrow") and hasattr(obj, "data_type")
Contributor

medium

The _is_expr function appears to be unused within the codebase. It seems like dead code that might have been part of a previous implementation. To improve code clarity and maintainability, it should be removed.

- Fix ExpectationResult field names (total_count/failure_count)
- Fix schema preservation for failed_ds using limit(0)
- Fix memory issue in _expect_list by using union instead of take_all
- Fix test_expectation_result_with_all_fields with correct fields
- Refactor validation flag logic into _add_validation_flag helper
- Remove unused _is_expr function and _ExprType import
- Improve error message clarity for time constraints
- Fix failure_count semantics for timeout (use passed=False instead)

Signed-off-by: soffer-anyscale <stephen.offer@anyscale.com>
- Extract _add_validation_flag helper function
- Remove duplicate code for pandas/pyarrow/dict batch handling
- Reduces code duplication from 3 locations to 1 helper function

Signed-off-by: soffer-anyscale <stephen.offer@anyscale.com>
@ray-gardener ray-gardener bot added the data Ray Data-related issues label Nov 6, 2025