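# Hosting-page residue captured with this file (not part of the module):
# uploader avatar caption "CatPtain's picture"; commit "Upload 225 files" (a2afe2f, verified).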
"""Technical Analysis Helpers."""
# pylint: disable=too-many-arguments,too-many-locals,too-many-positional-arguments
from typing import TYPE_CHECKING, Any, List, Literal, Optional, Tuple, Union
from warnings import warn
if TYPE_CHECKING:
from pandas import DataFrame, Series, Timestamp
def validate_data(data: list, length: Union[int, List[int]]) -> None:
    """Check that ``data`` holds enough items for every requested length.

    Parameters
    ----------
    data : list
        The data whose size is being checked.
    length : Union[int, List[int]]
        A single required length, or a list of required lengths.

    Raises
    ------
    ValueError
        If any required length exceeds ``len(data)``; the message reports
        the largest requested length.
    """
    required = [length] if isinstance(length, int) else length
    available = len(data)
    if any(needed > available for needed in required):
        raise ValueError(
            f"Data length is less than required by parameters: {max(required)}"
        )
def parkinson(
    data: "DataFrame",
    window: int = 30,
    trading_periods: Optional[int] = None,
    is_crypto: bool = False,
    clean=True,
) -> "DataFrame":
    """Parkinson volatility.

    Estimates volatility from the high/low range of each period instead of
    close-to-close changes. Each row contributes
    ``ln(high / low) ** 2 / (4 * ln(2))``; the rolling mean of that term is
    scaled by ``trading_periods`` and square-rooted.

    Parameters
    ----------
    data : DataFrame
        Prices with at least "high" and "low" columns.
    window : int [default: 30]
        Rolling window length. Values below 1 trigger a warning and fall
        back to 30.
    trading_periods : Optional[int] [default: 252]
        Number of trading periods in a year. When given (and non-zero) it
        takes precedence over ``is_crypto``.
    is_crypto : bool [default: False]
        If true and ``trading_periods`` is not given, 365 periods are used.
    clean : bool [default: True]
        Whether to drop NaN values from the result.

    Returns
    -------
    DataFrame : results
        The rolling annualized volatility, indexed like ``data`` (NaN rows
        removed when ``clean`` is true).
    """
    # pylint: disable=import-outside-toplevel
    from numpy import log

    if window < 1:
        warn("Error: Window must be at least 1, defaulting to 30.")
        window = 30

    if trading_periods:
        if is_crypto:
            warn("is_crypto is overridden by trading_periods.")
    else:
        trading_periods = 365 if is_crypto else 252

    scale = 1.0 / (4.0 * log(2.0))
    log_range = (data["high"] / data["low"]).apply(log)
    per_period = scale * log_range**2.0

    def annualize(chunk):
        return (trading_periods * chunk.mean()) ** 0.5

    rolled = per_period.rolling(window=window, center=False).apply(func=annualize)
    return rolled.dropna() if clean else rolled
def standard_deviation(
    data: "DataFrame",
    window: int = 30,
    trading_periods: Optional[int] = None,
    is_crypto: bool = False,
    clean: bool = True,
) -> "DataFrame":
    """Calculate the rolling standard deviation of log returns.

    Takes the log of consecutive close-price ratios, computes their rolling
    standard deviation and scales it by ``sqrt(trading_periods)``.

    Parameters
    ----------
    data : DataFrame
        Prices with at least a "close" column.
    window : int [default: 30]
        Rolling window length. Values below 2 trigger a warning and fall
        back to 30.
    trading_periods : Optional[int] [default: 252]
        Number of trading periods in a year. When given (and non-zero) it
        takes precedence over ``is_crypto``.
    is_crypto : bool [default: False]
        If true and ``trading_periods`` is not given, 365 periods are used.
    clean : bool [default: True]
        Whether to drop NaN values from the result.

    Returns
    -------
    DataFrame : results
        The rolling annualized volatility, indexed like ``data`` (NaN rows
        removed when ``clean`` is true).
    """
    # pylint: disable=import-outside-toplevel
    from numpy import log, sqrt

    if window < 2:
        warn("Error: Window must be at least 2, defaulting to 30.")
        window = 30

    if trading_periods:
        if is_crypto:
            warn("is_crypto is overridden by trading_periods.")
    else:
        trading_periods = 365 if is_crypto else 252

    closes = data["close"]
    log_return = (closes / closes.shift(1)).apply(log)
    rolling_std = log_return.rolling(window=window, center=False).std()
    annualized = rolling_std * sqrt(trading_periods)
    return annualized.dropna() if clean else annualized
def garman_klass(
    data: "DataFrame",
    window: int = 30,
    trading_periods: Optional[int] = None,
    is_crypto: bool = False,
    clean=True,
) -> "DataFrame":
    """Garman-Klass volatility.

    Extends the high/low range estimator with the open and close prices.
    Each row contributes
    ``0.5 * ln(high / low) ** 2 - (2 * ln(2) - 1) * ln(close / open) ** 2``;
    the rolling mean of that term is scaled by ``trading_periods`` and
    square-rooted.

    Parameters
    ----------
    data : DataFrame
        Prices with "open", "high", "low" and "close" columns.
    window : int [default: 30]
        Rolling window length. Values below 1 trigger a warning and fall
        back to 30.
    trading_periods : Optional[int] [default: 252]
        Number of trading periods in a year. When given (and non-zero) it
        takes precedence over ``is_crypto``.
    is_crypto : bool [default: False]
        If true and ``trading_periods`` is not given, 365 periods are used.
    clean : bool [default: True]
        Whether to drop NaN values from the result.

    Returns
    -------
    DataFrame : results
        The rolling annualized volatility, indexed like ``data`` (NaN rows
        removed when ``clean`` is true).
    """
    # pylint: disable=import-outside-toplevel
    from numpy import log

    if window < 1:
        warn("Error: Window must be at least 1, defaulting to 30.")
        window = 30

    if trading_periods:
        if is_crypto:
            warn("is_crypto is overridden by trading_periods.")
    else:
        trading_periods = 365 if is_crypto else 252

    log_hl = (data["high"] / data["low"]).apply(log)
    log_co = (data["close"] / data["open"]).apply(log)

    range_term = 0.5 * log_hl**2
    drift_term = (2 * log(2) - 1) * log_co**2
    per_period = range_term - drift_term

    def annualize(chunk):
        return (trading_periods * chunk.mean()) ** 0.5

    rolled = per_period.rolling(window=window, center=False).apply(func=annualize)
    return rolled.dropna() if clean else rolled
def hodges_tompkins(
    data: "DataFrame",
    window: int = 30,
    trading_periods: Optional[int] = None,
    is_crypto: bool = False,
    clean=True,
) -> "DataFrame":
    """Hodges-Tompkins volatility.

    Computes the rolling, annualized standard deviation of close-to-close
    log returns and multiplies it by a bias-correction factor for
    overlapping samples,
    ``1 / (1 - h / n + (h ** 2 - 1) / (3 * n ** 2))``, where ``h`` is the
    window and ``n`` is the number of non-NaN log returns minus ``h`` plus 1.

    Parameters
    ----------
    data : DataFrame
        Prices with at least a "close" column.
    window : int [default: 30]
        Rolling window length. Values below 2 trigger a warning and fall
        back to 30.
    trading_periods : Optional[int] [default: 252]
        Number of trading periods in a year. When given (and non-zero) it
        takes precedence over ``is_crypto``.
    is_crypto : bool [default: False]
        If true and ``trading_periods`` is not given, 365 periods are used.
    clean : bool [default: True]
        Whether to drop NaN values from the result.

    Returns
    -------
    DataFrame : results
        The bias-adjusted rolling volatility, indexed like ``data`` (NaN
        rows removed when ``clean`` is true).

    Example
    -------
    >>> data = obb.equity.price.historical('BTC-USD')
    >>> df = obb.technical.hodges_tompkins(data, is_crypto = True)
    """
    # pylint: disable=import-outside-toplevel
    from numpy import log, sqrt

    if window < 2:
        warn("Error: Window must be at least 2, defaulting to 30.")
        window = 30

    if trading_periods:
        if is_crypto:
            warn("is_crypto is overridden by trading_periods.")
    else:
        trading_periods = 365 if is_crypto else 252

    closes = data["close"]
    log_return = (closes / closes.shift(1)).apply(log)
    vol = log_return.rolling(window=window, center=False).std() * sqrt(trading_periods)

    # Correction for estimating from overlapping windows of length `overlap`.
    overlap = window
    samples = (log_return.count() - overlap) + 1
    correction = 1.0 / (
        1.0 - (overlap / samples) + ((overlap**2 - 1) / (3 * samples**2))
    )

    adjusted = vol * correction
    return adjusted.dropna() if clean else adjusted
def rogers_satchell(
    data: "DataFrame",
    window: int = 30,
    trading_periods: Optional[int] = None,
    is_crypto: bool = False,
    clean=True,
) -> "Series":
    """Rogers-Satchell Estimator.

    A volatility estimator intended for prices whose mean return is not
    zero (it incorporates a drift term, unlike Parkinson and Garman-Klass).
    Each row contributes
    ``ln(h/o) * (ln(h/o) - ln(c/o)) + ln(l/o) * (ln(l/o) - ln(c/o))``;
    the rolling mean of that term is scaled by ``trading_periods`` and
    square-rooted.

    Parameters
    ----------
    data : DataFrame
        Prices with "open", "high", "low" and "close" columns.
    window : int [default: 30]
        Rolling window length. Values below 1 trigger a warning and fall
        back to 30.
    trading_periods : Optional[int] [default: 252]
        Number of trading periods in a year. When given (and non-zero) it
        takes precedence over ``is_crypto``.
    is_crypto : bool [default: False]
        If true and ``trading_periods`` is not given, 365 periods are used.
    clean : bool [default: True]
        Whether to drop NaN values from the result.

    Returns
    -------
    Series : results
        Pandas Series with the rolling annualized volatility (NaN rows
        removed when ``clean`` is true).
    """
    # pylint: disable=import-outside-toplevel
    from numpy import log

    if window < 1:
        warn("Error: Window must be at least 1, defaulting to 30.")
        window = 30

    if trading_periods:
        if is_crypto:
            warn("is_crypto is overridden by trading_periods.")
    else:
        trading_periods = 365 if is_crypto else 252

    log_ho = (data["high"] / data["open"]).apply(log)
    log_lo = (data["low"] / data["open"]).apply(log)
    log_co = (data["close"] / data["open"]).apply(log)

    upside = log_ho * (log_ho - log_co)
    downside = log_lo * (log_lo - log_co)
    per_period = upside + downside

    def annualize(chunk):
        return (trading_periods * chunk.mean()) ** 0.5

    rolled = per_period.rolling(window=window, center=False).apply(func=annualize)
    return rolled.dropna() if clean else rolled
def yang_zhang(
    data: "DataFrame",
    window: int = 30,
    trading_periods: Optional[int] = None,
    is_crypto: bool = False,
    clean=True,
) -> "DataFrame":
    """Yang-Zhang Volatility.

    Combines three rolling components, each a windowed sum divided by
    ``window - 1``: the squared overnight (previous close to open) log
    return, the squared close-to-close log return, and the Rogers-Satchell
    term. They are blended as ``open + k * close + (1 - k) * rs`` with
    ``k = 0.34 / (1.34 + (window + 1) / (window - 1))``, then square-rooted
    and scaled by ``sqrt(trading_periods)``.

    Parameters
    ----------
    data : DataFrame
        Prices with "open", "high", "low" and "close" columns.
    window : int [default: 30]
        Rolling window length. Values below 2 trigger a warning and fall
        back to 30.
    trading_periods : Optional[int] [default: 252]
        Number of trading periods in a year. When given (and non-zero) it
        takes precedence over ``is_crypto``.
    is_crypto : bool [default: False]
        If true and ``trading_periods`` is not given, 365 periods are used.
    clean : bool [default: True]
        Whether to drop NaN values from the result.

    Returns
    -------
    DataFrame : results
        The rolling annualized volatility, indexed like ``data`` (NaN rows
        removed when ``clean`` is true).
    """
    # pylint: disable=import-outside-toplevel
    from numpy import log, sqrt

    if window < 2:
        warn("Error: Window must be at least 2, defaulting to 30.")
        window = 30

    if trading_periods:
        if is_crypto:
            warn("is_crypto is overridden by trading_periods.")
    else:
        trading_periods = 365 if is_crypto else 252

    log_ho = (data["high"] / data["open"]).apply(log)
    log_lo = (data["low"] / data["open"]).apply(log)
    log_co = (data["close"] / data["open"]).apply(log)

    # Overnight and close-to-close returns both look back one row.
    log_oc = (data["open"] / data["close"].shift(1)).apply(log)
    log_cc = (data["close"] / data["close"].shift(1)).apply(log)

    rs = log_ho * (log_ho - log_co) + log_lo * (log_lo - log_co)

    norm = 1.0 / (window - 1.0)

    def windowed(series):
        return series.rolling(window=window, center=False).sum() * norm

    close_vol = windowed(log_cc**2)
    open_vol = windowed(log_oc**2)
    window_rs = windowed(rs)

    k = 0.34 / (1.34 + (window + 1) / (window - 1))
    blended = open_vol + k * close_vol + (1 - k) * window_rs
    annualized = blended.apply(sqrt) * sqrt(trading_periods)
    return annualized.dropna() if clean else annualized
def calculate_cones(
    data: "DataFrame",
    lower_q: float,
    upper_q: float,
    is_crypto: bool,
    model: Literal[
        "std",
        "parkinson",
        "garman_klass",
        "hodges_tompkins",
        "rogers_satchell",
        "yang_zhang",
    ],
    trading_periods: Optional[int] = None,
) -> "DataFrame":
    """Calculate volatility cones.

    Runs the selected volatility estimator over a fixed set of window
    lengths and, for each window that yields any values, records the most
    recent ("realized") value together with the minimum, lower quantile,
    median, upper quantile and maximum of the estimate.

    Parameters
    ----------
    data : DataFrame
        Price data; it is sorted by index (ascending) before use. The
        columns required depend on the chosen ``model``.
    lower_q : float
        Lower quantile, in ``[0, 1)``.
    upper_q : float
        Upper quantile, in ``[0, 1)``. If ``lower_q > upper_q`` the two are
        swapped.
    is_crypto : bool
        Forwarded to the estimator (365 periods instead of 252 when
        ``trading_periods`` is not given).
    model : str
        Key of the volatility estimator to use.
    trading_periods : Optional[int]
        Forwarded to the estimator.

    Returns
    -------
    DataFrame
        One row per usable window with columns ``window``, ``realized``,
        ``min``, ``lower_<q>%``, ``median``, ``upper_<q>%`` and ``max``.

    Raises
    ------
    ValueError
        If either quantile lies outside ``[0, 1)``.
    KeyError
        If ``model`` is not one of the supported estimator keys.
    """
    # pylint: disable=import-outside-toplevel
    from pandas import DataFrame

    if lower_q > upper_q:
        lower_q, upper_q = upper_q, lower_q

    # After the swap lower_q <= upper_q, so one check per end covers both
    # values. Negative quantiles are rejected here instead of surfacing
    # later as an error from inside the quantile computation.
    if lower_q < 0 or upper_q >= 1:
        raise ValueError("Error: lower_q and upper_q must be between 0 and 1")

    # Round away binary float noise before truncating: 0.29 * 100 evaluates
    # to 28.999999999999996, which a bare int() would label as "28".
    lower_q_label = str(int(round(lower_q * 100, 6)))
    upper_q_label = str(int(round(upper_q * 100, 6)))

    windows = [3, 10, 30, 60, 90, 120, 150, 180, 210, 240, 300, 360]
    min_ = []
    max_ = []
    median = []
    top_q = []
    bottom_q = []
    realized = []
    allowed_windows = []

    data = data.sort_index(ascending=True)

    model_functions = {
        "std": standard_deviation,
        "parkinson": parkinson,
        "garman_klass": garman_klass,
        "hodges_tompkins": hodges_tompkins,
        "rogers_satchell": rogers_satchell,
        "yang_zhang": yang_zhang,
    }
    estimate = model_functions[model]

    for window in windows:
        estimator = estimate(  # type: ignore
            window=window,
            data=data,
            is_crypto=is_crypto,
            trading_periods=trading_periods,
        )

        # Windows longer than the available history produce no values.
        if estimator.empty:
            continue

        min_.append(estimator.min())  # type: ignore
        max_.append(estimator.max())  # type: ignore
        median.append(estimator.median())  # type: ignore
        top_q.append(estimator.quantile(upper_q))  # type: ignore
        bottom_q.append(estimator.quantile(lower_q))  # type: ignore
        realized.append(estimator.iloc[-1])  # type: ignore
        allowed_windows.append(window)

    # One row per statistic, one column per window; transposed below so that
    # each window becomes a row.
    rows = [realized, min_, bottom_q, median, top_q, max_]
    df = DataFrame(rows, columns=allowed_windows)
    df = df.rename(
        index={
            0: "realized",
            1: "min",
            2: f"lower_{lower_q_label}%",
            3: "median",
            4: f"upper_{upper_q_label}%",
            5: "max",
        }
    )

    return df.transpose().reset_index().rename(columns={"index": "window"})
def clenow_momentum(
    values: "Series", window: int = 90
) -> Tuple[float, float, "Series"]:
    """Clenow Volatility Adjusted Momentum.

    Fits a linear regression to the log of the last ``window`` values
    against their position (0, 1, 2, ...) and reports the quality of the
    fit together with the slope compounded over 252 periods.

    Parameters
    ----------
    values: Series
        Values to perform regression for
    window: int
        Length of look back period

    Returns
    -------
    float:
        R2 of fit to log data
    float:
        Annualized regression coefficient, ``exp(coef) ** 252 - 1``
    Series:
        Fitted values of the regression on the log data, with a fresh
        default index

    Raises
    ------
    ValueError
        If fewer than ``window`` values are supplied.
    """
    # pylint: disable=import-outside-toplevel
    from numpy import arange, exp, log
    from pandas import Series
    from sklearn.linear_model import LinearRegression

    if len(values) < window:
        raise ValueError(f"Calculation asks for at least last {window} days of data")

    log_values = log(values[-window:])
    steps = arange(len(log_values)).reshape(-1, 1)

    model = LinearRegression()
    model.fit(steps, log_values)

    r_squared = model.score(steps, log_values)
    slope = model.coef_[0]
    annualized_slope = (exp(slope) ** 252) - 1
    fitted = Series(model.predict(steps))

    return r_squared, annualized_slope, fitted
def calculate_fib_levels(
    data: "DataFrame",
    close_col: str,
    limit: int = 120,
    start_date: Optional[Any] = None,
    end_date: Optional[Any] = None,
) -> Tuple["DataFrame", "Timestamp", "Timestamp", float, float, str]:
    """Calculate Fibonacci levels.

    When both ``start_date`` and ``end_date`` are given, the retracement is
    drawn between the prices on those two dates (each replaced by the
    nearest index label, with a warning, if it is not in the index).
    Otherwise the minimum and maximum of the last ``limit`` rows are used.

    Parameters
    ----------
    data : DataFrame
        Dataframe of prices
    close_col : str
        Column name of close prices
    limit : int
        Days to look back for retracement
    start_date : Any
        Custom start date for retracement
    end_date : Any
        Custom end date for retracement

    Returns
    -------
    df : DataFrame
        Dataframe of fib levels ("Level" as a percentage string, "Price")
    min_date: Timestamp
        Date of min point
    max_date: Timestamp:
        Date of max point
    min_pr: float
        Price at min point
    max_pr: float
        Price at max point
    lvl_text: str
        "left" if the min point comes before the max point, else "right"

    Notes
    -----
    If the min point occurs after the max point, the returned dates and
    prices are swapped so the dates are in chronological order; in that
    case ``min_date``/``min_pr`` describe the earlier (higher) point.

    Raises
    ------
    ValueError
        If ``close_col`` is not a column of ``data``.
    """
    # pylint: disable=import-outside-toplevel
    from pandas import DataFrame

    if close_col not in data.columns:
        raise ValueError(f"Column {close_col} not in data")

    def _resolve(date: Any, label: str) -> Any:
        """Return ``date`` if it is in the index, else the nearest index label."""
        if date in data.index:
            return date
        nearest = data.index[data.index.get_indexer([date], method="nearest")[0]]
        warn(f"{label} date not in data. Using nearest: {nearest}")
        return nearest

    if start_date and end_date:
        # Each endpoint is resolved against its OWN requested date; looking
        # up end_date for a missing start_date would collapse both endpoints
        # onto the same row.
        date0 = _resolve(start_date, "Start")
        date1 = _resolve(end_date, "End")

        data0 = data.loc[date0, close_col]
        data1 = data.loc[date1, close_col]

        min_pr = min(data0, data1)
        max_pr = max(data0, data1)

        if min_pr == data0:
            min_date, max_date = date0, date1
        else:
            min_date, max_date = date1, date0
    else:
        data_to_use = data.iloc[-limit:, :][close_col]

        min_pr = data_to_use.min()
        min_date = data_to_use.idxmin()
        max_pr = data_to_use.max()
        max_date = data_to_use.idxmax()

    fib_levels = [0, 0.235, 0.382, 0.5, 0.618, 0.65, 1]

    lvl_text: str = "left" if min_date < max_date else "right"
    if min_date > max_date:
        # Keep the returned dates chronological; prices follow their dates.
        min_date, max_date = max_date, min_date
        min_pr, max_pr = max_pr, min_pr

    price_dif = max_pr - min_pr

    # No level exceeds 1, so every price is rounded to 4 decimals.
    levels = [round(max_pr - price_dif * f_lev, 4) for f_lev in fib_levels]

    df = DataFrame()
    df["Level"] = fib_levels
    df["Level"] = df["Level"].apply(lambda x: str(x * 100) + "%")
    df["Price"] = levels

    return df, min_date, max_date, min_pr, max_pr, lvl_text