CatPtain's picture
Upload 225 files
a2afe2f verified
"""Relative Rotation Model."""
# pylint: disable=too-many-arguments, too-many-instance-attributes, protected-access
# pylint: disable=too-many-locals, too-few-public-methods, unused-argument
from typing import TYPE_CHECKING, Any, Dict, List, Literal, Optional, Tuple, Union
from openbb_core.provider.abstract.data import Data
from openbb_core.provider.abstract.fetcher import Fetcher
from openbb_core.provider.abstract.query_params import QueryParams
from pydantic import Field, field_validator
if TYPE_CHECKING:
from pandas import DataFrame, Series
def absolute_maximum_scale(data: "Series") -> "Series":
    """Absolute Maximum Scale Normalization Method.

    Divides every value by the largest absolute value found in the series.
    """
    peak = data.abs().max()
    return data / peak
def min_max_scaling(data: "Series") -> "Series":
    """Min/Max Scaling Normalization Method.

    Shifts the series by its minimum and divides by its range (max - min).
    """
    shifted = data - data.min()
    value_range = data.max() - data.min()
    return shifted / value_range
def z_score_standardization(data: "Series") -> "Series":
    """Z-Score Standardization Method.

    Subtracts the series mean and divides by the series standard deviation
    (as computed by ``Series.std``).
    """
    centered = data - data.mean()
    return centered / data.std()
def normalize(data: "DataFrame", method: Literal["z", "m", "a"] = "z") -> "DataFrame":
    """
    Normalize a Pandas DataFrame based on method.

    Each column is normalized independently. The input is never modified.

    Parameters
    ----------
    data: "DataFrame"
        Pandas DataFrame with any number of columns to be normalized.
    method: Literal["z", "m", "a"]
        Normalization method.
            z: Z-Score Standardization
            m: Min/Max Scaling
            a: Absolute Maximum Scale

    Returns
    -------
    DataFrame
        Normalized DataFrame.

    Raises
    ------
    KeyError
        If `method` is not one of "z", "m" or "a".
    """
    methods = {
        "z": z_score_standardization,
        "m": min_max_scaling,
        "a": absolute_maximum_scale,
    }
    # Resolve the scaler first so an unknown method fails even for empty input.
    scaler = methods[f"{method}"]

    if data.empty:
        return data.copy()

    # Build a new frame column-by-column instead of writing the float results
    # back into a copy with `df.loc[:, col] = ...`; that in-place write can hit
    # pandas' incompatible-dtype setitem deprecation when a column holds
    # integers (e.g. volume).
    return data.apply(scaler)
def standard_deviation(
    data: "DataFrame",
    window: int = 21,
    trading_periods: int = 252,
) -> "DataFrame":
    """
    Compute the rolling, annualized standard deviation of log returns.

    Measures how widely returns are dispersed from the average return.
    It is the most common (and biased) estimator of volatility.

    Parameters
    ----------
    data : pd.DataFrame
        Dataframe of prices; every column is processed independently.
    window : int [default: 21]
        Length of window to calculate over. Values below 2 fall back to 21.
    trading_periods : int [default: 252]
        Number of trading periods in a year, used to annualize the result.

    Returns
    -------
    pd.DataFrame : results
        Dataframe with results, with rows containing NaN removed.
    """
    # pylint: disable=import-outside-toplevel
    from numpy import log, sqrt
    from pandas import DataFrame

    prices = data.copy()
    span = 21 if window < 2 else window

    volatility = {}
    for name in prices.columns.tolist():
        series = prices[name]
        log_returns = (series / series.shift(1)).apply(log)
        rolling_std = log_returns.rolling(window=span, center=False).std()
        volatility[name] = rolling_std * sqrt(trading_periods)

    return DataFrame(volatility).dropna()
def calculate_momentum(
    data: "Series", long_period: int = 252, short_period: int = 21
) -> "Series":
    """
    Calculate momentum as the long-period log return minus the short-period log return.

    With the defaults this is the log trailing 12-month return minus the
    trailing one-month return. Higher values indicate larger, positive
    momentum exposure.

    Momentum = ln(1 + r_long) - ln(1 + r_short)

    Parameters
    ----------
    data: "Series"
        Time series data to calculate the momentum for.
    long_period: int
        Long period to base the calculation on. Default is one standard trading year.
    short_period: int
        Short period to subtract from the long period. Default is one trading month.

    Returns
    -------
    Series
        Pandas Series with the calculated momentum.
    """
    # pylint: disable=import-outside-toplevel
    from numpy import log

    series = data.copy()
    # Small offset added inside the logarithm of each leg.
    epsilon = 1e-10

    def log_growth(periods: int) -> "Series":
        """Return ln(1 + percentage change over `periods` + epsilon)."""
        return log(1 + series.pct_change(periods) + epsilon)

    return log_growth(long_period) - log_growth(short_period)  # type: ignore
def get_momentum(
    data: "DataFrame", long_period: int = 252, short_period: int = 21
) -> "DataFrame":
    """
    Calculate the Relative-Strength Momentum Indicator.

    Takes the Relative Strength Ratio as the input and applies
    `calculate_momentum` to every column.

    Parameters
    ----------
    data: "DataFrame"
        Indexed time series data formatted with each column representing a ticker.
    long_period: int
        Long period to base the calculation on. Default is one standard trading year.
    short_period: int
        Short period to subtract from the long period. Default is one trading month.

    Returns
    -------
    DataFrame
        Pandas DataFrame with the calculated momentum, one column per ticker.
    """
    # pylint: disable=import-outside-toplevel
    from pandas import DataFrame

    source = data.copy()
    per_ticker = {
        ticker: calculate_momentum(source[ticker], long_period, short_period)
        for ticker in source.columns.to_list()
    }
    return DataFrame(per_ticker)
def calculate_relative_strength_ratio(
    symbols_data: "DataFrame",
    benchmark_data: "DataFrame",
) -> "DataFrame":
    """Calculate the Relative Strength Ratio for each ticker (column) against the benchmark.

    Symbols data and benchmark data should have the same index,
    and each column should represent a ticker. Only the first column of
    `benchmark_data` is used.

    Parameters
    ----------
    symbols_data: "DataFrame"
        Pandas DataFrame with the symbols data to compare against the benchmark.
    benchmark_data: "DataFrame"
        Pandas DataFrame with the benchmark data.

    Returns
    -------
    DataFrame
        Pandas DataFrame with the ratio (symbol / benchmark * 100) for each
        ticker, joined with the benchmark values, with NaN rows dropped.
    """
    benchmark = benchmark_data.iloc[:, 0]
    ratios = symbols_data.div(benchmark, axis=0).multiply(100)
    combined = ratios.join(benchmark)
    return combined.dropna()
def process_data(
    symbols_data: "DataFrame",
    benchmark_data: "DataFrame",
    long_period: int = 252,
    short_period: int = 21,
    normalize_method: Literal["z", "m", "a"] = "z",
) -> Tuple["DataFrame", "DataFrame"]:
    """Process the raw data into normalized indicator values.

    Parameters
    ----------
    symbols_data: "DataFrame"
        Indexed time series data formatted with each column representing a ticker.
    benchmark_data: "DataFrame"
        Indexed time series data of the benchmark symbol.
    long_period: int
        Long period to base the calculation on. Default is one standard trading year.
    short_period: int
        Short period to subtract from the long period. Default is one trading month.
    normalize_method: Literal["z", "m", "a"]
        Normalization method forwarded to `normalize` for both outputs.

    Returns
    -------
    Tuple[DataFrame, DataFrame]
        Tuple of Pandas DataFrames with the normalized ratio and momentum indicator values.
    """
    ratios = calculate_relative_strength_ratio(symbols_data, benchmark_data)
    momentum = get_momentum(ratios, long_period, short_period)
    return (
        normalize(ratios, normalize_method),
        normalize(momentum, normalize_method),
    )
class RelativeRotation:
    """Relative Rotation Class.

    Takes multi-symbol time series data, separates the benchmark from the
    other symbols, and computes the normalized relative strength ratio and
    momentum series. Results are stored on the instance as attributes:
    `symbols`, `benchmark`, `study`, `long_period`, `short_period`, `window`,
    `trading_periods`, `symbols_data`, `benchmark_data`, `rs_ratios`,
    `rs_momentum`, `start_date` and `end_date`.
    """

    def __init__(
        self,
        data: Union[List[Data], "DataFrame"],
        benchmark: str,
        study: Optional[Literal["price", "volume", "volatility"]] = "price",
        long_period: Optional[int] = 252,
        short_period: Optional[int] = 21,
        window: Optional[int] = 21,
        trading_periods: Optional[int] = 252,
    ):
        """Initialize the class.

        Parameters
        ----------
        data: Union[List[Data], DataFrame]
            Either a list of Data objects / dictionaries (or an OBBject holding
            them in `results`), or a DataFrame. Long-format data with a
            'symbol' column is pivoted; otherwise the columns are taken to be
            the symbols.
        benchmark: str
            Benchmark symbol; converted to upper case before lookup.
        study: Optional[Literal["price", "volume", "volatility"]]
            "volume" selects the 'volume' column when pivoting; otherwise 'close'.
        long_period: Optional[int]
            Long period for the momentum calculation.
        short_period: Optional[int]
            Short period for the momentum calculation.
        window: Optional[int]
            Rolling window for the volatility study.
        trading_periods: Optional[int]
            Periods per year used to annualize the volatility study.

        Raises
        ------
        ValueError
            If no usable data could be built from `data`, or if there are not
            enough rows for the selected study.
        RuntimeError
            If the benchmark symbol is not a column of the data.
        """
        # pylint: disable=import-outside-toplevel
        import contextlib  # noqa
        from openbb_core.app.model.obbject import OBBject  # noqa
        from openbb_core.app.utils import (  # noqa
            basemodel_to_df,
            convert_to_basemodel,
            df_to_basemodel,
        )
        from pandas import DataFrame  # noqa

        benchmark = benchmark.upper()
        df = DataFrame()
        target_col = "volume" if study == "volume" else "close"

        if isinstance(data, OBBject):
            data = data.results  # type: ignore

        if isinstance(data, list) and (
            all(isinstance(d, Data) for d in data)
            or all(isinstance(d, dict) for d in data)
        ):
            # Best-effort conversion: on failure `df` stays empty and the
            # ValueError below reports the problem.
            with contextlib.suppress(Exception):
                df = basemodel_to_df(convert_to_basemodel(data), index="date")

        # Check the *input* frame for emptiness. Testing the still-empty
        # placeholder `df` here would reject every DataFrame input.
        if isinstance(data, DataFrame) and not data.empty:
            df = data.copy()
            if "date" in df.columns:
                df.set_index("date", inplace=True)

        if df.empty:
            raise ValueError(
                "Data must be a list of Data objects or a DataFrame with a 'date' column."
            )

        if "symbol" in df.columns:
            df = df.pivot(columns="symbol", values=target_col)

        if benchmark not in df.columns:
            raise RuntimeError("The benchmark symbol was not found in the data.")

        # `pop` removes the benchmark column, so `df` now holds only the symbols.
        benchmark_data = df.pop(benchmark).to_frame()
        symbols_data = df

        if len(symbols_data) <= 252 and study in ["price", "volume"]:  # type: ignore
            raise ValueError(
                "Supplied data must be daily intervals and have more than one year of back data to calculate"
                " the most recent day in the time series."
            )

        if study == "volatility" and len(symbols_data) <= 504:  # type: ignore
            raise ValueError(
                "Supplied data must be daily intervals and have more than two years of back data to calculate"
                " the most recent day in the time series as a volatility study."
            )

        self.symbols = df.columns.to_list()
        self.benchmark = benchmark
        self.study = study
        self.long_period = long_period
        self.short_period = short_period
        self.window = window
        self.trading_periods = trading_periods
        self.symbols_data = symbols_data  # type: ignore
        self.benchmark_data = benchmark_data  # type: ignore
        self._process_data()  # type: ignore
        # Convert the (possibly volatility-transformed) frames to models last,
        # because `_process_data` needs them as DataFrames.
        self.symbols_data = df_to_basemodel(self.symbols_data.reset_index())  # type: ignore
        self.benchmark_data = df_to_basemodel(self.benchmark_data.reset_index())  # type: ignore

    def _process_data(self):
        """Process the data.

        For the volatility study the symbol and benchmark frames are first
        replaced by their rolling standard deviation. The ratio and momentum
        frames are then computed with the default normalization method, and
        `rs_ratios`, `rs_momentum`, `start_date` and `end_date` are set.

        Returns
        -------
        RelativeRotation
            The instance itself.

        Raises
        ------
        ValueError
            If no row of the momentum data is free of NaN values.
        """
        # pylint: disable=import-outside-toplevel
        from openbb_core.app.utils import df_to_basemodel
        from pandas import to_datetime

        if self.study == "volatility":
            self.symbols_data = standard_deviation(
                self.symbols_data,  # type: ignore
                window=self.window,  # type: ignore
                trading_periods=self.trading_periods,  # type: ignore
            )
            self.benchmark_data = standard_deviation(
                self.benchmark_data,  # type: ignore
                window=self.window,  # type: ignore
                trading_periods=self.trading_periods,  # type: ignore
            )

        ratios, momentum = process_data(
            self.symbols_data,  # type: ignore
            self.benchmark_data,  # type: ignore
            long_period=self.long_period,  # type: ignore
            short_period=self.short_period,  # type: ignore
        )

        valid_momentum = momentum.dropna()
        if valid_momentum.empty:
            # Without this guard the date lookups below fail with a bare
            # IndexError that hides the real cause.
            raise ValueError(
                "Not enough data remains after the momentum calculation."
                " Supply more history or reduce the 'long_period'."
            )

        # Re-index rs_ratios using the new index
        ratios = ratios.reindex(valid_momentum.index)
        self.rs_ratios = df_to_basemodel(ratios.reset_index())
        self.rs_momentum = df_to_basemodel(valid_momentum.reset_index())
        self.end_date = to_datetime(ratios.index[-1]).strftime("%Y-%m-%d")
        self.start_date = to_datetime(ratios.index[0]).strftime("%Y-%m-%d")

        return self
def _get_type_name(t):
"""Get the type name of a type hint."""
if hasattr(t, "__origin__"):
if hasattr(t.__origin__, "__name__"):
return f"{t.__origin__.__name__}[{', '.join([_get_type_name(arg) for arg in t.__args__])}]"
if hasattr(t.__origin__, "_name"):
return f"{t.__origin__._name}[{', '.join([_get_type_name(arg) for arg in t.__args__])}]"
if isinstance(t, str):
return t
if hasattr(t, "__name__"):
return t.__name__
if hasattr(t, "_name"):
return t._name
return str(t)
class RelativeRotationQueryParams(QueryParams):
    """Relative Rotation Query Parameters.

    Each instance also gets a generated `__doc__` listing every field with
    its type and description.
    """

    data: List[Data] = Field(
        description=(
            "The data to be used for the relative rotation calculations."
            " This should be the multi-symbol output from the"
            " 'equity.price.historical' endpoint, or similar, at a daily interval."
            " Or a pivot table with the 'date' column as the index, the symbols as the columns,"
            " and the 'study' as the values."
            " It is recommended to use the 'equity.price.historical' endpoint to get the data,"
            " and feed the results as-is."
        )
    )
    benchmark: str = Field(description="The symbol to be used as the benchmark.")
    study: Literal["price", "volume", "volatility"] = Field(
        default="price",
        description=(
            "The data point for the calculations."
            " If 'price', the closing price will be used."
            " If 'volatility', the standard deviation of the closing price will be used."
            " If 'data' is supplied as a pivot table,"
            " the 'study' will assume the values are the closing price and 'volume' will be ignored."
        ),
    )
    long_period: Optional[int] = Field(
        default=252,
        description=(
            "The length of the long period for momentum calculation, by default is 252."
            " Adjust this value, to 365, when supplying assets such as crypto."
        ),
    )
    short_period: Optional[int] = Field(
        default=21,
        description=(
            "The length of the short period for momentum calculation, by default is 21."
            " Adjust this value, to 30, when supplying assets such as crypto."
        ),
    )
    window: Optional[int] = Field(
        default=21,
        description=(
            "The length of window for the standard deviation calculation, by default is 21."
            " Adjust this value, to 30, when supplying assets such as crypto."
        ),
    )
    trading_periods: Optional[int] = Field(
        default=252,
        description=(
            "The number of trading periods per year,"
            " for the standard deviation calculation, by default is 252."
            " Adjust this value, to 365, when supplying assets such as crypto."
        ),
    )
    chart_params: Optional[Dict[str, Any]] = Field(
        default=None,
        description=(
            "Additional parameters to pass when `chart=True` and the `openbb-charting` extension is installed."
            " Parameters can be passed again to redraw the chart using the charting.to_chart() method of the response."
            "\n"
            "\n ChartParams"
            "\n -----------"
            "\n date: Optional[str]"
            "\n A target end date within the data, by default is the last date in the data."
            "\n show_tails: bool"
            "\n Show the tails on the chart, by default is True."
            "\n tail_periods: Optional[int]"
            "\n Number of periods to show in the tails, by default is 16."
            "\n tail_interval: Literal['day', 'week', 'month']"
            "\n Interval to show the tails, by default is 'week'."
            "\n title: Optional[str]"
            "\n Title of the chart."
        ),
    )

    @field_validator("benchmark", mode="before", check_fields=False)
    @classmethod
    def to_upper(cls, v):
        """Convert the benchmark symbol to uppercase."""
        return v.upper()

    @field_validator("data", mode="before", check_fields=False)
    @classmethod
    def convert_data(cls, v):
        """Convert the supplied data into a form the `data` field can validate.

        An OBBject yields its `results`; lists and dictionaries go through
        `convert_to_basemodel`; a DataFrame has its index reset and goes
        through `df_to_basemodel`. A single Data instance, or any other
        value, is passed through unchanged.
        """
        # pylint: disable=import-outside-toplevel
        from openbb_core.app.model.obbject import OBBject
        from openbb_core.app.utils import convert_to_basemodel, df_to_basemodel
        from pandas import DataFrame

        if isinstance(v, OBBject):
            return v.results
        if isinstance(v, Data):
            return v
        if isinstance(v, (list, dict)):
            return convert_to_basemodel(v)
        if isinstance(v, DataFrame):
            flattened = v.reset_index()
            return df_to_basemodel(flattened)
        return v

    def __init__(self, **data):
        """Initialize the class and build the per-instance docstring."""
        super().__init__(**data)
        model_cls = self.__class__
        entries = "\n".join(
            f" {name} : {_get_type_name(info.annotation)}\n {info.description}"
            for name, info in model_cls.model_fields.items()
        )
        self.__doc__ = (
            f"\n{model_cls.__name__}\n\n Parameters\n ----------\n{entries}\n"
        )
class RelativeRotationData(Data):
    """Relative Rotation Data Model.

    Each instance also gets a generated `__doc__` listing every field with
    its type and description.
    """

    symbols: List[str] = Field(
        description="The symbols that are being compared against the benchmark."
    )
    benchmark: str = Field(description="The benchmark symbol, as entered by the user.")
    study: Literal["price", "volume", "volatility"] = Field(
        description="The data point for the study, as entered by the user."
    )
    long_period: int = Field(
        description=(
            "The length of the long period for momentum calculation,"
            " as entered by the user."
        )
    )
    short_period: int = Field(
        description=(
            "The length of the short period for momentum calculation,"
            " as entered by the user."
        )
    )
    window: int = Field(
        description=(
            "The length of window for the standard deviation calculation,"
            " as entered by the user."
        ),
    )
    trading_periods: int = Field(
        description=(
            "The number of trading periods per year,"
            " for the standard deviation calculation, as entered by the user."
        )
    )
    start_date: str = Field(
        description=(
            "The start date of the data after adjusting"
            " the length of the data for the calculations."
        )
    )
    end_date: str = Field(description="The end date of the data.")
    symbols_data: List[Data] = Field(
        description="The data representing the selected 'study' for each symbol."
    )
    benchmark_data: List[Data] = Field(
        description="The data representing the selected 'study' for the benchmark."
    )
    rs_ratios: List[Data] = Field(
        description="The normalized relative strength ratios data."
    )
    rs_momentum: List[Data] = Field(
        description="The normalized relative strength momentum data."
    )

    def __init__(self, **data):
        """Initialize the class and build the per-instance docstring."""
        super().__init__(**data)
        model_cls = self.__class__
        entries = "\n".join(
            f" {name} : {_get_type_name(info.annotation)}\n {info.description}"
            for name, info in model_cls.model_fields.items()
        )
        self.__doc__ = (
            f"\n{model_cls.__name__}\n\n Attributes\n ----------\n{entries}\n"
        )
class RelativeRotationFetcher(
    Fetcher[RelativeRotationQueryParams, RelativeRotationData]
):
    """Relative Rotation Fetcher.

    Validates the query, runs the `RelativeRotation` calculation, and
    validates the resulting attributes as `RelativeRotationData`.
    """

    @staticmethod
    def transform_query(params: Dict[str, Any]) -> RelativeRotationQueryParams:
        """Transform the query parameters.

        Parameters
        ----------
        params: Dict[str, Any]
            Raw query parameters keyed by field name.

        Returns
        -------
        RelativeRotationQueryParams
            The validated query parameters.
        """
        # `model_validate` takes the payload as a single positional object, so
        # unpacking the dictionary into it raises TypeError. Construct the
        # model directly from the keyword arguments instead.
        return RelativeRotationQueryParams(**params)

    @staticmethod
    def extract_data(
        query: RelativeRotationQueryParams,
        credentials: Optional[Dict[str, str]],
        **kwargs: Any,
    ) -> Dict:
        """Extract the data.

        Runs the relative rotation calculation on the query's data and
        returns the attribute dictionary of the resulting `RelativeRotation`
        instance. `credentials` and `kwargs` are not used.
        """
        return RelativeRotation(
            query.data,
            query.benchmark,
            study=query.study,
            long_period=query.long_period,
            short_period=query.short_period,
            window=query.window,
            trading_periods=query.trading_periods,
        ).__dict__

    @staticmethod
    def transform_data(
        query: RelativeRotationQueryParams,
        data: Dict,
        **kwargs: Any,
    ) -> RelativeRotationData:
        """Transform the data.

        Validates the dictionary produced by `extract_data` as a
        `RelativeRotationData` model. `query` and `kwargs` are not used.
        """
        return RelativeRotationData.model_validate(data)