Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions doc/source/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -653,6 +653,7 @@ def filter(self, record):
"numpy",
"pandas",
"pyarrow",
"pyarrow.compute",
"pytorch_lightning",
"scipy",
"setproctitle",
Expand Down
95 changes: 93 additions & 2 deletions doc/source/data/api/expressions.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ Expressions API

Expressions provide a way to specify column-based operations on datasets.
Use :func:`col` to reference columns and :func:`lit` to create literal values.
These can be combined with operators to create complex expressions for filtering,
You can combine these with operators to create complex expressions for filtering,
transformations, and computations.

Public API
Expand All @@ -21,6 +21,7 @@ Public API
col
lit
udf
pyarrow_udf
download

Expression Classes
Expand All @@ -39,4 +40,94 @@ instantiate them directly, but you may encounter them when working with expressi
BinaryExpr
UnaryExpr
UDFExpr
StarExpr
StarExpr

Expression Namespaces
------------------------------------

These namespace classes provide specialized operations for list, string, and struct columns.
You access them through properties on expressions: ``.list``, ``.str``, and ``.struct``.

The following example shows how to use the string namespace to transform text columns:

.. testcode::

import ray
from ray.data.expressions import col

# Create a dataset with a text column
ds = ray.data.from_items([
{"name": "alice"},
{"name": "bob"},
{"name": "charlie"}
])

# Use the string namespace to uppercase the names
ds = ds.with_column("upper_name", col("name").str.upper())
ds.show()

.. testoutput::

{'name': 'alice', 'upper_name': 'ALICE'}
{'name': 'bob', 'upper_name': 'BOB'}
{'name': 'charlie', 'upper_name': 'CHARLIE'}

The following example shows how to use the list namespace to work with list columns:

.. testcode::

import ray
from ray.data.expressions import col

# Create a dataset with list columns
ds = ray.data.from_items([
{"scores": [85, 90, 78]},
{"scores": [92, 88]},
{"scores": [76, 82, 88, 91]}
])

# Use the list namespace to get the length of each list
ds = ds.with_column("num_scores", col("scores").list.len())
ds.show()

.. testoutput::

{'scores': [85, 90, 78], 'num_scores': 3}
{'scores': [92, 88], 'num_scores': 2}
{'scores': [76, 82, 88, 91], 'num_scores': 4}

The following example shows how to use the struct namespace to access nested fields:

.. testcode::

import ray
from ray.data.expressions import col

# Create a dataset with struct columns
ds = ray.data.from_items([
{"user": {"name": "alice", "age": 25}},
{"user": {"name": "bob", "age": 30}},
{"user": {"name": "charlie", "age": 35}}
])

# Use the struct namespace to extract a specific field
ds = ds.with_column("user_name", col("user").struct.field("name"))
ds.show()

.. testoutput::

{'user': {'name': 'alice', 'age': 25}, 'user_name': 'alice'}
{'user': {'name': 'bob', 'age': 30}, 'user_name': 'bob'}
{'user': {'name': 'charlie', 'age': 35}, 'user_name': 'charlie'}

.. autoclass:: _ListNamespace
:members:
:exclude-members: _expr

.. autoclass:: _StringNamespace
:members:
:exclude-members: _expr

.. autoclass:: _StructNamespace
:members:
:exclude-members: _expr
14 changes: 14 additions & 0 deletions python/ray/data/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -1649,6 +1649,20 @@ py_test(
],
)

# Runs tests/test_namespace_expressions.py as a medium-sized test target,
# tagged "exclusive" and owned by the data team.
py_test(
    name = "test_namespace_expressions",
    size = "medium",
    srcs = ["tests/test_namespace_expressions.py"],
    tags = [
        "exclusive",
        "team:data",
    ],
    deps = [
        ":conftest",
        "//:ray_lib",
    ],
)

py_test(
name = "test_context",
size = "small",
Expand Down
192 changes: 188 additions & 4 deletions python/ray/data/expressions.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,29 @@
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable, Dict, Generic, List, TypeVar, Union
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
Generic,
List,
TypeVar,
Union,
)

import pyarrow
import pyarrow.compute as pc

from ray.data.block import BatchColumn
from ray.data.datatype import DataType
from ray.util.annotations import DeveloperAPI, PublicAPI

if TYPE_CHECKING:
from ray.data.namespace_expressions.list_namespace import _ListNamespace
from ray.data.namespace_expressions.string_namespace import _StringNamespace
from ray.data.namespace_expressions.struct_namespace import _StructNamespace

T = TypeVar("T")


Expand Down Expand Up @@ -123,18 +138,15 @@ class _PyArrowExpressionVisitor(_ExprVisitor["pyarrow.compute.Expression"]):
"""Visitor that converts Ray Data expressions to PyArrow compute expressions."""

def visit_column(self, expr: "ColumnExpr") -> "pyarrow.compute.Expression":
    """Convert a column reference into a PyArrow field expression.

    Args:
        expr: The column expression; only its ``name`` attribute is read.

    Returns:
        The result of ``pc.field(expr.name)``.
    """
    import pyarrow.compute as pc

    return pc.field(expr.name)

def visit_literal(self, expr: "LiteralExpr") -> "pyarrow.compute.Expression":
    """Convert a literal value into a PyArrow scalar expression.

    Args:
        expr: The literal expression; only its ``value`` attribute is read.

    Returns:
        The result of ``pc.scalar(expr.value)``.
    """
    import pyarrow.compute as pc

    return pc.scalar(expr.value)

def visit_binary(self, expr: "BinaryExpr") -> "pyarrow.compute.Expression":
import pyarrow as pa
import pyarrow.compute as pc

if expr.op in (Operation.IN, Operation.NOT_IN):
left = self.visit(expr.left)
Expand Down Expand Up @@ -405,6 +417,75 @@ def alias(self, name: str) -> "Expr":
data_type=self.data_type, expr=self, _name=name, _is_rename=False
)

@property
def list(self) -> "_ListNamespace":
    """Expose list-specific operations for this expression.

    The namespace class is imported when the property is accessed rather
    than at module import time.

    Returns:
        A ``_ListNamespace`` constructed around this expression.

    Example:
        >>> from ray.data.expressions import col
        >>> import ray
        >>> ds = ray.data.from_items([
        ...     {"items": [1, 2, 3]},
        ...     {"items": [4, 5]}
        ... ])
        >>> ds = ds.with_column("num_items", col("items").list.len())
        >>> ds = ds.with_column("first_item", col("items").list[0])
        >>> ds = ds.with_column("slice", col("items").list[1:3])
    """
    from ray.data.namespace_expressions.list_namespace import (
        _ListNamespace as namespace_cls,
    )

    return namespace_cls(self)

@property
def str(self) -> "_StringNamespace":
    """Expose string-specific operations for this expression.

    The namespace class is imported when the property is accessed rather
    than at module import time.

    Returns:
        A ``_StringNamespace`` constructed around this expression.

    Example:
        >>> from ray.data.expressions import col
        >>> import ray
        >>> ds = ray.data.from_items([
        ...     {"name": "Alice"},
        ...     {"name": "Bob"}
        ... ])
        >>> ds = ds.with_column("upper_name", col("name").str.upper())
        >>> ds = ds.with_column("name_len", col("name").str.len())
        >>> ds = ds.with_column("starts_a", col("name").str.starts_with("A"))
    """
    from ray.data.namespace_expressions.string_namespace import (
        _StringNamespace as namespace_cls,
    )

    return namespace_cls(self)

@property
def struct(self) -> "_StructNamespace":
    """Expose struct-specific operations for this expression.

    The namespace class is imported when the property is accessed rather
    than at module import time.

    Returns:
        A ``_StructNamespace`` constructed around this expression.

    Example:
        >>> from ray.data.expressions import col
        >>> import ray
        >>> import pyarrow as pa
        >>> ds = ray.data.from_arrow(pa.table({
        ...     "user": pa.array([
        ...         {"name": "Alice", "age": 30}
        ...     ], type=pa.struct([
        ...         pa.field("name", pa.string()),
        ...         pa.field("age", pa.int32())
        ...     ]))
        ... }))
        >>> ds = ds.with_column("age", col("user").struct["age"])  # doctest: +SKIP
    """
    from ray.data.namespace_expressions.struct_namespace import (
        _StructNamespace as namespace_cls,
    )

    return namespace_cls(self)

def _unalias(self) -> "Expr":
    """Return this expression itself, unchanged."""
    return self

Expand Down Expand Up @@ -694,6 +775,88 @@ def decorator(func: Callable[..., BatchColumn]) -> Callable[..., UDFExpr]:
return decorator


def _create_pyarrow_wrapper(
    fn: Callable[..., BatchColumn]
) -> Callable[..., BatchColumn]:
    """Adapt ``fn`` so its array arguments are converted to PyArrow first.

    Every positional and keyword argument is inspected before ``fn`` runs:

    * pandas Series are converted with ``pa.Array.from_pandas``;
    * numpy arrays are converted with ``pa.array``;
    * everything else, including existing PyArrow arrays and chunked
      arrays, is passed through untouched.

    If at least one argument was a pandas Series and ``fn`` returns a PyArrow
    array or chunked array, the result is converted back with ``to_pandas()``.
    Numpy inputs alone do not trigger that back-conversion.

    Args:
        fn: The PyArrow compute function to adapt.

    Returns:
        A function carrying ``fn``'s metadata (via ``functools.wraps``) that
        performs the conversions described above around each call.
    """

    @functools.wraps(fn)
    def arrow_wrapper(*args, **kwargs):
        # Imports are resolved at call time rather than at decoration time.
        import numpy as np
        import pandas as pd
        import pyarrow as pa

        arrow_types = (pa.Array, pa.ChunkedArray)
        saw_pandas = False

        def as_arrow(value):
            """Return ``value`` as a PyArrow array if it is pandas or numpy."""
            nonlocal saw_pandas
            if isinstance(value, arrow_types):
                return value
            if isinstance(value, pd.Series):
                saw_pandas = True
                return pa.Array.from_pandas(value)
            if isinstance(value, np.ndarray):
                return pa.array(value)
            return value

        # Positional arguments are converted before keyword arguments.
        arrow_args = [as_arrow(arg) for arg in args]
        arrow_kwargs = {key: as_arrow(val) for key, val in kwargs.items()}

        result = fn(*arrow_args, **arrow_kwargs)

        # Only go back to pandas when a Series came in and Arrow came out.
        if saw_pandas and isinstance(result, arrow_types):
            return result.to_pandas()
        return result

    return arrow_wrapper


@PublicAPI(stability="alpha")
def pyarrow_udf(return_dtype: DataType) -> Callable[..., UDFExpr]:
    """Decorator that turns a PyArrow compute function into a UDF expression factory.

    The decorated function is first wrapped by ``_create_pyarrow_wrapper`` so
    that pandas Series and numpy arrays are converted to PyArrow arrays before
    it runs, letting it work regardless of the underlying block format
    (pandas, arrow, or items). The wrapped function is then handed to
    ``_create_udf_callable`` together with ``return_dtype``.

    Used internally by namespace methods (list, str, struct) that wrap PyArrow
    compute functions.

    Args:
        return_dtype: The data type of the value returned by the function.

    Returns:
        A decorator that accepts the compute function and returns a callable
        creating ``UDFExpr`` instances with automatic conversion.
    """

    def decorator(func: Callable[..., BatchColumn]) -> Callable[..., UDFExpr]:
        # Conversion wrapper goes on first; the UDF callable is built on top.
        return _create_udf_callable(_create_pyarrow_wrapper(func), return_dtype)

    return decorator


@DeveloperAPI(stability="alpha")
@dataclass(frozen=True, eq=False, repr=False)
class DownloadExpr(Expr):
Expand Down Expand Up @@ -889,9 +1052,30 @@ def download(uri_column_name: str) -> DownloadExpr:
"DownloadExpr",
"AliasExpr",
"StarExpr",
"pyarrow_udf",
"udf",
"col",
"lit",
"download",
"star",
"_ListNamespace",
"_StringNamespace",
"_StructNamespace",
]


def __getattr__(name: str):
    """Resolve the namespace classes lazily on module attribute access.

    The namespace classes are imported only when one of them is requested,
    which avoids circular imports at module load time.

    Args:
        name: The attribute being looked up on this module.

    Returns:
        The requested namespace class.

    Raises:
        AttributeError: If ``name`` is not one of the lazily provided classes.
    """
    if name == "_ListNamespace":
        from ray.data.namespace_expressions.list_namespace import (
            _ListNamespace as namespace_cls,
        )
    elif name == "_StringNamespace":
        from ray.data.namespace_expressions.string_namespace import (
            _StringNamespace as namespace_cls,
        )
    elif name == "_StructNamespace":
        from ray.data.namespace_expressions.struct_namespace import (
            _StructNamespace as namespace_cls,
        )
    else:
        raise AttributeError(f"module {__name__!r} has no attribute {name!r}")

    return namespace_cls
Empty file.
Loading