Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 98 additions & 23 deletions numpy_financial/_financial.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"""

from decimal import Decimal
from typing import Optional

import numba as nb
import numpy as np
Expand Down Expand Up @@ -60,6 +61,35 @@ def _convert_when(when):
return [_when_to_num[x] for x in when]


def _validate_arrays(
*,
rate: Optional[np.ndarray],
pmt: Optional[np.ndarray],
pv: Optional[np.ndarray],
fv: Optional[np.ndarray],
when: Optional[np.ndarray],
):
if rate is not None and rate.ndim != 1:
msg = "invalid shape for rates. Rate must be either a scalar or 1d array"
raise ValueError(msg)

if pmt is not None and pmt.ndim != 1:
msg = "invalid shape for pmt. Payments must be either a scalar or 1d array"
raise ValueError(msg)

if pv is not None and pv.ndim != 1:
msg = "invalid shape for pv. Present value must be either a scalar or 1d array"
raise ValueError(msg)

if fv is not None and fv.ndim != 1:
msg = "invalid shape for fv. Future value must be either a scalar or 1d array"
raise ValueError(msg)

if when is not None and when.ndim != 1:
msg = "invalid shape for when. When must be either a scalar or 1d array"
raise ValueError(msg)


def fv(rate, nper, pmt, pv, when='end'):
"""Compute the future value.

Expand Down Expand Up @@ -261,6 +291,40 @@ def pmt(rate, nper, pv, fv=0, when='end'):
return -(fv + pv * temp) / fact


@nb.njit
def _nper_inner_loop(rate, pmt, pv, fv, when):
if rate == 0.0:
if pmt == 0.0:
# If no repayments are made the payments will go on forever
return np.inf
else:
return -(fv + pv) / pmt
else:
# We know that rate != 0.0, so we are sure this won't cause a ZeroDivisionError
z = pmt * (1.0 + rate * when) / rate
try:
numer = np.log((-fv + z) / (pv + z))
denom = np.log(1.0 + rate)
return numer / denom
except Exception: # As of March 24, numba only supports generic exceptions
# TODO: There are several ``ZeroDivisionError``s here.
# We need to figure out exactly what's causing these
# and return financially sensible values.
return np.nan


@nb.njit
def _nper_native(rates, pmts, pvs, fvs, whens, out):
for rate in range(rates.shape[0]):
for pmt in range(pmts.shape[0]):
for pv in range(pvs.shape[0]):
for fv in range(fvs.shape[0]):
for when in range(whens.shape[0]):
out[rate, pmt, pv, fv, when] = _nper_inner_loop(
rates[rate], pmts[pmt], pvs[pv], fvs[fv], whens[when]
)


def nper(rate, pmt, pv, fv=0, when='end'):
"""Compute the number of periodic payments.

Expand Down Expand Up @@ -297,43 +361,54 @@ def nper(rate, pmt, pv, fv=0, when='end'):
If you only had $150/month to pay towards the loan, how long would it take
to pay-off a loan of $8,000 at 7% annual interest?

>>> print(np.round(npf.nper(0.07/12, -150, 8000), 5))
>>> round(npf.nper(0.07/12, -150, 8000), 5)
64.07335

So, over 64 months would be required to pay off the loan.

The same analysis could be done with several different interest rates
and/or payments and/or total amounts to produce an entire table.

>>> npf.nper(*(np.ogrid[0.07/12: 0.08/12: 0.01/12,
... -150 : -99 : 50 ,
... 8000 : 9001 : 1000]))
array([[[ 64.07334877, 74.06368256],
[108.07548412, 127.99022654]],
>>> rates = [0.05, 0.06, 0.07]
>>> payments = [100, 200, 300]
>>> amounts = [7_000, 8_000, 9_000]
>>> npf.nper(rates, payments, amounts).round(3)
array([[[-30.827, -32.987, -34.94 ],
[-20.734, -22.517, -24.158],
[-15.847, -17.366, -18.78 ]],
<BLANKLINE>
[[-28.294, -30.168, -31.857],
[-19.417, -21.002, -22.453],
[-15.025, -16.398, -17.67 ]],
<BLANKLINE>
[[ 66.12443902, 76.87897353],
[114.70165583, 137.90124779]]])
[[-26.234, -27.891, -29.381],
[-18.303, -19.731, -21.034],
[-14.311, -15.566, -16.722]]])

"""
when = _convert_when(when)
rate, pmt, pv, fv, when = np.broadcast_arrays(rate, pmt, pv, fv, when)
nper_array = np.empty_like(rate, dtype=np.float64)

zero = rate == 0
nonzero = ~zero

with np.errstate(divide='ignore'):
# Infinite numbers of payments are okay, so ignore the
# potential divide by zero.
nper_array[zero] = -(fv[zero] + pv[zero]) / pmt[zero]
rate_inner = np.atleast_1d(rate)
pmt_inner = np.atleast_1d(pmt)
pv_inner = np.atleast_1d(pv)
fv_inner = np.atleast_1d(fv)
when_inner = np.atleast_1d(when)

_validate_arrays(
rate=rate_inner,
pmt=pmt_inner,
pv=pv_inner,
fv=fv_inner,
when=when_inner,
)

nonzero_rate = rate[nonzero]
z = pmt[nonzero] * (1 + nonzero_rate * when[nonzero]) / nonzero_rate
nper_array[nonzero] = (
np.log((-fv[nonzero] + z) / (pv[nonzero] + z))
/ np.log(1 + nonzero_rate)
out_shape = _get_output_array_shape(
rate_inner, pmt_inner, pv_inner, fv_inner, when_inner
)
out = np.empty(out_shape)
_nper_native(rate_inner, pmt_inner, pv_inner, fv_inner, when_inner, out)

return nper_array
return _ufunc_like(out)


def _value_like(arr, value):
Expand Down
48 changes: 48 additions & 0 deletions tests/strategies.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from typing import Literal

from hypothesis import strategies as st
from hypothesis.extra import numpy as npst

# numba only supports 32-bit or 64-bit little endian values
NUMBA_ALLOWED_SIZES: list[Literal[32, 64]] = [32, 64]
NUMBA_ALLOWED_ENDIANNESS = "<"


def float_dtype():
return npst.floating_dtypes(
sizes=NUMBA_ALLOWED_SIZES,
endianness=NUMBA_ALLOWED_ENDIANNESS,
)


def int_dtype():
return npst.integer_dtypes(
sizes=NUMBA_ALLOWED_SIZES,
endianness=NUMBA_ALLOWED_ENDIANNESS,
)


def uint_dtype():
return npst.unsigned_integer_dtypes(
sizes=NUMBA_ALLOWED_SIZES,
endianness=NUMBA_ALLOWED_ENDIANNESS,
)


real_scalar_dtypes = st.one_of(float_dtype(), int_dtype(), uint_dtype())
cashflow_array_strategy = npst.arrays(
dtype=real_scalar_dtypes,
shape=npst.array_shapes(min_dims=1, max_dims=2, min_side=0, max_side=25),
)
cashflow_list_strategy = cashflow_array_strategy.map(lambda x: x.tolist())
cashflow_array_like_strategy = st.one_of(
cashflow_array_strategy,
cashflow_list_strategy,
)
short_scalar_array_strategy = npst.arrays(
dtype=real_scalar_dtypes,
shape=npst.array_shapes(min_dims=0, max_dims=1, min_side=0, max_side=5),
)
when_strategy = st.sampled_from(
['end', 'begin', 'e', 'b', 0, 1, 'beginning', 'start', 'finish']
)
Loading