Skip to content

Commit fb6b8b8

Browse files
authored
Fixed #772 Add Ray Support (#986)
* Initial commit for adding ray support * Fixed missing import * Fixed typo * Refactored coverage reporting * Force codecov to use coverage.py report instead of generating its own * Specify coverage.xml in PWD * Added verbose flag * Minor change * Added new codecov token * Added temporary break in test coverage * Minor change * Changed name of coverage file * Expand coverage to all tests, removed break * Removed comments in workflow * Removed codecov patch/project status * Minor change * Added ability to specify coverage.xml file * Split xml report and displaying report * Updated docstrings to include `ray` example * Fixed flake8 problem * Added ray docstring examples * Fixed typo * Reverted missing docstring * Minor change * Reverted docstring * Minor changes * Added ray.shutdown() * Changed how `step` is calculated to be more precise * Fixed missing parentheses
1 parent 5892869 commit fb6b8b8

File tree

17 files changed

+1200
-67
lines changed

17 files changed

+1200
-67
lines changed

.github/workflows/github-actions.yml

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -152,8 +152,12 @@ jobs:
152152
- name: Run Coverage Tests
153153
run: ./test.sh coverage
154154
shell: bash
155-
- name: Check Coverage Report
156-
run: coverage report -m --fail-under=100 --skip-covered --omit=docstring.py,min.py,stumpy/cache.py
155+
- name: Generate Coverage Report
156+
run: ./test.sh report coverage.stumpy.xml
157157
shell: bash
158158
- name: Upload Coverage Tests Results
159159
uses: codecov/codecov-action@v4
160+
with:
161+
file: ./coverage.stumpy.xml
162+
verbose: true
163+
token: ${{ secrets.CODECOV_TOKEN }}

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ LOG*
99
PID
1010
.coverage*
1111
coverage.xml
12+
stumpy.coverage.xml
1213
dask-worker-space
1314
stumpy.egg-info
1415
build
@@ -20,4 +21,4 @@ docs/_build
2021
.mypy_cache
2122
.directory
2223
test.py
23-
*.nbconvert.ipynb
24+
*.nbconvert.ipynb

codecov.yml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
coverage:
2+
status:
3+
project: off
4+
patch: off

conda.sh

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@ if [[ $# -gt 0 ]]; then
1313
if [ $1 == "min" ]; then
1414
install_mode="min"
1515
echo "Installing minimum dependencies with install_mode=\"min\""
16+
elif [[ $1 == "ray" ]]; then
17+
install_mode="ray"
18+
echo "Installing ray dependencies with install_mode=\"ray\""
1619
elif [[ $1 == "numba" ]] && [[ "${arch_name}" != "arm64" ]]; then
1720
install_mode="numba"
1821
echo "Installing numba release candidate dependencies with install_mode=\"numba\""
@@ -57,6 +60,14 @@ generate_numba_environment_yaml()
5760
grep -Ev "numba|python" environment.yml > environment.numba.yml
5861
}
5962

63+
generate_ray_environment_yaml()
64+
{
65+
# Limit max Python version and append pip install ray
66+
echo "Generating \"environment.ray.yml\" File"
67+
ray_python=`./ray_python_version.py`
68+
sed "/ - python/ s/$/,<=$ray_python/" environment.yml | cat - <(echo $' - pip\n - pip:\n - ray>=2.23.0') > environment.ray.yml
69+
}
70+
6071
fix_libopenblas()
6172
{
6273
if [ ! -f $CONDA_PREFIX/lib/libopenblas.dylib ]; then
@@ -71,6 +82,7 @@ clean_up()
7182
echo "Cleaning Up"
7283
rm -rf "environment.min.yml"
7384
rm -rf "environment.numba.yml"
85+
rm -rf "environment.ray.yml"
7486
}
7587

7688
###########
@@ -92,6 +104,9 @@ fi
92104
if [[ $install_mode == "min" ]]; then
93105
generate_min_environment_yaml
94106
mamba env update --name $conda_env --file environment.min.yml || conda env update --name $conda_env --file environment.min.yml
107+
elif [[ $install_mode == "ray" ]]; then
108+
generate_ray_environment_yaml
109+
mamba env update --name $conda_env --file environment.ray.yml || conda env update --name $conda_env --file environment.ray.yml
95110
elif [[ $install_mode == "numba" ]]; then
96111
echo ""
97112
echo "Installing python=$python_version"
File renamed without changes.

ray_python_version.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
#!/usr/bin/env python
2+
3+
import requests
4+
from packaging.version import Version
5+
6+
classifiers = (
7+
requests.get("https://pypi.org/pypi/ray/json").json().get("info").get("classifiers")
8+
)
9+
10+
versions = []
11+
for c in classifiers:
12+
x = c.split()
13+
if "Python" in x:
14+
versions.append(x[-1])
15+
16+
versions.sort(key=Version)
17+
print(versions[-1])

stumpy/aamp_stimp.py

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -486,16 +486,15 @@ def __init__(
486486

487487
class aamp_stimped(_aamp_stimp):
488488
"""
489-
Compute the Pan Matrix Profile with a distributed dask cluster
489+
Compute the Pan Matrix Profile with a `dask`/`ray` cluster
490490
491491
This is based on the SKIMP algorithm.
492492
493493
Parameters
494494
----------
495495
client : client
496-
A Dask or Ray Distributed client. Setting up a distributed cluster is beyond
497-
the scope of this library. Please refer to the Dask or Ray Distributed
498-
documentation.
496+
A `dask`/`ray` client. Setting up a cluster is beyond the scope of this library.
497+
Please refer to the `dask`/`ray` documentation.
499498
500499
T : numpy.ndarray
501500
The time series or sequence for which to compute the pan matrix profile
@@ -556,9 +555,8 @@ def __init__(
556555
Parameters
557556
----------
558557
client : client
559-
A Dask or Ray Distributed client. Setting up a distributed cluster is beyond
560-
the scope of this library. Please refer to the Dask or Ray Distributed
561-
documentation.
558+
A `dask`/`ray` client. Setting up a cluster is beyond the scope of this
559+
library. Please refer to the `dask`/`ray` documentation.
562560
563561
T : numpy.ndarray
564562
The time series or sequence for which to compute the pan matrix profile

stumpy/aamped.py

Lines changed: 154 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ def _dask_aamped(
2424
):
2525
"""
2626
Compute the non-normalized (i.e., without z-normalization) matrix profile with a
27-
distributed dask cluster
27+
`dask` cluster
2828
2929
This is a highly distributed implementation around the Numba JIT-compiled
3030
parallelized `_aamp` function which computes the non-normalized matrix profile
@@ -33,17 +33,15 @@ def _dask_aamped(
3333
Parameters
3434
----------
3535
dask_client : client
36-
A Dask Distributed client. Setting up a distributed cluster is beyond
37-
the scope of this library. Please refer to the Dask Distributed
38-
documentation.
36+
A `dask` client. Setting up a cluster is beyond the scope of this library.
37+
Please refer to the `dask` documentation.
3938
4039
T_A : numpy.ndarray
4140
The time series or sequence for which to compute the matrix profile
4241
4342
T_B : numpy.ndarray
4443
The time series or sequence that will be used to annotate T_A. For every
45-
subsequence in T_A, its nearest neighbor in T_B will be recorded. Default is
46-
`None` which corresponds to a self-join.
44+
subsequence in T_A, its nearest neighbor in T_B will be recorded.
4745
4846
m : int
4947
Window size
@@ -159,9 +157,157 @@ def _dask_aamped(
159157
return out
160158

161159

160+
def _ray_aamped(
161+
ray_client,
162+
T_A,
163+
T_B,
164+
m,
165+
T_A_subseq_isfinite,
166+
T_B_subseq_isfinite,
167+
p,
168+
diags,
169+
ignore_trivial,
170+
k,
171+
):
172+
"""
173+
Compute the non-normalized (i.e., without z-normalization) matrix profile with a
174+
`ray` cluster
175+
176+
This is a highly distributed implementation around the Numba JIT-compiled
177+
parallelized `_aamp` function which computes the non-normalized matrix profile
178+
according to AAMP.
179+
180+
Parameters
181+
----------
182+
ray_client : client
183+
A `ray` client. Setting up a cluster is beyond the scope of this library.
184+
Please refer to the `ray` documentation.
185+
186+
T_A : numpy.ndarray
187+
The time series or sequence for which to compute the matrix profile
188+
189+
T_B : numpy.ndarray
190+
The time series or sequence that will be used to annotate T_A. For every
191+
subsequence in T_A, its nearest neighbor in T_B will be recorded.
192+
193+
m : int
194+
Window size
195+
196+
T_A_subseq_isfinite : numpy.ndarray
197+
A boolean array that indicates whether a subsequence in `T_A` contains a
198+
`np.nan`/`np.inf` value (False)
199+
200+
T_B_subseq_isfinite : numpy.ndarray
201+
A boolean array that indicates whether a subsequence in `T_B` contains a
202+
`np.nan`/`np.inf` value (False)
203+
204+
p : float
205+
The p-norm to apply for computing the Minkowski distance. Minkowski distance is
206+
typically used with `p` being 1 or 2, which correspond to the Manhattan distance
207+
and the Euclidean distance, respectively.
208+
209+
diags : numpy.ndarray
210+
The diagonal indices
211+
212+
ignore_trivial : bool, default True
213+
Set to `True` if this is a self-join. Otherwise, for AB-join, set this
214+
to `False`. Default is `True`.
215+
216+
k : int, default 1
217+
The number of top `k` smallest distances used to construct the matrix profile.
218+
Note that this will increase the total computational time and memory usage
219+
when k > 1. If you have access to a GPU device, then you may be able to
220+
leverage `gpu_stump` for better performance and scalability.
221+
222+
Returns
223+
-------
224+
out : numpy.ndarray
225+
When k = 1 (default), the first column consists of the matrix profile,
226+
the second column consists of the matrix profile indices, the third column
227+
consists of the left matrix profile indices, and the fourth column consists
228+
of the right matrix profile indices. However, when k > 1, the output array
229+
will contain exactly 2 * k + 2 columns. The first k columns (i.e., out[:, :k])
230+
consists of the top-k matrix profile, the next set of k columns
231+
(i.e., out[:, k:2k]) consists of the corresponding top-k matrix profile
232+
indices, and the last two columns (i.e., out[:, 2k] and out[:, 2k+1] or,
233+
equivalently, out[:, -2] and out[:, -1]) correspond to the top-1 left
234+
matrix profile indices and the top-1 right matrix profile indices, respectively.
235+
"""
236+
core.check_ray(ray_client)
237+
238+
n_A = T_A.shape[0]
239+
n_B = T_B.shape[0]
240+
l = n_A - m + 1
241+
242+
nworkers = core.get_ray_nworkers(ray_client)
243+
244+
ndist_counts = core._count_diagonal_ndist(diags, m, n_A, n_B)
245+
diags_ranges = core._get_array_ranges(ndist_counts, nworkers, False)
246+
diags_ranges += diags[0]
247+
248+
# Scatter data to Ray cluster
249+
T_A_ref = ray_client.put(T_A)
250+
T_B_ref = ray_client.put(T_B)
251+
T_A_subseq_isfinite_ref = ray_client.put(T_A_subseq_isfinite)
252+
T_B_subseq_isfinite_ref = ray_client.put(T_B_subseq_isfinite)
253+
254+
diags_refs = []
255+
for i in range(nworkers):
256+
diags_ref = ray_client.put(
257+
np.arange(diags_ranges[i, 0], diags_ranges[i, 1], dtype=np.int64)
258+
)
259+
diags_refs.append(diags_ref)
260+
261+
ray_aamp_func = ray_client.remote(core.deco_ray_tor(_aamp))
262+
263+
refs = []
264+
for i in range(nworkers):
265+
refs.append(
266+
ray_aamp_func.remote(
267+
T_A_ref,
268+
T_B_ref,
269+
m,
270+
T_A_subseq_isfinite_ref,
271+
T_B_subseq_isfinite_ref,
272+
p,
273+
diags_refs[i],
274+
ignore_trivial,
275+
k,
276+
)
277+
)
278+
279+
results = ray_client.get(refs)
280+
# Must make a mutable copy from Ray's object store (ndarrays are immutable)
281+
profile, profile_L, profile_R, indices, indices_L, indices_R = [
282+
arr.copy() for arr in results[0]
283+
]
284+
285+
for i in range(1, nworkers):
286+
P, PL, PR, I, IL, IR = results[i] # Read-only variables
287+
# Update top-k matrix profile and matrix profile indices
288+
core._merge_topk_PI(profile, P, indices, I)
289+
290+
# Update top-1 left matrix profile and matrix profile index
291+
mask = PL < profile_L
292+
profile_L[mask] = PL[mask]
293+
indices_L[mask] = IL[mask]
294+
295+
# Update top-1 right matrix profile and matrix profile index
296+
mask = PR < profile_R
297+
profile_R[mask] = PR[mask]
298+
indices_R[mask] = IR[mask]
299+
300+
out = np.empty((l, 2 * k + 2), dtype=object)
301+
out[:, :k] = profile
302+
out[:, k : 2 * k + 2] = np.column_stack((indices, indices_L, indices_R))
303+
304+
return out
305+
306+
162307
def aamped(client, T_A, m, T_B=None, ignore_trivial=True, p=2.0, k=1):
163308
"""
164309
Compute the non-normalized (i.e., without z-normalization) matrix profile
310+
with a `dask`/`ray` cluster
165311
166312
This is a highly distributed implementation around the Numba JIT-compiled
167313
parallelized `_aamp` function which computes the non-normalized matrix profile
@@ -170,9 +316,8 @@ def aamped(client, T_A, m, T_B=None, ignore_trivial=True, p=2.0, k=1):
170316
Parameters
171317
----------
172318
client : client
173-
A Dask or Ray Distributed client. Setting up a distributed cluster is beyond
174-
the scope of this library. Please refer to the Dask or Ray Distributed
175-
documentation.
319+
A `dask`/`ray` client. Setting up a cluster is beyond the scope of this library.
320+
Please refer to the `dask`/`ray` documentation.
176321
177322
T_A : numpy.ndarray
178323
The time series or sequence for which to compute the matrix profile

0 commit comments

Comments
 (0)