"""
P170 MAXIMUM SYSTEM — dhan_financial_intelligence
==================================================
Confirmed data layout:
  data/lake/clean/market_data/NSE/SYMBOL_real_market_data_v2.parquet
      columns: [timestamp, open, high, low, close, volume] — 2155 symbols
  data/lake/ml/features/NSE/SYMBOL_features_ml_dataset_v1.parquet
      → only 5 symbols (HDFCBANK, ICICIBANK, INFY, RELIANCE, TCS) — fallback only

All outputs go to data/lake/ml/p170/ — nothing existing is touched.

Usage:
  cd /home/user1/dhan_financial_intelligence
  source .venv/bin/activate
  pip install lightgbm catboost --break-system-packages   # recommended
  python p170_max_system.py --check-deps
  python p170_max_system.py --global-model
  python p170_max_system.py --all-symbols --top-n 50
  python p170_max_system.py --all-symbols
  python p170_max_system.py --predict RELIANCE
"""
|
|
| import sys |
| import json |
| import time |
| import logging |
| import warnings |
| import argparse |
| import traceback |
| from pathlib import Path |
| from datetime import datetime |
| from typing import Optional, List, Tuple |
|
|
| import numpy as np |
| import pandas as pd |
|
|
# Silence every Python warning for the whole process so that console and log
# output only contains this script's own messages.
warnings.filterwarnings("ignore")

# The log directory must exist before FileHandler opens its file below.
Path("logs/ml").mkdir(parents=True, exist_ok=True)
logging.basicConfig(
    # INFO level: log.debug(...) calls elsewhere in this file are not emitted.
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.StreamHandler(sys.stdout),
        # One log file per process start, named after the start time.
        logging.FileHandler(f"logs/ml/p170_{datetime.now():%Y%m%d_%H%M%S}.log"),
    ],
)
# Module-wide logger used by every function in this file.
log = logging.getLogger("p170")
|
|
| |
| |
| |
# All paths are relative to the current working directory, so the script is
# expected to be started from the project root (see the module docstring).
BASE = Path(".")
CLEAN_DIR = BASE / "data/lake/clean/market_data/NSE"   # per-symbol OHLCV parquet files
FEAT_DIR = BASE / "data/lake/ml/features/NSE"          # pre-built feature files (fallback source)
OUT = BASE / "data/lake/ml/p170"                       # root of everything this script writes
MODELS_DIR = OUT / "models"
METRICS_DIR = OUT / "metrics"
PREDS_DIR = OUT / "predictions"
REGISTRY_DIR = OUT / "registry"                        # created here; not written to in this file


# Output folders are created at import time so later code can write without
# checking for their existence.
for d in [MODELS_DIR, METRICS_DIR, PREDS_DIR, REGISTRY_DIR]:
    d.mkdir(parents=True, exist_ok=True)
|
|
| |
| |
| |
TARGET = "direction_class"   # name of the label column: 0=DOWN, 1=NEUTRAL, 2=UP
HORIZON = 3                  # rows ahead used for the forward return (c.shift(-HORIZON) / c - 1)
POS_TH = 0.015               # forward return >= this is labelled UP (2)
NEG_TH = -0.015              # forward return <= this is labelled DOWN (0)
# NOTE(review): TRAIN_R, VAL_R, TEST_R and WF_FOLDS are not referenced anywhere
# in the visible part of this file; the global trainer hard-codes its own ratios.
TRAIN_R = 0.70
VAL_R = 0.15
TEST_R = 0.15
WF_FOLDS = 5
MIN_ROWS = 80                # minimum usable rows for a symbol to be loaded / trained on
SEED = 42                    # random seed shared by all base models
np.random.seed(SEED)
|
|
|
|
| |
| |
| |
|
|
def discover_symbols() -> List[Tuple[str, Path]]:
    """
    Scan the data lake and list every symbol that has a parquet file.

    Clean market-data files are authoritative; a feature-dataset file is only
    used for a symbol that has no clean file. The ticker is the file stem with
    its dataset suffix removed, upper-cased.

    Returns:
        [(symbol, parquet_path), ...] ordered by file size, largest first.
    """
    # (directory, stem suffix, may replace an already-registered symbol?)
    sources = (
        (CLEAN_DIR, "_real_market_data_v2", True),
        (FEAT_DIR, "_features_ml_dataset_v1", False),
    )

    by_symbol: dict = {}
    for directory, suffix, may_overwrite in sources:
        for parquet in directory.glob(f"*{suffix}.parquet"):
            ticker = parquet.stem.replace(suffix, "").upper()
            if not ticker:
                continue
            if may_overwrite or ticker not in by_symbol:
                by_symbol[ticker] = parquet

    # File size is used as a cheap stand-in for "amount of history".
    return sorted(
        by_symbol.items(),
        key=lambda item: item[1].stat().st_size,
        reverse=True,
    )
|
|
|
|
| |
| |
| |
|
|
def load_candles(symbol: str) -> Optional[pd.DataFrame]:
    """
    Read the OHLCV history of one symbol.

    Candidate files are tried in order (clean v2 file, bare ``SYMBOL.parquet``,
    feature dataset); the first one that yields at least MIN_ROWS usable rows
    wins. The time column may be called ``date`` or ``timestamp``, or be a
    named index; it is always returned as a datetime column named ``date``.

    Returns:
        DataFrame [date, open, high, low, close, volume] sorted by date with a
        fresh 0..n-1 index, or None when no candidate file is usable.
    """
    ohlcv = ["open", "high", "low", "close", "volume"]
    search_order = (
        CLEAN_DIR / f"{symbol}_real_market_data_v2.parquet",
        CLEAN_DIR / f"{symbol}.parquet",
        FEAT_DIR / f"{symbol}_features_ml_dataset_v1.parquet",
    )

    for path in search_order:
        if not path.exists():
            continue
        try:
            frame = pd.read_parquet(path)

            # Normalise the time column name.
            if "date" not in frame.columns:
                if "timestamp" in frame.columns:
                    frame = frame.rename(columns={"timestamp": "date"})
                elif frame.index.name:
                    index_name = frame.index.name
                    frame = frame.reset_index().rename(columns={index_name: "date"})
                else:
                    continue
            frame["date"] = pd.to_datetime(frame["date"])

            # A file without the full OHLCV set is useless here.
            if any(col not in frame.columns for col in ohlcv):
                continue

            frame = frame[["date"] + ohlcv]
            frame = frame.dropna(subset=["close", "volume"])
            frame = frame.sort_values("date").reset_index(drop=True)

            if len(frame) >= MIN_ROWS:
                return frame

        except Exception as e:
            # Best effort: an unreadable file just moves on to the next candidate.
            log.debug(f"Could not load {path}: {e}")
            continue

    return None
|
|
|
|
| |
| |
| |
|
|
|
|
| |
# Process-wide memo for _load_niftybees(); key "df" holds the frame or None.
_NIFTYBEES_CACHE: dict = {}


def _load_niftybees() -> "pd.DataFrame | None":
    """
    Load NIFTYBEES (used as a Nifty50 proxy) and derive market-level features.

    The result — including a failed load — is cached for the life of the
    process, so the file is read at most once.

    Returns:
        DataFrame with exactly one row per date and the columns
        [date, mkt_ret_1d, mkt_ret_5d, mkt_vol_20d, mkt_above_ma20],
        or None when the file is missing or cannot be processed.
    """
    if "df" in _NIFTYBEES_CACHE:
        return _NIFTYBEES_CACHE["df"]

    path = CLEAN_DIR / "NIFTYBEES_real_market_data_v2.parquet"
    result = None

    if path.exists():
        try:
            m = pd.read_parquet(path)
            if "timestamp" in m.columns:
                m = m.rename(columns={"timestamp": "date"})
            m["date"] = pd.to_datetime(m["date"])
            # One row per date: callers left-merge on "date", and a duplicated
            # date here would multiply their rows and misalign their data.
            m = (m.sort_values("date")
                  .drop_duplicates(subset="date", keep="last")
                  .reset_index(drop=True))

            mc = m["close"].astype(float)
            m["mkt_ret_1d"] = mc.pct_change(1)
            m["mkt_ret_5d"] = mc.pct_change(5)
            m["mkt_vol_20d"] = m["mkt_ret_1d"].rolling(20).std()
            m["mkt_ma20"] = mc.rolling(20).mean()
            # During the 20-row warm-up mkt_ma20 is NaN, so the flag is 0 there.
            m["mkt_above_ma20"] = (mc > m["mkt_ma20"]).astype(int)

            feature_cols = ["mkt_ret_1d", "mkt_ret_5d", "mkt_vol_20d", "mkt_above_ma20"]
            mkt_out = m[["date"] + feature_cols].copy()
            # Fill warm-up gaps so the merged features never carry NaN.
            for col in feature_cols:
                mkt_out[col] = mkt_out[col].ffill().bfill().fillna(0)

            result = mkt_out
        except Exception as e:
            # Market features are optional; callers simply skip them on None.
            log.debug(f"NIFTYBEES load failed: {e}")
            result = None

    _NIFTYBEES_CACHE["df"] = result
    return result
|
|
|
|
| |
# Process-wide memo for _load_cross_asset(); maps key -> DataFrame or None.
_CROSS_ASSET_CACHE: dict = {}


def _load_cross_asset(ticker: str, key: str) -> "pd.DataFrame | None":
    """
    Load daily returns of a cross-asset series (index, FX, commodity).

    A local parquet file under data/lake/clean/market_data/CROSS/ is preferred
    for the keys "nifty", "usdinr" and "crude"; otherwise the series is
    downloaded through yfinance. Results, including failures, are cached per
    ``key`` for the life of the process.

    Args:
        ticker: yfinance ticker used for the download fallback.
        key:    cache key; also selects the local parquet file.

    Returns:
        DataFrame [date, ret_1d, ret_5d] with timezone-naive, unique dates,
        or None when neither source is usable.
    """
    if key in _CROSS_ASSET_CACHE:
        return _CROSS_ASSET_CACHE[key]

    def _finalise(frame: pd.DataFrame) -> pd.DataFrame:
        # One row per date: callers left-merge on "date" and copy the result
        # positionally, so a duplicated date would break the row alignment.
        return (frame[["date", "ret_1d", "ret_5d"]]
                .drop_duplicates(subset="date", keep="last")
                .reset_index(drop=True))

    parquet_map = {"nifty": "NIFTY50", "usdinr": "USDINR", "crude": "CRUDE"}
    pname = parquet_map.get(key)
    cross_path = BASE / f"data/lake/clean/market_data/CROSS/{pname}.parquet" if pname else None

    try:
        if cross_path and cross_path.exists():
            raw = pd.read_parquet(cross_path)
            raw["date"] = pd.to_datetime(raw["date"]).dt.tz_localize(None)
            result = _finalise(raw)
            _CROSS_ASSET_CACHE[key] = result
            log.debug(f"Cross-asset {key}: loaded {len(result)} rows from parquet")
            return result

        # Network fallback; imported lazily because yfinance is optional.
        import yfinance as yf
        raw = yf.download(ticker, period="max", progress=False, auto_adjust=True)
        if raw is None or raw.empty:
            _CROSS_ASSET_CACHE[key] = None
            return None
        # The download may return (field, ticker) MultiIndex columns; keep the field.
        if isinstance(raw.columns, pd.MultiIndex):
            raw.columns = [c[0].lower() for c in raw.columns]
        else:
            raw.columns = [c.lower() for c in raw.columns]
        raw = raw.reset_index()
        raw = raw.rename(columns={raw.columns[0]: "date"})
        raw["date"] = pd.to_datetime(raw["date"]).dt.tz_localize(None)
        raw["ret_1d"] = raw["close"].pct_change(1)
        raw["ret_5d"] = raw["close"].pct_change(5)
        result = _finalise(raw)
        _CROSS_ASSET_CACHE[key] = result
        return result
    except Exception as e:
        # Cross-asset features are optional; remember the failure so the
        # download is not retried for every symbol.
        log.debug(f"Cross-asset {ticker} failed: {e}")
        _CROSS_ASSET_CACHE[key] = None
        return None
|
|
|
|
| def _hurst_rs(prices: np.ndarray) -> float: |
| """Hurst exponent via R/S analysis. >0.5=trending, <0.5=mean-reverting.""" |
| try: |
| n = len(prices) |
| if n < 20: |
| return 0.5 |
| mean_adj = prices - np.mean(prices) |
| cs = np.cumsum(mean_adj) |
| r = np.max(cs) - np.min(cs) |
| s = np.std(prices, ddof=1) |
| if s == 0: |
| return 0.5 |
| return float(np.log(r / s) / np.log(n)) |
| except Exception: |
| return 0.5 |
|
|
def engineer_features(raw: pd.DataFrame) -> pd.DataFrame:
    """
    Build the technical-indicator feature set and the classification target.

    Args:
        raw: OHLCV DataFrame with columns [date, open, high, low, close, volume],
             sorted by date (as returned by load_candles).

    Returns:
        A copy of ``raw`` (same rows, same index) with the feature columns
        appended, plus:
          * ``future_return`` — close-to-close return HORIZON rows ahead;
          * TARGET — 2 when future_return >= POS_TH, 0 when <= NEG_TH,
            1 otherwise, and NaN for the last HORIZON rows, whose outcome is
            not known yet (callers drop those rows before training).
        Feature gaps are forward- then backward-filled. Cross-asset and market
        features are best effort: if their data is unavailable the cross-asset
        columns are 0.0 and the market columns are absent.
    """
    df = raw.copy()
    c = df["close"].astype(float)
    h = df["high"].astype(float)
    l = df["low"].astype(float)
    o = df["open"].astype(float)
    # Zero volume is treated as missing so volume ratios are not distorted.
    v = df["volume"].astype(float).replace(0, np.nan)

    # ── Shared intermediates ────────────────────────────────────────────────
    tr = pd.concat(
        [h - l, (h - c.shift()).abs(), (l - c.shift()).abs()], axis=1
    ).max(axis=1)                                   # true range
    log_ret = np.log(c / (c.shift(1) + 1e-9))
    typical = (h + l + c) / 3

    # MACD and stochastic lines are computed up front because the cross-up
    # flags below depend on them. (Deriving the flags from df["macd"] etc.
    # before those columns exist would leave them permanently 0.)
    macd_line = c.ewm(span=12, adjust=False).mean() - c.ewm(span=26, adjust=False).mean()
    macd_signal = macd_line.ewm(span=9, adjust=False).mean()
    stoch = {}
    for k in (5, 14):
        lo_k = l.rolling(k).min()
        hi_k = h.rolling(k).max()
        k_line = 100 * (c - lo_k) / (hi_k - lo_k + 1e-9)
        stoch[k] = (k_line, k_line.rolling(3).mean())

    # ── Keltner channel width ───────────────────────────────────────────────
    keltner_mid = c.ewm(span=20, adjust=False).mean()
    atr14_k = tr.ewm(span=14, adjust=False).mean()
    df["keltner_width"] = (keltner_mid + 2*atr14_k - (keltner_mid - 2*atr14_k)) / (keltner_mid + 1e-9)

    # ── Annualised historical volatility ────────────────────────────────────
    for w in [10, 20, 60]:
        df[f"hist_vol_{w}"] = log_ret.rolling(w).std() * np.sqrt(252)

    # ── Money flow index ────────────────────────────────────────────────────
    raw_mf = typical * v
    pos_mf = raw_mf.where(typical > typical.shift(1), 0).rolling(14).sum()
    neg_mf = raw_mf.where(typical < typical.shift(1), 0).rolling(14).sum()
    df["mfi_14"] = 100 - (100 / (1 + pos_mf / (neg_mf + 1e-9)))

    # ── Chaikin money flow ──────────────────────────────────────────────────
    clv = ((c - l) - (h - c)) / (h - l + 1e-9)
    df["cmf_20"] = (clv * v).rolling(20).sum() / (v.rolling(20).sum() + 1e-9)

    # ── Distance from 20-row rolling VWAP ───────────────────────────────────
    vwap = (typical * v).rolling(20).sum() / (v.rolling(20).sum() + 1e-9)
    df["vwap_dist"] = (c - vwap) / (vwap + 1e-9)

    # ── Amihud illiquidity ──────────────────────────────────────────────────
    df["amihud_illiquidity"] = (c.pct_change().abs() / (v * c + 1e-9)).rolling(20).mean()

    # ── Volume ratios ───────────────────────────────────────────────────────
    df["vol_ratio_5"] = v / (v.rolling(5).mean() + 1e-9)
    df["vol_ratio_20"] = v / (v.rolling(20).mean() + 1e-9)

    # ── OBV vs its own EMA ──────────────────────────────────────────────────
    obv_raw = (np.sign(c.diff()) * v).cumsum()
    obv_ema = obv_raw.ewm(span=20, adjust=False).mean()
    df["obv_divergence"] = obv_raw - obv_ema

    # ── Williams %R and CCI ─────────────────────────────────────────────────
    df["williams_r"] = -100 * (h.rolling(14).max() - c) / (h.rolling(14).max() - l.rolling(14).min() + 1e-9)
    df["cci_20"] = (c - c.rolling(20).mean()) / (0.015 * c.rolling(20).std() + 1e-9)

    # ── Bullish cross-over events (1 only on the row where the cross happens) ─
    df["macd_cross_up"] = (
        (macd_line > macd_signal) & (macd_line.shift(1) <= macd_signal.shift(1))
    ).astype(int)
    stoch_k14, stoch_d14 = stoch[14]
    df["stoch_cross_up"] = (
        (stoch_k14 > stoch_d14) & (stoch_k14.shift(1) <= stoch_d14.shift(1))
    ).astype(int)

    # ── Candlestick patterns ────────────────────────────────────────────────
    body = (c - o).abs()
    full_range = (h - l).replace(0, np.nan)
    upper_sh = (h - pd.concat([c, o], axis=1).max(axis=1)) / full_range
    lower_sh = (pd.concat([c, o], axis=1).min(axis=1) - l) / full_range
    df["gap_up"] = ((o - c.shift(1)) / (c.shift(1) + 1e-9)).clip(lower=0)
    df["gap_down"] = ((c.shift(1) - o) / (c.shift(1) + 1e-9)).clip(lower=0)
    df["inside_bar"] = ((h < h.shift(1)) & (l > l.shift(1))).astype(int)
    df["doji"] = (body / (full_range + 1e-9) < 0.1).astype(int)
    df["hammer"] = ((lower_sh > 2 * body / (full_range + 1e-9)) & (upper_sh < 0.3)).astype(int)
    df["shooting_star"] = ((upper_sh > 2 * body / (full_range + 1e-9)) & (lower_sh < 0.3)).astype(int)
    df["engulfing_bull"] = (
        (c > o) & (o < c.shift(1)) & (c > o.shift(1)) & (c.shift(1) < o.shift(1))
    ).astype(int)
    df["engulfing_bear"] = (
        (c < o) & (o > c.shift(1)) & (c < o.shift(1)) & (c.shift(1) > o.shift(1))
    ).astype(int)

    # ── Drawdown from rolling high ──────────────────────────────────────────
    df["dd_from_high_20"] = (c - c.rolling(20).max()) / (c.rolling(20).max() + 1e-9)
    df["dd_from_high_60"] = (c - c.rolling(60).max()) / (c.rolling(60).max() + 1e-9)

    # ── Rolling Sharpe ratio (annualised) ───────────────────────────────────
    df["rolling_sharpe_20"] = (log_ret.rolling(20).mean() / (log_ret.rolling(20).std() + 1e-9)) * np.sqrt(252)
    df["rolling_sharpe_60"] = (log_ret.rolling(60).mean() / (log_ret.rolling(60).std() + 1e-9)) * np.sqrt(252)

    # NOTE(review): despite its name this column is the lag-1 autocorrelation
    # of daily returns over a 20-row window, not a Hurst exponent over 50 rows.
    # The name is kept because it is part of the saved feature list.
    df["hurst_50"] = c.pct_change().rolling(20).apply(lambda x: x.autocorr(lag=1) if len(x)>5 else 0, raw=False)

    # ── Cross-asset features (best effort) ──────────────────────────────────
    try:
        nifty_df = _load_cross_asset("^NSEI", "nifty")
        usdinr_df = _load_cross_asset("USDINR=X", "usdinr")
        crude_df = _load_cross_asset("CL=F", "crude")

        if "date" in df.columns:
            df["date"] = pd.to_datetime(df["date"])
            for cross, prefix in [(nifty_df,"nifty"), (usdinr_df,"usdinr"), (crude_df,"crude")]:
                if cross is None:
                    continue
                # The merge result is copied positionally (.values), so the
                # right side must have unique dates to keep the row count.
                merged = df[["date"]].merge(
                    cross.drop_duplicates(subset="date", keep="last"),
                    on="date", how="left",
                )
                if prefix == "nifty":
                    df["nifty_ret_1d"] = merged["ret_1d"].values
                    df["nifty_ret_5d"] = merged["ret_5d"].values
                    # Rolling 20-row beta of the stock against the index.
                    sr = c.pct_change()
                    nr = pd.Series(merged["ret_1d"].values, index=df.index)
                    df["beta_20d"] = sr.rolling(20).cov(nr) / (nr.rolling(20).var() + 1e-9)
                elif prefix == "usdinr":
                    df["usd_inr_ret"] = merged["ret_1d"].values
                elif prefix == "crude":
                    df["crude_ret"] = merged["ret_1d"].values
    except Exception as e:
        log.debug(f"Cross-asset features skipped: {e}")
    # Guarantee a stable column set even when a cross-asset source is missing.
    for col in ["nifty_ret_1d","nifty_ret_5d","beta_20d","usd_inr_ret","crude_ret"]:
        if col not in df.columns:
            df[col] = 0.0

    # ── Returns ─────────────────────────────────────────────────────────────
    for p in [1, 3, 5, 10, 20]:
        df[f"return_{p}d"] = c.pct_change(p)

    # ── Realised volatility and return distribution shape ───────────────────
    r1 = df["return_1d"]
    for w in [5, 10, 20]:
        df[f"volatility_{w}d"] = r1.rolling(w).std()
    df["vol_ratio"] = df["volatility_5d"] / (df["volatility_20d"] + 1e-9)
    df["skew_20"] = r1.rolling(20).skew()
    df["kurt_20"] = r1.rolling(20).kurt()

    # ── Simple moving averages ──────────────────────────────────────────────
    for w in [5, 10, 20, 50]:
        df[f"ma_{w}"] = c.rolling(w).mean()
        df[f"close_to_ma_{w}"] = (c - df[f"ma_{w}"]) / (df[f"ma_{w}"] + 1e-9)

    # ── EMA ratios and alignment score (0, 1 or 2) ──────────────────────────
    ema5 = c.ewm(span=5, adjust=False).mean()
    ema20 = c.ewm(span=20, adjust=False).mean()
    ema50 = c.ewm(span=50, adjust=False).mean()
    df["ema_5_20_ratio"] = ema5 / (ema20 + 1e-9)
    df["ema_20_50_ratio"] = ema20 / (ema50 + 1e-9)
    df["ema_align"] = (ema5 > ema20).astype(int) + (ema20 > ema50).astype(int)

    # ── RSI ─────────────────────────────────────────────────────────────────
    delta = c.diff()
    gain = delta.clip(lower=0)
    loss = (-delta).clip(lower=0)
    for period in [7, 14, 21]:
        ag = gain.ewm(com=period - 1, min_periods=period).mean()
        al = loss.ewm(com=period - 1, min_periods=period).mean()
        df[f"rsi_{period}"] = 100 - 100 / (1 + ag / (al + 1e-9))

    # ── MACD ────────────────────────────────────────────────────────────────
    df["macd"] = macd_line
    df["macd_sig"] = macd_signal
    df["macd_hist"] = df["macd"] - df["macd_sig"]
    df["macd_cross"] = (df["macd"] > df["macd_sig"]).astype(int)

    # ── Bollinger bands ─────────────────────────────────────────────────────
    for w in [10, 20]:
        mid = c.rolling(w).mean()
        std = c.rolling(w).std()
        bb_u = mid + 2 * std
        bb_l = mid - 2 * std
        df[f"bb_width_{w}"] = (bb_u - bb_l) / (mid + 1e-9)
        df[f"bb_pct_{w}"] = (c - bb_l) / (bb_u - bb_l + 1e-9)

    # ── ATR ─────────────────────────────────────────────────────────────────
    for w in [7, 14]:
        atr = tr.ewm(span=w, adjust=False).mean()
        df[f"atr_{w}"] = atr
        df[f"atr_pct_{w}"] = atr / (c + 1e-9)

    # ── ADX / directional indicators ────────────────────────────────────────
    plus_dm = (h - h.shift(1)).clip(lower=0)
    minus_dm = (l.shift(1) - l).clip(lower=0)
    plus_dm = plus_dm.where(plus_dm > minus_dm, 0)
    # Compared against the already-filtered plus_dm (statement order matters).
    minus_dm = minus_dm.where(minus_dm > plus_dm, 0)
    atr14 = tr.ewm(span=14, adjust=False).mean() + 1e-9
    plus_di = 100 * plus_dm.ewm(span=14, adjust=False).mean() / atr14
    minus_di = 100 * minus_dm.ewm(span=14, adjust=False).mean() / atr14
    dx = 100 * (plus_di - minus_di).abs() / (plus_di + minus_di + 1e-9)
    df["adx_14"] = dx.ewm(span=14, adjust=False).mean()
    df["plus_di_14"] = plus_di
    df["minus_di_14"] = minus_di

    # ── Stochastic oscillator ───────────────────────────────────────────────
    for k in [5, 14]:
        k_line, d_line = stoch[k]
        df[f"stoch_k_{k}"] = k_line
        df[f"stoch_d_{k}"] = d_line

    # ── Donchian channel position ───────────────────────────────────────────
    for w in [10, 20]:
        dc_hi = h.rolling(w).max()
        dc_lo = l.rolling(w).min()
        df[f"donchian_{w}"] = (c - dc_lo) / (dc_hi - dc_lo + 1e-9)

    # ── Candle geometry ─────────────────────────────────────────────────────
    rng = h - l + 1e-9
    df["candle_body"] = (c - o) / rng
    df["upper_shadow"] = (h - c.clip(lower=o)) / rng
    df["lower_shadow"] = (o.clip(upper=c) - l) / rng
    df["gap"] = (o - c.shift(1)) / (c.shift(1) + 1e-9)

    # ── Volume z-scores, OBV z-score, volume-price trend ────────────────────
    for w in [5, 20]:
        vmean = v.rolling(w).mean()
        vstd = v.rolling(w).std() + 1e-9
        df[f"volume_z_{w}"] = (v - vmean) / vstd
    obv = (np.sign(r1) * v).cumsum()
    df["obv_z"] = (obv - obv.rolling(20).mean()) / (obv.rolling(20).std() + 1e-9)
    df["vpt_5"] = (r1 * v).rolling(5).sum()

    # ── Rate of change and blended momentum score ───────────────────────────
    for p in [3, 5, 10, 20]:
        df[f"roc_{p}"] = c.pct_change(p)
    # NOTE(review): rank(pct=True) ranks each row against the symbol's whole
    # history, including later rows, so this score is not point-in-time.
    df["momentum_score"] = (
        df["roc_3"].rank(pct=True) * 0.4
        + df["roc_5"].rank(pct=True) * 0.3
        + df["roc_10"].rank(pct=True) * 0.2
        + df["roc_20"].rank(pct=True) * 0.1
    )

    # ── Market regime features from NIFTYBEES (best effort) ─────────────────
    try:
        mkt = _load_niftybees()
        if mkt is not None and "date" in df.columns:
            df["date"] = pd.to_datetime(df["date"])
            base_cols = ["mkt_ret_1d", "mkt_ret_5d", "mkt_vol_20d", "mkt_above_ma20"]
            # Merge on the side and copy the columns in positionally: this keeps
            # df's index and row count, so the close-derived series above and
            # the target below stay aligned with df.
            aligned = df[["date"]].merge(
                mkt.drop_duplicates(subset="date", keep="last"),
                on="date", how="left",
            )
            for col in base_cols:
                df[col] = aligned[col].values

            df["stock_vs_mkt_5d"] = df["return_5d"] - df["mkt_ret_5d"].fillna(0)
            # No 20-day market return is available in the market frame, so it
            # is approximated as four times the 5-day market return.
            df["stock_vs_mkt_20d"] = df["return_20d"] - df["mkt_ret_5d"].fillna(0) * 4
            mkt_cols = base_cols + ["stock_vs_mkt_5d", "stock_vs_mkt_20d"]
            df[mkt_cols] = df[mkt_cols].ffill().bfill().fillna(0)
    except Exception as e:
        log.debug(f"Market regime features skipped: {e}")

    # ── Target ──────────────────────────────────────────────────────────────
    fwd = c.shift(-HORIZON) / c - 1
    df["future_return"] = fwd
    df[TARGET] = 1.0
    df.loc[fwd >= POS_TH, TARGET] = 2
    df.loc[fwd <= NEG_TH, TARGET] = 0
    # The last HORIZON rows have no forward return yet. Leaving them at the
    # default would mislabel them NEUTRAL, so mark them as unlabelled instead.
    df.loc[fwd.isna(), TARGET] = np.nan

    # ── Fill indicator warm-up gaps ─────────────────────────────────────────
    # NOTE(review): the backward fill copies the first valid value into the
    # leading warm-up rows, i.e. those rows see slightly later data.
    feat_cols_now = [col for col in df.columns
                     if col not in {"date","open","high","low","close","volume",
                                    "future_return", TARGET}]
    df[feat_cols_now] = df[feat_cols_now].ffill().bfill()

    return df
|
|
|
|
def feature_columns(df: pd.DataFrame) -> List[str]:
    """
    List the model-input columns of an engineered frame, in frame order.

    Everything is a feature except raw OHLCV, identifier columns and the
    label columns (``future_return`` and TARGET).
    """
    non_features = frozenset((
        "date", "open", "high", "low", "close", "volume",
        "symbol", "exchange", "timestamp", "future_return", TARGET,
    ))
    selected = []
    for name in df.columns:
        if name in non_features:
            continue
        selected.append(name)
    return selected
|
|
|
|
| |
| |
| |
|
|
def _gpu_usable(use_gpu: bool) -> bool:
    """Return True when GPU training was requested and is not known to be unavailable."""
    if not use_gpu:
        return False
    try:
        import torch
    except Exception:
        # Without torch there is no way to probe for CUDA here; honour the request.
        return True
    if torch.cuda.is_available():
        return True
    log.warning("  GPU requested but CUDA is not available — training base models on CPU")
    return False


def make_base_models(use_gpu: bool) -> dict:
    """
    Create the unfitted base classifiers of the ensemble.

    XGBoost, RandomForest and ExtraTrees are always created; LightGBM and
    CatBoost are added only when their packages can be imported. All models
    are 3-class and seeded with SEED.

    Args:
        use_gpu: request GPU training for XGBoost and CatBoost. The request is
            downgraded to CPU when torch reports that CUDA is unavailable,
            because a GPU-configured model can fail at fit time without a
            device.

    Returns:
        dict mapping model name -> unfitted estimator, in training order.
    """
    from sklearn.ensemble import RandomForestClassifier, ExtraTreesClassifier
    import xgboost as xgb

    use_gpu = _gpu_usable(use_gpu)
    gpu_xgb = {"device": "cuda", "tree_method": "hist"} if use_gpu else {}

    models = {
        "xgboost": xgb.XGBClassifier(
            n_estimators=300, max_depth=5, learning_rate=0.05,
            subsample=0.8, colsample_bytree=0.8,
            min_child_weight=3, gamma=0.1, reg_alpha=0.1, reg_lambda=1.0,
            objective="multi:softprob", num_class=3, eval_metric="mlogloss",
            random_state=SEED, verbosity=0, n_jobs=-1, **gpu_xgb,
        ),
        "random_forest": RandomForestClassifier(
            n_estimators=200, max_depth=8, min_samples_leaf=5,
            max_features="sqrt", class_weight="balanced",
            random_state=SEED, n_jobs=-1,
        ),
        "extra_trees": ExtraTreesClassifier(
            n_estimators=200, max_depth=10, min_samples_leaf=3,
            class_weight="balanced", random_state=SEED, n_jobs=-1,
        ),
    }

    # Optional boosters: their absence is reported but is not an error.
    try:
        import lightgbm as lgb
        models["lightgbm"] = lgb.LGBMClassifier(
            n_estimators=300, num_leaves=63, learning_rate=0.05,
            subsample=0.8, colsample_bytree=0.8, min_child_samples=10,
            reg_alpha=0.1, reg_lambda=1.0,
            objective="multiclass", num_class=3,
            random_state=SEED, verbose=-1, n_jobs=-1,
        )
        log.info("  LightGBM ✓")
    except ImportError:
        log.warning("  LightGBM not installed (pip install lightgbm --break-system-packages)")

    try:
        from catboost import CatBoostClassifier
        cb_gpu = {"task_type": "GPU"} if use_gpu else {"task_type": "CPU"}
        models["catboost"] = CatBoostClassifier(
            iterations=300, depth=6, learning_rate=0.05, l2_leaf_reg=3,
            loss_function="MultiClass", classes_count=3,
            random_seed=SEED, verbose=False, **cb_gpu,
        )
        log.info("  CatBoost ✓")
    except ImportError:
        log.warning("  CatBoost not installed (pip install catboost --break-system-packages)")

    return models
|
|
|
|
| |
| |
| |
|
|
| def _meta_net(in_dim: int): |
| import torch.nn as nn |
| return nn.Sequential( |
| nn.Linear(in_dim, 64), nn.LayerNorm(64), nn.GELU(), nn.Dropout(0.2), |
| nn.Linear(64, 32), nn.GELU(), nn.Dropout(0.1), |
| nn.Linear(32, 3), |
| ) |
|
|
|
|
def train_meta_torch(net, X: np.ndarray, y: np.ndarray, device, epochs: int = 60):
    """
    Fit ``net`` on (X, y) with full-batch gradient descent.

    Uses AdamW (lr 1e-3, weight decay 1e-4), a cosine learning-rate schedule
    over ``epochs`` steps, cross-entropy loss and gradient-norm clipping at 1.0.

    Returns:
        The trained network, on ``device`` and switched to eval mode.
    """
    import torch
    from torch import nn

    net = net.to(device)
    optimizer = torch.optim.AdamW(net.parameters(), lr=1e-3, weight_decay=1e-4)
    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=epochs)
    criterion = nn.CrossEntropyLoss()

    # The whole data set is one batch.
    inputs = torch.FloatTensor(X).to(device)
    labels = torch.LongTensor(y).to(device)

    net.train()
    for _epoch in range(epochs):
        optimizer.zero_grad()
        batch_loss = criterion(net(inputs), labels)
        batch_loss.backward()
        torch.nn.utils.clip_grad_norm_(net.parameters(), 1.0)
        optimizer.step()
        scheduler.step()

    net.eval()
    return net
|
|
|
|
def predict_meta_torch(net, X: np.ndarray, device) -> np.ndarray:
    """
    Run the meta-learner and return class probabilities.

    The network is moved to ``device`` and put in eval mode first, so a model
    that was loaded on CPU also works with a CUDA device and dropout never
    affects inference.

    Args:
        net:    torch module mapping (n, in_dim) inputs to (n, 3) logits.
        X:      2-D float array of stacked base-model probabilities.
        device: torch device to run on.

    Returns:
        (n, n_classes) numpy array; each row sums to 1.
    """
    import torch

    net = net.to(device)
    net.eval()
    with torch.no_grad():
        logits = net(torch.FloatTensor(X).to(device))
        return torch.softmax(logits, dim=1).cpu().numpy()
|
|
|
|
| |
| |
| |
| |
| |
|
|
def build_global_dataset(
    pairs: List[Tuple[str, Path]],
    max_symbols: Optional[int] = None,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, List[str], List]:
    """
    Load every equity symbol, engineer its features and stack all rows.

    ETF, liquid-fund, commodity and index tickers are filtered out by name.
    Rows with a missing or non-finite feature, or without a label, are
    dropped; symbols left with fewer than MIN_ROWS rows are skipped. The
    feature list of the first accepted symbol is the reference layout: a later
    symbol lacking one of those columns gets 0 for it.

    Args:
        pairs:       [(symbol, path), ...]; only the symbol is used here.
        max_symbols: keep only the first N symbols after filtering
                     (None or 0 = no limit).

    Returns:
        (X, y, symbol_ids, feature_cols, dates), where X is float32
        (rows x features), y is int32 class labels, symbol_ids is the per-row
        ticker array and dates the per-row date list. Rows of one symbol are
        contiguous and in the chronological order given by load_candles.

    Raises:
        ValueError: when no symbol produced any usable rows.
    """
    non_equity_tokens = (
        'LIQUID','GILT','GSEC','TBILL','CASH','GOLD','SILVER','METAL',
        'BBETF','EBBETF','BEES','NIFTY','LIQUIDADD','LIQUIDETF',
        'CASHIETF','AXISBPS','LICNETF',
    )
    non_equity_suffixes = ('ETF','BEES','LIQUID','ADD','IETF')

    def _is_equity(symbol: str) -> bool:
        name = symbol.upper()
        if any(token in name for token in non_equity_tokens):
            return False
        return not name.endswith(non_equity_suffixes)

    pairs = [(s, p) for s, p in pairs if _is_equity(s)]
    if max_symbols:
        pairs = pairs[:max_symbols]

    all_X, all_y, all_syms, all_dates = [], [], [], []
    feature_cols_ref = None
    skipped = 0
    rows_loaded = 0

    log.info(f"  Loading {len(pairs)} equity symbols...")
    for i, (sym, _) in enumerate(pairs):
        if i % 200 == 0:
            log.info(f"  {i}/{len(pairs)} loaded, {rows_loaded:,} rows so far")
        try:
            raw = load_candles(sym)
            if raw is None:
                skipped += 1
                continue
            df = engineer_features(raw)
            fcols = feature_columns(df)
            # An infinite feature value would make the scaler reject the whole
            # matrix later, so it is treated like a missing value here.
            df[fcols] = df[fcols].replace([np.inf, -np.inf], np.nan)
            df = df.dropna(subset=fcols + [TARGET])
            df = df[df[TARGET].isin([0, 1, 2])].reset_index(drop=True)
            if len(df) < MIN_ROWS:
                skipped += 1
                continue

            # The first accepted symbol fixes the column layout for everyone.
            if feature_cols_ref is None:
                feature_cols_ref = fcols
            else:
                fcols = feature_cols_ref

            # Columns missing for this symbol are zero-filled.
            X_sym = df.reindex(columns=fcols, fill_value=0).to_numpy(dtype=np.float32)

            all_X.append(X_sym)
            all_y.append(df[TARGET].values.astype(np.int32))
            all_syms.extend([sym] * len(df))
            if "date" in df.columns:
                all_dates.extend(df["date"].tolist())
            else:
                all_dates.extend([None] * len(df))
            rows_loaded += len(df)

        except Exception as e:
            # One bad symbol must not abort the whole build.
            log.debug(f"  [{sym}] skipped: {e}")
            skipped += 1

    log.info(f"  Loaded: {len(pairs)-skipped} symbols, {skipped} skipped")

    if not all_X:
        raise ValueError("No data loaded — check CLEAN_DIR path")

    X = np.vstack(all_X)
    y = np.concatenate(all_y)
    sym_ids = np.array(all_syms)

    log.info(f"  Combined dataset: {X.shape[0]:,} rows × {X.shape[1]} features")
    classes, counts = np.unique(y, return_counts=True)
    for label, count in zip(classes, counts):
        log.info(f"    class {label}: {count:,} ({count/len(y)*100:.1f}%)")

    return X, y, sym_ids, feature_cols_ref or [], all_dates
|
|
|
|
def temporal_global_split(
    sym_ids: np.ndarray,
    all_dates: List,
    test_ratio: float = 0.20,
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Split rows into train/test so that each symbol's newest rows are test.

    For every symbol the last ``test_ratio`` share of its rows (by position)
    goes to the test set and the rest to the training set. Rows of a symbol
    are assumed to already be in chronological order.

    Args:
        sym_ids:    per-row symbol identifier.
        all_dates:  per-row dates; accepted for interface compatibility but
                    not used — ordering relies on row position.
        test_ratio: share of each symbol's rows assigned to the test set.

    Returns:
        (train_mask, test_mask): complementary boolean arrays of len(sym_ids).
    """
    total = len(sym_ids)
    test_mask = np.zeros(total, dtype=bool)

    # Group rows by symbol with one stable sort instead of scanning the whole
    # array once per symbol; the stable sort keeps each symbol's rows in their
    # original (chronological) order.
    order = np.argsort(sym_ids, kind="stable")
    _, counts = np.unique(sym_ids, return_counts=True)

    start = 0
    for count in counts:
        count = int(count)
        rows = order[start:start + count]
        cut = int(count * (1 - test_ratio))
        test_mask[rows[cut:]] = True
        start += count

    train_mask = ~test_mask
    return train_mask, test_mask
|
|
|
|
def train_global_model(use_gpu: bool = True, max_symbols: Optional[int] = None) -> dict:
    """
    Train one ensemble on the rows of all symbols combined.

    Steps: discover symbols -> build the stacked dataset -> per-symbol
    temporal train/test split -> scale -> fit every base model -> average the
    test probabilities of the models that trained successfully -> save models,
    predictions and metrics.

    Outputs:
        data/lake/ml/p170/models/GLOBAL/{scaler,feature_cols,<model>}.joblib
        data/lake/ml/p170/predictions/GLOBAL_p170_predictions.parquet
        data/lake/ml/p170/metrics/GLOBAL_p170_metrics.json

    Args:
        use_gpu:     request GPU training for the models that support it.
        max_symbols: limit training to the first N symbols (None = all).

    Returns:
        The metrics dict that is also written to the metrics JSON file.

    Raises:
        ValueError:   no data could be loaded (from build_global_dataset).
        RuntimeError: every base model failed to train.
    """
    import joblib
    from sklearn.preprocessing import RobustScaler
    from sklearn.metrics import accuracy_score, classification_report

    log.info(f"\n{'#'*54}")
    log.info(f"  GLOBAL MODEL TRAINING")
    log.info(f"{'#'*54}\n")
    t0 = time.time()

    # ── Data ────────────────────────────────────────────────────────────────
    pairs = discover_symbols()
    X, y, sym_ids, fcols, all_dates = build_global_dataset(pairs, max_symbols=max_symbols)
    log.info(f"  {len(X):,} rows across {len(np.unique(sym_ids))} NSE symbols")

    # Last 20% of every symbol's rows is held out as the test set.
    train_mask, test_mask = temporal_global_split(sym_ids, all_dates, test_ratio=0.20)

    # NOTE(review): validation is the positional tail of the training rows.
    # Rows are grouped by symbol, so this is the last-loaded symbols rather
    # than a per-symbol temporal slice. It is only used for monitoring and
    # the reported validation accuracy.
    train_idx = np.where(train_mask)[0]
    val_cut = int(len(train_idx) * 0.80)
    tr_idx = train_idx[:val_cut]
    val_idx = train_idx[val_cut:]
    te_idx = np.where(test_mask)[0]

    X_tr, y_tr = X[tr_idx], y[tr_idx]
    X_val, y_val = X[val_idx], y[val_idx]
    X_te, y_te = X[te_idx], y[te_idx]

    log.info(f"  Train: {len(X_tr):,}  Val: {len(X_val):,}  Test: {len(X_te):,}")

    # ── Scaling (fitted on the training rows only) ──────────────────────────
    scaler = RobustScaler()
    X_tr_s = scaler.fit_transform(X_tr)
    X_val_s = scaler.transform(X_val)
    X_te_s = scaler.transform(X_te)

    # ── Base models ─────────────────────────────────────────────────────────
    base = make_base_models(use_gpu)
    fitted = {}        # name -> successfully trained model
    test_probas = []   # test-set probabilities, one (n_test, 3) array per fitted model
    scores = {}

    for name, model in base.items():
        log.info(f"  [{name}] training on {len(X_tr):,} rows...")
        try:
            if name == "catboost":
                model.fit(X_tr_s, y_tr, eval_set=(X_val_s, y_val), verbose=100)
            elif name == "xgboost":
                model.fit(X_tr_s, y_tr, eval_set=[(X_val_s, y_val)],
                          verbose=100)
            elif name == "lightgbm":
                model.fit(X_tr_s, y_tr, eval_set=[(X_val_s, y_val)])
            else:
                model.fit(X_tr_s, y_tr)

            vp = model.predict_proba(X_val_s)
            tp = model.predict_proba(X_te_s)
            acc = float(accuracy_score(y_val, np.argmax(vp, axis=1)))
        except Exception as e:
            # A failing model is left out of the ensemble; the others go on.
            log.error(f"  [{name}] failed: {e}")
            log.debug(traceback.format_exc())
            continue

        fitted[name] = model
        test_probas.append(np.asarray(tp, dtype=np.float32))
        scores[name] = round(acc, 4)
        log.info(f"  [{name}] val acc = {acc:.4f}")

    if not fitted:
        raise RuntimeError("All base models failed to train; nothing to ensemble")
    failed = [name for name in base if name not in fitted]

    # ── Ensemble: plain average of the fitted models' probabilities ─────────
    log.info(f"  [ensemble] averaging {len(fitted)} base models...")
    ens_proba = np.mean(test_probas, axis=0).astype(np.float32)
    meta_type = "simple_average"
    log.info(f"  averaged {len(fitted)} active base models")
    ens_pred = np.argmax(ens_proba, axis=1)
    ens_acc = float(accuracy_score(y_te, ens_pred))
    log.info(f"  [ensemble] global test acc = {ens_acc:.4f}")
    # labels= keeps the report valid even when a class is absent from the test rows.
    report = classification_report(
        y_te, ens_pred, labels=[0, 1, 2],
        target_names=['DOWN','NEUTRAL','UP'], zero_division=0,
    )
    log.info(f"\n{report}")

    # ── Per-symbol test accuracy (symbols with < 5 test rows are skipped) ───
    test_syms = sym_ids[te_idx]
    hits = pd.DataFrame({"symbol": test_syms, "hit": ens_pred == y_te})
    per_symbol = hits.groupby("symbol")["hit"].agg(["mean", "size"])
    sym_accs = {
        str(sym): round(float(row["mean"]), 4)
        for sym, row in per_symbol.iterrows()
        if row["size"] >= 5
    }

    # ── Feature importance (from XGBoost, when it trained) ──────────────────
    top_features = []
    if "xgboost" in fitted:
        try:
            imp = fitted["xgboost"].feature_importances_
            top_idx = np.argsort(imp)[-15:][::-1]
            top_features = [[fcols[i], round(float(imp[i]), 4)] for i in top_idx]
        except Exception as e:
            log.debug(f"  feature importance unavailable: {e}")

    # ── Persist models ──────────────────────────────────────────────────────
    gdir = MODELS_DIR / "GLOBAL"
    gdir.mkdir(exist_ok=True)
    joblib.dump(scaler, gdir / "scaler.joblib")
    joblib.dump(fcols, gdir / "feature_cols.joblib")
    for name, model in fitted.items():
        try:
            joblib.dump(model, gdir / f"{name}.joblib")
            log.info(f"  saved {name}.joblib")
        except Exception as e:
            log.warning(f"  save {name} failed: {e}")
    # A model that failed this run must not stay on disk from an earlier run:
    # prediction would combine it with the new scaler and feature list.
    for name in failed:
        stale = gdir / f"{name}.joblib"
        if stale.exists():
            stale.unlink()
            log.warning(f"  removed stale {name}.joblib (model failed in this run)")

    # ── Persist test predictions ────────────────────────────────────────────
    pd.DataFrame({
        "symbol": test_syms,
        "pred_class": ens_pred,
        "prob_down": ens_proba[:, 0],
        "prob_neutral": ens_proba[:, 1],
        "prob_up": ens_proba[:, 2],
        "actual_class": y_te,
    }).to_parquet(PREDS_DIR / "GLOBAL_p170_predictions.parquet", index=False)

    # ── Metrics ─────────────────────────────────────────────────────────────
    elapsed = round(time.time() - t0, 1)
    classes, counts = np.unique(y_tr, return_counts=True)

    sorted_syms = sorted(sym_accs.items(), key=lambda x: x[1], reverse=True)
    top10 = sorted_syms[:10]
    bot10 = sorted_syms[-10:]

    metrics = {
        "model": "GLOBAL",
        "status": "success",
        "timestamp": datetime.now().isoformat(),
        "n_symbols": int(len(np.unique(sym_ids))),
        "n_rows_total": int(len(X)),
        "n_train": int(len(X_tr)),
        "n_val": int(len(X_val)),
        "n_test": int(len(X_te)),
        "n_features": int(len(fcols)),
        "base_models": list(fitted.keys()),
        "failed_models": failed,
        "meta_learner_type": meta_type,
        "base_val_accuracy": scores,
        "ensemble_test_accuracy": round(ens_acc, 4),
        "top_features": top_features[:10],
        "top10_symbols": top10,
        "bottom10_symbols": bot10,
        "elapsed_seconds": elapsed,
        "class_distribution_train": {str(int(k)): int(v) for k, v in zip(classes, counts)},
    }
    with open(METRICS_DIR / "GLOBAL_p170_metrics.json", "w") as f:
        json.dump(metrics, f, indent=2)

    log.info(f"\n{'='*54}")
    log.info(f"  GLOBAL MODEL DONE  {elapsed:.0f}s")
    log.info(f"  Test accuracy: {ens_acc:.4f}  ({len(X_te):,} rows across {len(sym_accs)} symbols)")
    log.info(f"  Top 5 symbols: {top10[:5]}")
    log.info(f"{'='*54}\n")
    return metrics
|
|
|
|
| |
| |
| |
|
|
def predict_today(symbol: str, use_gpu: bool = True) -> Optional[dict]:
    """
    Predict the direction class for the latest available row of a symbol.

    Loads the saved global scaler, feature list and base models, engineers
    features for the symbol and averages the base-model probabilities. If a
    saved neural meta-learner (meta_config.json + meta_net.pt) is present and
    usable it is applied instead of the plain average.

    Args:
        symbol:  ticker as used in the data-lake file names.
        use_gpu: run the meta-learner on CUDA when available.

    Returns:
        dict with the prediction label, class probabilities, confidence and a
        few indicator values, or None when the global model is missing, the
        symbol has no usable data, or no base model could be loaded. The
        reason is written to the log.
    """
    import joblib
    gdir = MODELS_DIR / "GLOBAL"
    if not (gdir / "scaler.joblib").exists():
        log.error("No global model found. Run: python p170_max_system.py --global-model")
        return None

    raw = load_candles(symbol)
    if raw is None:
        log.error(f"No usable candle data found for {symbol}")
        return None

    df = engineer_features(raw)
    fcols = joblib.load(gdir / "feature_cols.joblib")
    scaler = joblib.load(gdir / "scaler.joblib")

    # Build the single-row feature vector in the saved column order. Missing
    # or non-finite values stay 0 (an infinite value would make the scaler
    # raise).
    last = df.iloc[-1]
    X = np.zeros((1, len(fcols)), dtype=np.float32)
    for i, fc in enumerate(fcols):
        if fc not in last.index:
            continue
        try:
            value = float(last[fc])
        except (TypeError, ValueError):
            continue
        if np.isfinite(value):
            X[0, i] = value
    X_s = scaler.transform(X)

    # Collect probabilities from every base model that can be loaded and run.
    base_probas = []
    for name in ["xgboost", "lightgbm", "catboost", "random_forest", "extra_trees"]:
        mp = gdir / f"{name}.joblib"
        if not mp.exists():
            continue
        try:
            base_probas.append(joblib.load(mp).predict_proba(X_s)[0])
        except Exception as e:
            log.warning(f"Base model {name} could not be used: {e}")

    if not base_probas:
        log.error(f"No usable base model found in {gdir}")
        return None

    # Optional neural meta-learner; any problem falls back to the plain mean.
    proba = None
    cfg_path = gdir / "meta_config.json"
    if cfg_path.exists():
        try:
            import torch
            meta_input = np.concatenate(base_probas).reshape(1, -1).astype(np.float32)
            with open(cfg_path) as f:
                cfg = json.load(f)
            net = _meta_net(cfg["input_dim"])
            net.load_state_dict(torch.load(gdir / "meta_net.pt", map_location="cpu"))
            device = torch.device("cuda" if use_gpu and torch.cuda.is_available() else "cpu")
            # The weights were loaded on CPU; move them to where the input goes.
            net = net.to(device)
            net.eval()
            proba = predict_meta_torch(net, meta_input, device)[0]
        except Exception as e:
            log.warning(f"Meta-learner unavailable, using simple average: {e}")
            proba = None
    if proba is None:
        proba = np.mean(base_probas, axis=0)

    pred = int(np.argmax(proba))
    label = {0: "DOWN", 1: "NEUTRAL", 2: "UP"}[pred]
    return {
        "symbol": symbol, "date": str(last.get("date", "latest")),
        "prediction": label,
        "prob_down": round(float(proba[0]), 4),
        "prob_neutral": round(float(proba[1]), 4),
        "prob_up": round(float(proba[2]), 4),
        "confidence": round(float(proba.max()), 4),
        "rsi_14": round(float(last.get("rsi_14", np.nan)), 2),
        "macd_hist": round(float(last.get("macd_hist", np.nan)), 4),
        "adx_14": round(float(last.get("adx_14", np.nan)), 2),
        "ema_align": int(last.get("ema_align", -1)),
    }
|
|
|
|
| |
| |
| |
|
|
def run_batch(pairs: List[Tuple[str, Path]], use_gpu: bool = True) -> dict:
    """
    Thin wrapper around train_global_model.

    Only the number of entries in ``pairs`` is used, as the symbol limit; the
    trainer discovers the symbols itself.
    """
    symbol_cap = len(pairs)
    return train_global_model(use_gpu=use_gpu, max_symbols=symbol_cap)
|
|
|
|
| |
| |
| |
|
|
def check_deps(use_gpu: bool):
    """
    Print a report of installed dependencies, data paths and found symbols.

    Args:
        use_gpu: accepted for a uniform command interface; the report always
            shows whether torch sees a CUDA device.
    """
    def _importable(module_name: str) -> bool:
        # Any failure while importing (not only ImportError, e.g. a broken
        # native library) means the package is unusable.
        try:
            __import__(module_name)
        except Exception:
            return False
        return True

    print("\n=== P170 Dependency Check ===\n")
    required = [("numpy","numpy"),("pandas","pandas"),
                ("scikit-learn","sklearn"),("xgboost","xgboost"),("joblib","joblib")]
    for name, mod in required:
        if _importable(mod):
            print(f"  ✓ {name}")
        else:
            print(f"  ✗ {name}  (pip install {name} --break-system-packages)")

    optional = [("lightgbm","lightgbm"),("catboost","catboost"),("optuna","optuna")]
    for name, mod in optional:
        if _importable(mod):
            print(f"  ✓ {name} (optional)")
        else:
            print(f"  ~ {name} (optional — pip install {name} --break-system-packages)")

    try:
        import torch
        cuda = torch.cuda.is_available()
        dev = torch.cuda.get_device_name(0) if cuda else "CPU only"
        print(f"  ✓ pytorch ({dev})")
        if cuda:
            print(f"    GPU memory: {torch.cuda.get_device_properties(0).total_memory/1e9:.1f} GB")
    except Exception:
        print("  ~ pytorch (optional — without it predictions use the plain average of the base models)")

    pairs = discover_symbols()
    print(f"\nPaths:")
    print(f"  Clean data : {CLEAN_DIR}  exists={CLEAN_DIR.exists()}")
    print(f"  Features   : {FEAT_DIR}  exists={FEAT_DIR.exists()}")
    print(f"  Output     : {OUT}")
    print(f"\nSymbols found: {len(pairs)}")
    if pairs:
        print("  Largest 10 (most data first):")
        for sym, path in pairs[:10]:
            print(f"    {sym:20s} {path.stat().st_size//1024:6d} KB  {path.name}")
    print()
|
|
|
|
| |
| |
| |
|
|
def main():
    """
    Command-line entry point.

    Exactly one action is required: --global-model / --all-symbols (train),
    --predict SYMBOL, or --check-deps. --top-n limits training to the N
    largest symbols and --no-gpu forces CPU.
    """
    p = argparse.ArgumentParser(description="P170 MAX SYSTEM")
    g = p.add_mutually_exclusive_group(required=True)
    g.add_argument("--global-model", action="store_true", help="Train global model on all 2150 symbols combined (recommended)")
    g.add_argument("--all-symbols", action="store_true", help="Alias for --global-model")
    g.add_argument("--predict", type=str, help="Predict today for a symbol using global model")
    g.add_argument("--check-deps", action="store_true", help="Check dependencies and paths")
    p.add_argument("--top-n", type=int, default=None, help="Limit to top N symbols by data size (useful for quick tests)")
    p.add_argument("--no-gpu", action="store_true", help="Force CPU")
    args = p.parse_args()
    gpu = not args.no_gpu

    if args.check_deps:
        check_deps(gpu)
    elif args.global_model or args.all_symbols:
        m = train_global_model(use_gpu=gpu, max_symbols=args.top_n)
        # default=str: the metrics may hold values json cannot encode natively.
        print(json.dumps(m, indent=2, default=str))
    elif args.predict:
        symbol = args.predict.upper()
        r = predict_today(symbol, use_gpu=gpu)
        if r:
            print(json.dumps(r, indent=2))
        else:
            # predict_today returns None for several reasons, not only a
            # missing model; the log holds the precise one.
            print(f"No prediction for {symbol}: global model missing (run --global-model first) "
                  f"or no usable price data. See the log output for details.")
|
|
|
|
# Run the command-line interface only when executed as a script, not on import.
if __name__ == "__main__":
    main()
|
|