Spaces:
Sleeping
Sleeping
| import pandas as pd | |
| import numpy as np | |
| def _check_price_cols(df): | |
| required = ["Open", "High", "Low", "Close", "Volume"] | |
| missing = [c for c in required if c not in df.columns] | |
| if missing: | |
| raise ValueError(f"Missing required columns: {missing}") | |
| def _rma(series: pd.Series, n: int): | |
| """Wilder's moving average (RMA).""" | |
| series = series.copy().astype(float) | |
| out = pd.Series(np.nan, index=series.index) | |
| if len(series) < n: | |
| return out | |
| out.iloc[n - 1] = series.iloc[:n].mean() | |
| alpha = 1.0 / n | |
| for i in range(n, len(series)): | |
| out.iat[i] = out.iat[i - 1] * (1 - alpha) + series.iat[i] * alpha | |
| return out | |
| def _true_range(df): | |
| tr1 = df["High"] - df["Low"] | |
| tr2 = (df["High"] - df["Close"].shift(1)).abs() | |
| tr3 = (df["Low"] - df["Close"].shift(1)).abs() | |
| tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1) | |
| return tr | |
| def add_daily_return(df: pd.DataFrame) -> pd.DataFrame: | |
| """ | |
| Add Daily_Return and Log_Return | |
| """ | |
| df = df.copy() | |
| _check_price_cols(df) | |
| cols = ["Open", "High", "Low", "Close", "Volume"] | |
| df[cols] = df[cols].apply(pd.to_numeric, errors="coerce") | |
| df["Daily_Return"] = df["Close"].pct_change() | |
| df["Log_Return"] = np.log(df["Close"] / df["Close"].shift(1)) | |
| return df | |
| def add_trend_indicators( | |
| df: pd.DataFrame, | |
| sma_periods=(5, 10, 20, 50, 100, 200), | |
| ema_periods=(5, 10, 12, 20, 26, 50, 100, 200), | |
| ) -> pd.DataFrame: | |
| """ | |
| Add SMA, EMA, MACD (12/26) and MACD signal/hist | |
| """ | |
| df = df.copy() | |
| _check_price_cols(df) | |
| for p in sma_periods: | |
| df[f"SMA_{p}"] = df["Close"].rolling(window=p, min_periods=1).mean() | |
| for p in ema_periods: | |
| df[f"EMA_{p}"] = df["Close"].ewm(span=p, adjust=False).mean() | |
| ema12 = df["Close"].ewm(span=12, adjust=False).mean() | |
| ema26 = df["Close"].ewm(span=26, adjust=False).mean() | |
| df["MACD"] = ema12 - ema26 | |
| df["MACD_Signal"] = df["MACD"].ewm(span=9, adjust=False).mean() | |
| df["MACD_Hist"] = df["MACD"] - df["MACD_Signal"] | |
| return df | |
| def add_momentum_indicators( | |
| df: pd.DataFrame, rsi_n=14, sto_k=14, sto_d=3, cci_n=20, roc_n=12 | |
| ) -> pd.DataFrame: | |
| """ | |
| add common momentum indicators: | |
| -RSI | |
| -Stochastic %K/%D | |
| -Williams %R | |
| -CCI | |
| -ROC | |
| -Momentum | |
| -CMO | |
| -Ultimate Oscillator. | |
| """ | |
| df = df.copy() | |
| _check_price_cols(df) | |
| close = df["Close"] | |
| # RSI (Wilder smoothing) | |
| delta = close.diff() | |
| up = delta.clip(lower=0) | |
| down = -delta.clip(upper=0) | |
| df[f"RSI_{rsi_n}"] = 100 - (100 / (1 + (_rma(up, rsi_n) / _rma(down, rsi_n)))) | |
| # Stochastic %K and %D | |
| low_min = df["Low"].rolling(window=sto_k).min() | |
| high_max = df["High"].rolling(window=sto_k).max() | |
| df["Sto_%K"] = 100 * (close - low_min) / (high_max - low_min) | |
| df["Sto_%D"] = df["Sto_%K"].rolling(window=sto_d).mean() | |
| # Williams %R | |
| df[f"Williams_%R_{sto_k}"] = -100 * (high_max - close) / (high_max - low_min) | |
| # CCI | |
| tp = (df["High"] + df["Low"] + df["Close"]) / 3.0 | |
| sma_tp = tp.rolling(cci_n).mean() | |
| mad = tp.rolling(cci_n).apply(lambda x: np.mean(np.abs(x - np.mean(x))), raw=True) | |
| df[f"CCI_{cci_n}"] = (tp - sma_tp) / (0.015 * mad) | |
| # ROC and Momentum | |
| df[f"ROC_{roc_n}"] = close.pct_change(periods=roc_n) | |
| df[f"Momentum_{roc_n}"] = close - close.shift(roc_n) | |
| # CMO (Chande Momentum Oscillator) | |
| up_sum = delta.clip(lower=0).rolling(rsi_n).sum() | |
| down_sum = -delta.clip(upper=0).rolling(rsi_n).sum() | |
| df[f"CMO_{rsi_n}"] = 100 * (up_sum - down_sum) / (up_sum + down_sum) | |
| # Ultimate Oscillator (7,14,28 default) | |
| def _ultimate_osc(df_, s1=7, s2=14, s3=28): | |
| bp = df_["Close"] - df_[["Low", "Close"]].shift(1).min(axis=1) | |
| tr = _true_range(df_) | |
| avg1 = bp.rolling(s1).sum() / tr.rolling(s1).sum() | |
| avg2 = bp.rolling(s2).sum() / tr.rolling(s2).sum() | |
| avg3 = bp.rolling(s3).sum() / tr.rolling(s3).sum() | |
| return 100 * ((4 * avg1) + (2 * avg2) + (1 * avg3)) / (4 + 2 + 1) | |
| df["Ultimate_Osc"] = _ultimate_osc(df) | |
| return df | |
| def add_volume_indicators(df: pd.DataFrame) -> pd.DataFrame: | |
| """ | |
| Add volume-based indicators: | |
| = OBV | |
| - CMF | |
| - ADL | |
| - VPT | |
| - Volume Oscillator | |
| - MFI | |
| - Force Index. | |
| """ | |
| df = df.copy() | |
| _check_price_cols(df) | |
| # OBV | |
| df["OBV"] = ((np.sign(df["Close"].diff()) * df["Volume"]).fillna(0)).cumsum() | |
| # Money Flow Multiplier and Chaikin Money Flow | |
| mf_mult = ((df["Close"] - df["Low"]) - (df["High"] - df["Close"])) / ( | |
| df["High"] - df["Low"] | |
| ) | |
| mf_mult = mf_mult.replace([np.inf, -np.inf], 0).fillna(0) | |
| mf_volume = mf_mult * df["Volume"] | |
| df["CMF_20"] = mf_volume.rolling(20).sum() / df["Volume"].rolling(20).sum() | |
| # ADL | |
| df["ADL"] = mf_volume.cumsum() | |
| # VPT | |
| df["VPT"] = (df["Volume"] * df["Close"].pct_change()).fillna(0).cumsum() | |
| # Volume Oscillator (short=10, long=20) | |
| df["VO_short_10"] = df["Volume"].rolling(10).mean() | |
| df["VO_long_20"] = df["Volume"].rolling(20).mean() | |
| df["Volume_Osc"] = (df["VO_short_10"] - df["VO_long_20"]) / df["VO_long_20"] | |
| # MFI (Money Flow Index) | |
| tp = (df["High"] + df["Low"] + df["Close"]) / 3.0 | |
| mf = tp * df["Volume"] | |
| pos_mf = mf.where(tp > tp.shift(1), 0) | |
| neg_mf = mf.where(tp < tp.shift(1), 0) | |
| df["MFI_14"] = 100 - ( | |
| 100 / (1 + (pos_mf.rolling(14).sum() / neg_mf.rolling(14).sum())) | |
| ) | |
| # Force Index | |
| df["ForceIndex_1"] = df["Close"].diff(1) * df["Volume"] | |
| df["ForceIndex_EMA_13"] = df["ForceIndex_1"].ewm(span=13, adjust=False).mean() | |
| return df | |
| def add_volatility_indicators(df: pd.DataFrame) -> pd.DataFrame: | |
| """ | |
| Add | |
| - ATR | |
| - Bollinger Bands | |
| - Donchian channels | |
| """ | |
| df = df.copy() | |
| _check_price_cols(df) | |
| tr = _true_range(df) | |
| df["TR"] = tr | |
| df["ATR_14"] = tr.rolling(14).mean() | |
| # Bollinger Bands (20,2) | |
| n = 20 | |
| df["BB_MID_20"] = df["Close"].rolling(n).mean() | |
| df["BB_STD_20"] = df["Close"].rolling(n).std() | |
| df["BB_UPPER_20"] = df["BB_MID_20"] + 2 * df["BB_STD_20"] | |
| df["BB_LOWER_20"] = df["BB_MID_20"] - 2 * df["BB_STD_20"] | |
| df["BB_Width"] = (df["BB_UPPER_20"] - df["BB_LOWER_20"]) / df["BB_MID_20"] | |
| # Donchian (20) | |
| dc = 20 | |
| df["Donchian_High_20"] = df["High"].rolling(dc).max() | |
| df["Donchian_Low_20"] = df["Low"].rolling(dc).min() | |
| df["Donchian_Mid_20"] = (df["Donchian_High_20"] + df["Donchian_Low_20"]) / 2.0 | |
| return df | |
| # made by chatgpt because I have no idea how formula for these even works | |
| def add_hybrid_indicators(df: pd.DataFrame) -> pd.DataFrame: | |
| """ | |
| Add VWAP (cumulative), Heikin-Ashi candles, ADX, Aroon, Vortex. | |
| Each is implemented in a focused manner. | |
| """ | |
| df = df.copy() | |
| _check_price_cols(df) | |
| # VWAP cumulative | |
| tp = (df["High"] + df["Low"] + df["Close"]) / 3.0 | |
| cum_vp = (tp * df["Volume"]).cumsum() | |
| cum_vol = df["Volume"].cumsum().replace(0, np.nan) | |
| df["VWAP_cum"] = cum_vp / cum_vol | |
| # Heikin-Ashi | |
| ha_close = (df["Open"] + df["High"] + df["Low"] + df["Close"]) / 4.0 | |
| ha_open = ha_close.copy() | |
| if len(ha_open) > 0: | |
| ha_open.iloc[0] = (df["Open"].iloc[0] + df["Close"].iloc[0]) / 2.0 | |
| for i in range(1, len(ha_open)): | |
| ha_open.iat[i] = (ha_open.iat[i - 1] + ha_close.iat[i - 1]) / 2.0 | |
| df["HA_Open"] = ha_open | |
| df["HA_Close"] = ha_close | |
| df["HA_High"] = df[["High", "HA_Open", "HA_Close"]].max(axis=1) | |
| df["HA_Low"] = df[["Low", "HA_Open", "HA_Close"]].min(axis=1) | |
| # ADX (using DI sums approach) | |
| def _adx(df_, n=14): | |
| up_move = df_["High"].diff() | |
| down_move = -df_["Low"].diff() | |
| plus_dm = np.where((up_move > down_move) & (up_move > 0), up_move, 0.0) | |
| minus_dm = np.where((down_move > up_move) & (down_move > 0), down_move, 0.0) | |
| tr = _true_range(df_) | |
| atr = tr.rolling(n).mean() | |
| plus_dm_sm = pd.Series(plus_dm, index=df_.index).rolling(window=n).sum() | |
| minus_dm_sm = pd.Series(minus_dm, index=df_.index).rolling(window=n).sum() | |
| plus_di = 100 * (plus_dm_sm / atr) | |
| minus_di = 100 * (minus_dm_sm / atr) | |
| dx = (abs(plus_di - minus_di) / (plus_di + minus_di)) * 100 | |
| adx = dx.rolling(n).mean() | |
| return plus_di, minus_di, adx | |
| df["+DI_14"], df["-DI_14"], df["ADX_14"] = _adx(df, 14) | |
| # Aroon (n=25) | |
| def _aroon(df_, n=25): | |
| # Aroon up/down in percentage. This implementation uses rolling apply. | |
| def single_aroon_up(arr): | |
| # arr is an array of highs in the window | |
| idx = np.argmax(arr) | |
| periods_since_high = (len(arr) - 1) - idx | |
| return ((n - periods_since_high) / n) * 100.0 | |
| def single_aroon_down(arr): | |
| idx = np.argmin(arr) | |
| periods_since_low = (len(arr) - 1) - idx | |
| return ((n - periods_since_low) / n) * 100.0 | |
| aroon_up = df_["High"].rolling(window=n).apply(single_aroon_up, raw=True) | |
| aroon_down = df_["Low"].rolling(window=n).apply(single_aroon_down, raw=True) | |
| return aroon_up, aroon_down | |
| try: | |
| df["Aroon_Up_25"], df["Aroon_Down_25"] = _aroon(df, 25) | |
| except Exception: | |
| df["Aroon_Up_25"] = np.nan | |
| df["Aroon_Down_25"] = np.nan | |
| # Vortex Indicator | |
| def _vortex(df_, n=14): | |
| tr = _true_range(df_) | |
| trn = tr.rolling(n).sum() | |
| vmp = (df_["High"] - df_["Low"].shift(1)).abs().rolling(n).sum() | |
| vmm = (df_["Low"] - df_["High"].shift(1)).abs().rolling(n).sum() | |
| vip = vmp / trn | |
| vim = vmm / trn | |
| return vip, vim | |
| df["Vortex_Pos_14"], df["Vortex_Neg_14"] = _vortex(df, 14) | |
| return df | |
| def add_all_indicators(df: pd.DataFrame) -> pd.DataFrame: | |
| """ | |
| Convenience wrapper that runs all modular functions in a safe order. | |
| """ | |
| df = df.copy() | |
| df = add_daily_return(df) | |
| df = add_trend_indicators(df) | |
| df = add_momentum_indicators(df) | |
| df = add_volume_indicators(df) | |
| df = add_volatility_indicators(df) | |
| df = add_hybrid_indicators(df) | |
| # cleanup infinities | |
| df.replace([np.inf, -np.inf], np.nan, inplace=True) | |
| return df | |