4141 GroupbyIndexer ,
4242)
4343from pandas .core .window .numba_ import generate_numba_ewma_func
44+ from pandas .core .window .online import (
45+ EWMMeanState ,
46+ generate_online_numba_ewma_func ,
47+ )
4448from pandas .core .window .rolling import (
4549 BaseWindow ,
4650 BaseWindowGroupby ,
@@ -263,7 +267,7 @@ def __init__(
263267 span : float | None = None ,
264268 halflife : float | TimedeltaConvertibleTypes | None = None ,
265269 alpha : float | None = None ,
266- min_periods : int = 0 ,
270+ min_periods : int | None = 0 ,
267271 adjust : bool = True ,
268272 ignore_na : bool = False ,
269273 axis : Axis = 0 ,
@@ -273,7 +277,7 @@ def __init__(
273277 ):
274278 super ().__init__ (
275279 obj = obj ,
276- min_periods = max (int (min_periods ), 1 ),
280+ min_periods = 1 if min_periods is None else max (int (min_periods ), 1 ),
277281 on = None ,
278282 center = False ,
279283 closed = None ,
@@ -338,6 +342,48 @@ def _get_window_indexer(self) -> BaseIndexer:
338342 """
339343 return ExponentialMovingWindowIndexer ()
340344
def online(self, engine="numba", engine_kwargs=None):
    """
    Create an ``OnlineExponentialMovingWindow`` mirroring this window.

    The returned object calculates exponentially moving window
    aggregations in an online method, using the same settings as
    this window.

    .. versionadded:: 1.3.0

    Parameters
    ----------
    engine : str, default ``'numba'``
        Execution engine to calculate online aggregations.
        Applies to all supported aggregation methods.

    engine_kwargs : dict, default None
        Applies to all supported aggregation methods.

        * For ``'numba'`` engine, the engine can accept ``nopython``, ``nogil``
          and ``parallel`` dictionary keys. The values must either be ``True`` or
          ``False``. The default ``engine_kwargs`` for the ``'numba'`` engine is
          ``{'nopython': True, 'nogil': False, 'parallel': False}`` and will be
          applied to the function

    Returns
    -------
    OnlineExponentialMovingWindow
    """
    # Settings copied verbatim from this window onto the online variant.
    window_settings = {
        "obj": self.obj,
        "com": self.com,
        "span": self.span,
        "halflife": self.halflife,
        "alpha": self.alpha,
        "min_periods": self.min_periods,
        "adjust": self.adjust,
        "ignore_na": self.ignore_na,
        "axis": self.axis,
        "times": self.times,
    }
    return OnlineExponentialMovingWindow(
        **window_settings,
        engine=engine,
        engine_kwargs=engine_kwargs,
        selection=self._selection,
    )
386+
341387 @doc (
342388 _shared_docs ["aggregate" ],
343389 see_also = dedent (
@@ -655,3 +701,167 @@ def _get_window_indexer(self) -> GroupbyIndexer:
655701 window_indexer = ExponentialMovingWindowIndexer ,
656702 )
657703 return window_indexer
704+
705+
class OnlineExponentialMovingWindow(ExponentialMovingWindow):
    """
    Exponentially weighted window whose mean can be continued with new data.

    The running state needed to continue the calculation is kept in an
    ``EWMMeanState`` object (``self._mean``) and is cleared by ``reset``.
    Only ``mean`` is implemented; the other aggregations raise
    ``NotImplementedError``.
    """

    def __init__(
        self,
        obj: FrameOrSeries,
        com: float | None = None,
        span: float | None = None,
        halflife: float | TimedeltaConvertibleTypes | None = None,
        alpha: float | None = None,
        min_periods: int | None = 0,
        adjust: bool = True,
        ignore_na: bool = False,
        axis: Axis = 0,
        times: str | np.ndarray | FrameOrSeries | None = None,
        engine: str = "numba",
        engine_kwargs: dict[str, bool] | None = None,
        *,
        selection=None,
    ):
        """
        Set up the window and the state used by online ``mean`` calls.

        Raises
        ------
        NotImplementedError
            If ``times`` is not ``None``.
        ValueError
            If ``engine`` is rejected by ``maybe_use_numba``.
        """
        if times is not None:
            raise NotImplementedError(
                "times is not implemented with online operations."
            )
        super().__init__(
            obj=obj,
            com=com,
            span=span,
            halflife=halflife,
            alpha=alpha,
            min_periods=min_periods,
            adjust=adjust,
            ignore_na=ignore_na,
            axis=axis,
            times=times,
            selection=selection,
        )
        self._mean = EWMMeanState(
            self._com, self.adjust, self.ignore_na, self.axis, obj.shape
        )
        if not maybe_use_numba(engine):
            raise ValueError("'numba' is the only supported engine")
        self.engine = engine
        self.engine_kwargs = engine_kwargs

    def reset(self) -> None:
        """
        Reset the state captured by `update` calls.
        """
        self._mean.reset()

    # The aggregations below are inherited from the parent window but have no
    # online implementation.  They must *raise*: returning the exception class
    # would hand callers a bogus "result" instead of signalling the failure.

    def aggregate(self, func, *args, **kwargs):
        """Not supported for online windows; always raises."""
        raise NotImplementedError("aggregate is not implemented.")

    def std(self, bias: bool = False, *args, **kwargs):
        """Not supported for online windows; always raises."""
        raise NotImplementedError("std is not implemented.")

    def corr(
        self,
        other: FrameOrSeriesUnion | None = None,
        pairwise: bool | None = None,
        **kwargs,
    ):
        """Not supported for online windows; always raises."""
        raise NotImplementedError("corr is not implemented.")

    def cov(
        self,
        other: FrameOrSeriesUnion | None = None,
        pairwise: bool | None = None,
        bias: bool = False,
        **kwargs,
    ):
        """Not supported for online windows; always raises."""
        raise NotImplementedError("cov is not implemented.")

    def var(self, bias: bool = False, *args, **kwargs):
        """Not supported for online windows; always raises."""
        raise NotImplementedError("var is not implemented.")

    def mean(self, *args, update=None, update_times=None, **kwargs):
        """
        Calculate an online exponentially weighted mean.

        Parameters
        ----------
        update : DataFrame or Series, default None
            New values to continue calculating the
            exponentially weighted mean from the last values and weights.
            Values should be float64 dtype.

            ``update`` needs to be ``None`` the first time the
            exponentially weighted mean is calculated.

        update_times : Series or 1-D np.ndarray, default None
            New times to continue calculating the
            exponentially weighted mean from the last values and weights.
            If ``None``, values are assumed to be evenly spaced
            in time.
            This feature is currently unsupported.

        Returns
        -------
        DataFrame or Series

        Raises
        ------
        NotImplementedError
            If ``update_times`` is not ``None``.
        ValueError
            If ``update`` is passed before ``mean`` has been called with
            ``update=None``.

        Examples
        --------
        >>> df = pd.DataFrame({"a": range(5), "b": range(5, 10)})
        >>> online_ewm = df.head(2).ewm(0.5).online()
        >>> online_ewm.mean()
              a     b
        0  0.00  5.00
        1  0.75  5.75
        >>> online_ewm.mean(update=df.tail(3))
                  a         b
        2  1.615385  6.615385
        3  2.550000  7.550000
        4  3.520661  8.520661
        >>> online_ewm.reset()
        >>> online_ewm.mean()
              a     b
        0  0.00  5.00
        1  0.75  5.75
        """
        if update_times is not None:
            raise NotImplementedError("update_times is not implemented.")

        result_kwargs = {}
        is_frame = self._selected_obj.ndim == 2
        # No times supplied: every step gets a delta of one.
        update_deltas = np.ones(
            max(self._selected_obj.shape[self.axis - 1] - 1, 0), dtype=np.float64
        )
        if update is not None:
            if self._mean.last_ewm is None:
                raise ValueError(
                    "Must call mean with update=None first before passing update"
                )
            # The previous mean is prepended as a seed row and dropped again
            # from the output via ``result_from``.
            result_from = 1
            result_kwargs["index"] = update.index
            if is_frame:
                last_value = self._mean.last_ewm[np.newaxis, :]
                result_kwargs["columns"] = update.columns
            else:
                last_value = self._mean.last_ewm
                result_kwargs["name"] = update.name
            np_array = np.concatenate((last_value, update.to_numpy()))
        else:
            result_from = 0
            result_kwargs["index"] = self._selected_obj.index
            if is_frame:
                result_kwargs["columns"] = self._selected_obj.columns
            else:
                result_kwargs["name"] = self._selected_obj.name
            np_array = self._selected_obj.astype(np.float64).to_numpy()
        ewma_func = generate_online_numba_ewma_func(self.engine_kwargs)
        # 1-D (Series) input is passed to the state object as a single column.
        result = self._mean.run_ewm(
            np_array if is_frame else np_array[:, np.newaxis],
            update_deltas,
            self.min_periods,
            ewma_func,
        )
        if not is_frame:
            # NOTE(review): squeeze() collapses a single-row result to 0-d,
            # which cannot be sliced below — confirm one-row input is handled.
            result = result.squeeze()
        result = result[result_from:]
        result = self._selected_obj._constructor(result, **result_kwargs)
        return result
0 commit comments