from __future__ import annotations

from typing import Callable, Iterable, List, Optional, Union

import numpy as np
import pandas as pd

# ---------------------------------------------------------------------------
# janitor/functions/scale_mad.py
# ---------------------------------------------------------------------------

# Multiplying the MAD by this constant makes it comparable to the standard
# deviation for normally distributed data.
_MAD_TO_SIGMA = 1.4826

# Accepted values for the ``zero_mad`` argument of ``scale_mad``.
_ZERO_MAD_MODES = ("skip", "one", "raise")


def _mad(series: pd.Series) -> float:
    """Return the median absolute deviation of *series*, ignoring NaNs."""
    center = series.median(skipna=True)
    return series.sub(center).abs().median(skipna=True)


def _resolve_columns(
    frame: pd.DataFrame,
    columns: Optional[
        Union[Iterable[str], Callable[[pd.DataFrame], Iterable[str]]]
    ],
) -> List[str]:
    """Turn the ``columns`` argument of ``scale_mad`` into a list of labels."""
    if columns is None:
        return list(frame.select_dtypes(include=[np.number]).columns)
    if callable(columns):
        columns = columns(frame)
    # A bare string is itself iterable; without this guard "abc" would be
    # interpreted as the three columns "a", "b" and "c".
    if isinstance(columns, str):
        return [columns]
    return list(columns)


def _is_scalable(dtype) -> bool:
    """Return True when a column of *dtype* should be scaled."""
    if isinstance(dtype, np.dtype):
        return bool(np.issubdtype(dtype, np.number))
    # np.issubdtype raises TypeError on pandas extension dtypes (nullable
    # Int64/Float64, string, categorical, ...), so ask pandas instead.
    # Booleans are excluded to match how plain NumPy bool columns are treated.
    return pd.api.types.is_numeric_dtype(dtype) and not pd.api.types.is_bool_dtype(
        dtype
    )


def scale_mad(
    df: pd.DataFrame,
    columns: Optional[
        Union[Iterable[str], Callable[[pd.DataFrame], Iterable[str]]]
    ] = None,
    clip: Optional[float] = None,
    zero_mad: str = "skip",  # 'skip' | 'one' | 'raise'
    suffix: Optional[str] = None,
) -> pd.DataFrame:
    """Robustly scale numeric columns using the median and the MAD.

    Each selected column is transformed to
    ``(x - median) / (1.4826 * MAD)``. The input frame is never modified;
    a copy is returned.

    Args:
        df: Frame whose columns should be scaled.
        columns: Columns to scale. ``None`` selects every numeric column.
            May be a single column name, an iterable of names, or a callable
            that receives the (copied) frame and returns the names. Names
            that are missing from the frame or refer to non-numeric columns
            are skipped.
        clip: If given, scaled values are clipped to ``[-clip, clip]``.
        zero_mad: What to do when a column's MAD is zero or undefined (NaN):
            ``"skip"`` leaves the values untouched (they are not clipped
            either), ``"one"`` only subtracts the median, ``"raise"`` raises
            ``ValueError``.
        suffix: If given (and non-empty), results are written to
            ``f"{col}{suffix}"`` instead of overwriting ``col``.

    Returns:
        A new DataFrame with the scaled columns.

    Raises:
        TypeError: If ``df`` is not a pandas DataFrame.
        ValueError: If ``zero_mad`` is not a recognised mode, or if it is
            ``"raise"`` and a column has a zero or undefined MAD.
    """
    if not isinstance(df, pd.DataFrame):
        raise TypeError("df must be a pandas DataFrame")
    if zero_mad not in _ZERO_MAD_MODES:
        raise ValueError(
            f"zero_mad must be one of {_ZERO_MAD_MODES}, got {zero_mad!r}"
        )

    out = df.copy()
    for col in _resolve_columns(out, columns):
        if col not in out.columns:
            continue
        s = out[col]
        if not _is_scalable(s.dtype):
            continue

        med = s.median(skipna=True)
        mad = _mad(s)
        target = f"{col}{suffix}" if suffix else col

        # pd.isna goes first: comparing pd.NA with 0 is not a usable boolean.
        if pd.isna(mad) or mad == 0:
            if zero_mad == "raise":
                reason = "undefined" if pd.isna(mad) else "zero"
                raise ValueError(f"MAD is {reason} for column '{col}'")
            if zero_mad == "skip":
                # The values are still on their original scale, so clipping
                # them would corrupt the data. The suffixed column is still
                # created so the output schema does not depend on the data.
                if suffix:
                    out[target] = s
                continue
            scaled = s - med  # zero_mad == "one": centre only
        else:
            scaled = (s - med) / (mad * _MAD_TO_SIGMA)

        if clip is not None:
            scaled = scaled.clip(-clip, clip)
        out[target] = scaled
    return out


# ---------------------------------------------------------------------------
# tests/functions/test_scale_mad.py
#
# When these tests live in their own file they obtain ``scale_mad`` via
# ``from janitor.functions.scale_mad import scale_mad``.
# ---------------------------------------------------------------------------
import pytest  # noqa: E402


def test_scales_numeric_columns_default():
    df = pd.DataFrame({"x": [1, 2, 3, 4], "y": [10, 10, 10, 10]})
    res = scale_mad(df)
    assert set(res.columns) == {"x", "y"}
    assert (res["y"] == 10).all()
    assert abs(res["x"].median()) < 1e-9


def test_zero_mad_center_only():
    df = pd.DataFrame({"y": [10, 10, 10, 10]})
    res = scale_mad(df, zero_mad="one")
    assert np.isclose(res["y"].mean(), 0.0)


def test_suffix_and_clip():
    df = pd.DataFrame({"x": [1, 2, 3, 100]})
    res = scale_mad(df, columns=["x"], clip=3, suffix="_mad")
    assert "x_mad" in res.columns and (res["x_mad"].abs() <= 3).all()


def test_callable_column_selector():
    df = pd.DataFrame({"a": [1, 2, 3], "b": ["x", "y", "z"]})
    res = scale_mad(
        df,
        columns=lambda d: d.select_dtypes("number").columns,
        suffix="_mad",
    )
    assert "a_mad" in res.columns


def test_zero_mad_raise():
    df = pd.DataFrame({"y": [1, 1, 1]})
    with pytest.raises(ValueError):
        scale_mad(df, columns=["y"], zero_mad="raise")


def test_skip_does_not_clip_unscaled_values():
    df = pd.DataFrame({"y": [10, 10, 10, 10]})
    res = scale_mad(df, clip=3)
    assert (res["y"] == 10).all()


def test_unknown_zero_mad_mode_is_rejected():
    df = pd.DataFrame({"x": [1, 2, 3, 4]})
    with pytest.raises(ValueError):
        scale_mad(df, zero_mad="bogus")


def test_single_column_name_as_string():
    df = pd.DataFrame({"abc": [1, 2, 3, 4]})
    res = scale_mad(df, columns="abc")
    assert abs(res["abc"].median()) < 1e-9


def test_extension_dtypes_are_supported():
    df = pd.DataFrame(
        {
            "x": pd.array([1, 2, 3, 4], dtype="Int64"),
            "s": pd.array(["a", "b", "c", "d"], dtype="string"),
        }
    )
    res = scale_mad(df, columns=["x", "s"])
    assert abs(float(res["x"].median())) < 1e-9
    assert list(res["s"]) == ["a", "b", "c", "d"]