""" P170 MAXIMUM SYSTEM — dhan_financial_intelligence ================================================== Confirmed data layout: data/lake/clean/market_data/NSE/SYMBOL_real_market_data_v2.parquet columns: [timestamp, open, high, low, close, volume] ← 2155 symbols data/lake/ml/features/NSE/SYMBOL_features_ml_dataset_v1.parquet → only 5 symbols (HDFCBANK, ICICIBANK, INFY, RELIANCE, TCS) — fallback only All outputs go to data/lake/ml/p170/ — nothing existing is touched. Usage: cd /home/user1/dhan_financial_intelligence source .venv/bin/activate pip install lightgbm catboost --break-system-packages # recommended python p170_max_system.py --check-deps python p170_max_system.py --symbol RELIANCE python p170_max_system.py --all-symbols --top-n 50 python p170_max_system.py --all-symbols python p170_max_system.py --predict RELIANCE """ import sys import json import time import logging import warnings import argparse import traceback from pathlib import Path from datetime import datetime from typing import Optional, List, Tuple import numpy as np import pandas as pd warnings.filterwarnings("ignore") # ── Logging ─────────────────────────────────────────────────────────────────── Path("logs/ml").mkdir(parents=True, exist_ok=True) logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", handlers=[ logging.StreamHandler(sys.stdout), logging.FileHandler(f"logs/ml/p170_{datetime.now():%Y%m%d_%H%M%S}.log"), ], ) log = logging.getLogger("p170") # ───────────────────────────────────────────────────────────────────────────── # PATHS # ───────────────────────────────────────────────────────────────────────────── BASE = Path(".") CLEAN_DIR = BASE / "data/lake/clean/market_data/NSE" # 2155 *_real_market_data_v2.parquet FEAT_DIR = BASE / "data/lake/ml/features/NSE" # 5 *_features_ml_dataset_v1.parquet OUT = BASE / "data/lake/ml/p170" MODELS_DIR = OUT / "models" METRICS_DIR = OUT / "metrics" PREDS_DIR = OUT / "predictions" REGISTRY_DIR = OUT / "registry" 
for d in [MODELS_DIR, METRICS_DIR, PREDS_DIR, REGISTRY_DIR]:
    d.mkdir(parents=True, exist_ok=True)

# ─────────────────────────────────────────────────────────────────────────────
# LABEL CONFIG (matches existing policy exactly)
# ─────────────────────────────────────────────────────────────────────────────
TARGET = "direction_class"  # 0=down 1=neutral 2=up
HORIZON = 3                 # 3-day forward return
POS_TH = 0.015              # ≥+1.5% → up (was 0.5% → gave 9% neutral, unusable)
NEG_TH = -0.015             # ≤-1.5% → down (±1.5% gives ~33/33/33 split)
TRAIN_R = 0.70
VAL_R = 0.15
TEST_R = 0.15
WF_FOLDS = 5
MIN_ROWS = 80
SEED = 42
np.random.seed(SEED)


# ─────────────────────────────────────────────────────────────────────────────
# SYMBOL DISCOVERY
# ─────────────────────────────────────────────────────────────────────────────
def discover_symbols() -> List[Tuple[str, Path]]:
    """
    Scan CLEAN_DIR (primary) and FEAT_DIR (fallback) for per-symbol parquet files.

    The ticker is the file stem with the ``_real_market_data_v2`` /
    ``_features_ml_dataset_v1`` suffix removed, upper-cased. A fallback file is
    only used for symbols that have no clean-market-data file.

    Returns:
        [(symbol, parquet_path), ...] sorted by file size, largest first.
    """
    seen: dict = {}

    # Primary: clean market data files
    for f in CLEAN_DIR.glob("*_real_market_data_v2.parquet"):
        sym = f.stem.replace("_real_market_data_v2", "").upper()
        if sym:
            seen[sym] = f

    # Fallback: legacy feature files (only adds symbols not already in clean data)
    for f in FEAT_DIR.glob("*_features_ml_dataset_v1.parquet"):
        sym = f.stem.replace("_features_ml_dataset_v1", "").upper()
        if sym and sym not in seen:
            seen[sym] = f

    pairs = list(seen.items())
    # File size is used as a cheap proxy for "most data first".
    pairs.sort(key=lambda x: x[1].stat().st_size, reverse=True)
    return pairs


# ─────────────────────────────────────────────────────────────────────────────
# DATA LOADING
# ─────────────────────────────────────────────────────────────────────────────
def load_candles(symbol: str) -> Optional[pd.DataFrame]:
    """
    Load OHLCV candles for ``symbol``.

    Candidate files are tried in order (clean market data, alternative naming,
    legacy feature file); the first one that yields a usable frame wins. The
    date column is normalised to ``date`` (renamed from ``timestamp`` or taken
    from a named index).

    Returns:
        DataFrame with columns [date, open, high, low, close, volume], sorted by
        date with a fresh RangeIndex, or None if no candidate exists, lacks full
        OHLCV, fails to load, or has fewer than MIN_ROWS rows.
    """
    candidates = [
        CLEAN_DIR / f"{symbol}_real_market_data_v2.parquet",    # primary (2155 symbols)
        CLEAN_DIR / f"{symbol}.parquet",                        # alternative naming
        FEAT_DIR / f"{symbol}_features_ml_dataset_v1.parquet",  # legacy fallback
    ]
    for path in candidates:
        if not path.exists():
            continue
        try:
            df = pd.read_parquet(path)

            # Normalise date column
            if "date" not in df.columns:
                if "timestamp" in df.columns:
                    df = df.rename(columns={"timestamp": "date"})
                elif df.index.name:
                    df = df.reset_index().rename(columns={df.index.name: "date"})
                else:
                    continue
            df["date"] = pd.to_datetime(df["date"])

            # Need full OHLCV — feature-only files don't have open/high/low
            if not all(col in df.columns for col in ["open", "high", "low", "close", "volume"]):
                continue

            df = (df[["date", "open", "high", "low", "close", "volume"]]
                  .dropna(subset=["close", "volume"])
                  .sort_values("date")
                  .reset_index(drop=True))
            if len(df) >= MIN_ROWS:
                return df
        except Exception as e:
            # Best effort: a broken file just means we try the next candidate.
            log.debug(f"Could not load {path}: {e}")
            continue
    return None


# ─────────────────────────────────────────────────────────────────────────────
# LAYER 0 — FEATURE ENGINEERING (87 features)
# ─────────────────────────────────────────────────────────────────────────────

# ── NIFTYBEES market index cache ──────────────────────────────────────────────
_NIFTYBEES_CACHE: dict = {}


def _load_niftybees() -> "pd.DataFrame | None":
    """
    Load NIFTYBEES (Nifty50 proxy) and compute market-level features.

    The result (including a None result for a missing or unreadable file) is
    cached in _NIFTYBEES_CACHE after the first call.

    Returns:
        DataFrame with columns
        [date, mkt_ret_1d, mkt_ret_5d, mkt_vol_20d, mkt_above_ma20], or None.
    """
    if "df" in _NIFTYBEES_CACHE:
        return _NIFTYBEES_CACHE["df"]

    path = CLEAN_DIR / "NIFTYBEES_real_market_data_v2.parquet"
    if not path.exists():
        _NIFTYBEES_CACHE["df"] = None
        return None

    try:
        m = pd.read_parquet(path)
        if "timestamp" in m.columns:
            m = m.rename(columns={"timestamp": "date"})
        m["date"] = pd.to_datetime(m["date"])
        m = m.sort_values("date")
        # One row per date: engineer_features left-merges this frame on "date",
        # and a duplicated date would multiply stock rows and misalign the
        # index-aligned label assignment that follows the merge.
        m = m.drop_duplicates(subset="date", keep="last").reset_index(drop=True)

        mc = m["close"].astype(float)
        m["mkt_ret_1d"] = mc.pct_change(1)
        m["mkt_ret_5d"] = mc.pct_change(5)
        m["mkt_vol_20d"] = m["mkt_ret_1d"].rolling(20).std()
        m["mkt_ma20"] = mc.rolling(20).mean()
        m["mkt_above_ma20"] = (mc > m["mkt_ma20"]).astype(int)

        # Keep only the columns we need for merging
        mkt_out = m[["date", "mkt_ret_1d", "mkt_ret_5d",
                     "mkt_vol_20d", "mkt_above_ma20"]].copy()
        # ffill/bfill warmup NaNs
        for col in ["mkt_ret_1d", "mkt_ret_5d", "mkt_vol_20d", "mkt_above_ma20"]:
            mkt_out[col] = mkt_out[col].ffill().bfill().fillna(0)

        _NIFTYBEES_CACHE["df"] = mkt_out
        return mkt_out
    except Exception as e:
        log.debug(f"NIFTYBEES load failed: {e}")
        _NIFTYBEES_CACHE["df"] = None
        return None


# ── Cross-asset data cache (yfinance) ─────────────────────────────────────────
_CROSS_ASSET_CACHE: dict = {}


def _load_cross_asset(ticker: str, key: str) -> "pd.DataFrame | None":
    """
    Load cross-asset daily returns, cached per ``key``.

    Reads data/lake/clean/market_data/CROSS/<NAME>.parquet when ``key`` is one
    of "nifty", "usdinr", "crude" and the file exists; otherwise falls back to
    downloading ``ticker`` via yfinance.

    Returns:
        DataFrame with columns [date, ret_1d, ret_5d], or None on any failure
        (the None is cached too, so a failing source is not retried).
    """
    if key in _CROSS_ASSET_CACHE:
        return _CROSS_ASSET_CACHE[key]

    # Parquet name mapping
    parquet_map = {"nifty": "NIFTY50", "usdinr": "USDINR", "crude": "CRUDE"}
    pname = parquet_map.get(key)
    cross_path = BASE / f"data/lake/clean/market_data/CROSS/{pname}.parquet" if pname else None

    try:
        if cross_path and cross_path.exists():
            raw = pd.read_parquet(cross_path)
            raw["date"] = pd.to_datetime(raw["date"]).dt.tz_localize(None)
            result = raw[["date", "ret_1d", "ret_5d"]].copy()
            _CROSS_ASSET_CACHE[key] = result
            log.debug(f"Cross-asset {key}: loaded {len(result)} rows from parquet")
            return result

        # Fallback: yfinance (optional dependency — ImportError lands in except)
        import yfinance as yf
        raw = yf.download(ticker, period="max", progress=False, auto_adjust=True)
        if raw is None or raw.empty:
            _CROSS_ASSET_CACHE[key] = None
            return None
        if isinstance(raw.columns, pd.MultiIndex):
            raw.columns = [col[0].lower() for col in raw.columns]
        else:
            raw.columns = [col.lower() for col in raw.columns]
        raw = raw.reset_index()
        raw = raw.rename(columns={raw.columns[0]: "date"})
        raw["date"] = pd.to_datetime(raw["date"]).dt.tz_localize(None)
        raw["ret_1d"] = raw["close"].pct_change(1)
        raw["ret_5d"] = raw["close"].pct_change(5)
        result = raw[["date", "ret_1d", "ret_5d"]].copy()
        _CROSS_ASSET_CACHE[key] = result
        return result
    except Exception as e:
        log.debug(f"Cross-asset {ticker} failed: {e}")
        _CROSS_ASSET_CACHE[key] = None
        return None


def _hurst_rs(prices: np.ndarray) -> float:
    """
    Hurst-style exponent via a single-window R/S estimate.

    >0.5=trending, <0.5=mean-reverting. Returns the neutral value 0.5 for
    fewer than 20 points, zero standard deviation, or any numerical error.
    """
    try:
        n = len(prices)
        if n < 20:
            return 0.5
        mean_adj = prices - np.mean(prices)
        cs = np.cumsum(mean_adj)
        r = np.max(cs) - np.min(cs)
        s = np.std(prices, ddof=1)
        if s == 0:
            return 0.5
        return float(np.log(r / s) / np.log(n))
    except Exception:
        return 0.5


def engineer_features(raw: pd.DataFrame) -> pd.DataFrame:
    """
    Build the technical / cross-asset feature set and the direction label.

    Input : OHLCV DataFrame with [date, open, high, low, close, volume]
    Output: same rows with the feature columns + future_return + direction_class

    Notes:
        * ``macd_cross_up`` / ``stoch_cross_up`` are filled in after the MACD and
          stochastic sections (they need those columns); their column slots are
          reserved early so the column order is stable.
        * The last HORIZON rows have ``future_return`` = NaN and keep the default
          neutral label; drop rows with NaN ``future_return`` before training.
        * Cross-asset and market-regime features are best effort: when the
          source is unavailable the cross-asset columns are set to 0.0 and the
          market columns are simply absent.
    """
    df = raw.copy()
    c = df["close"].astype(float)
    h = df["high"].astype(float)
    l = df["low"].astype(float)
    o = df["open"].astype(float)
    v = df["volume"].astype(float).replace(0, np.nan)  # zero volume → NaN, avoids 0-division

    # Shared intermediates (used by Keltner, ATR and ADX / MFI and VWAP / vol and Sharpe)
    tr = pd.concat(
        [h - l, (h - c.shift()).abs(), (l - c.shift()).abs()], axis=1
    ).max(axis=1)
    atr14_raw = tr.ewm(span=14, adjust=False).mean()
    typical = (h + l + c) / 3
    log_ret = np.log(c / (c.shift(1) + 1e-9))

    # ── Keltner Channels ─────────────────────────────────────────────────────
    keltner_mid = c.ewm(span=20, adjust=False).mean()
    df["keltner_width"] = (
        (keltner_mid + 2 * atr14_raw - (keltner_mid - 2 * atr14_raw)) / (keltner_mid + 1e-9)
    )

    # ── Historical volatility (annualised) ────────────────────────────────────
    for w in [10, 20, 60]:
        df[f"hist_vol_{w}"] = log_ret.rolling(w).std() * np.sqrt(252)

    # ── Money Flow Index ─────────────────────────────────────────────────────
    raw_mf = typical * v
    pos_mf = raw_mf.where(typical > typical.shift(1), 0).rolling(14).sum()
    neg_mf = raw_mf.where(typical < typical.shift(1), 0).rolling(14).sum()
    df["mfi_14"] = 100 - (100 / (1 + pos_mf / (neg_mf + 1e-9)))

    # ── Chaikin Money Flow ────────────────────────────────────────────────────
    clv = ((c - l) - (h - c)) / (h - l + 1e-9)
    df["cmf_20"] = (clv * v).rolling(20).sum() / (v.rolling(20).sum() + 1e-9)

    # ── VWAP distance ─────────────────────────────────────────────────────────
    vwap = (typical * v).rolling(20).sum() / (v.rolling(20).sum() + 1e-9)
    df["vwap_dist"] = (c - vwap) / (vwap + 1e-9)

    # ── Amihud illiquidity ────────────────────────────────────────────────────
    df["amihud_illiquidity"] = (c.pct_change().abs() / (v * c + 1e-9)).rolling(20).mean()

    # ── Volume ratios ─────────────────────────────────────────────────────────
    df["vol_ratio_5"] = v / (v.rolling(5).mean() + 1e-9)
    df["vol_ratio_20"] = v / (v.rolling(20).mean() + 1e-9)

    # ── OBV divergence ────────────────────────────────────────────────────────
    obv_raw = (np.sign(c.diff()) * v).cumsum()
    obv_ema = obv_raw.ewm(span=20, adjust=False).mean()
    df["obv_divergence"] = obv_raw - obv_ema

    # ── Williams %R ───────────────────────────────────────────────────────────
    df["williams_r"] = (
        -100 * (h.rolling(14).max() - c)
        / (h.rolling(14).max() - l.rolling(14).min() + 1e-9)
    )

    # ── CCI ───────────────────────────────────────────────────────────────────
    df["cci_20"] = (c - c.rolling(20).mean()) / (0.015 * c.rolling(20).std() + 1e-9)

    # ── MACD / Stoch cross-up placeholders ───────────────────────────────────
    # Reserve the column slots here (keeps column order stable); the real values
    # are assigned below once the MACD and stochastic columns exist. Computing
    # them at this point would always yield 0 because neither exists yet.
    df["macd_cross_up"] = 0
    df["stoch_cross_up"] = 0

    # ── Candlestick patterns ─────────────────────────────────────────────────
    body = (c - o).abs()
    full_range = (h - l).replace(0, np.nan)
    upper_sh = (h - pd.concat([c, o], axis=1).max(axis=1)) / full_range
    lower_sh = (pd.concat([c, o], axis=1).min(axis=1) - l) / full_range
    df["gap_up"] = ((o - c.shift(1)) / (c.shift(1) + 1e-9)).clip(lower=0)
    df["gap_down"] = ((c.shift(1) - o) / (c.shift(1) + 1e-9)).clip(lower=0)
    df["inside_bar"] = ((h < h.shift(1)) & (l > l.shift(1))).astype(int)
    df["doji"] = (body / (full_range + 1e-9) < 0.1).astype(int)
    df["hammer"] = (
        (lower_sh > 2 * body / (full_range + 1e-9)) & (upper_sh < 0.3)
    ).astype(int)
    df["shooting_star"] = (
        (upper_sh > 2 * body / (full_range + 1e-9)) & (lower_sh < 0.3)
    ).astype(int)
    df["engulfing_bull"] = (
        (c > o) & (o < c.shift(1)) & (c > o.shift(1)) & (c.shift(1) < o.shift(1))
    ).astype(int)
    df["engulfing_bear"] = (
        (c < o) & (o > c.shift(1)) & (c < o.shift(1)) & (c.shift(1) > o.shift(1))
    ).astype(int)

    # ── Drawdown from recent high ─────────────────────────────────────────────
    df["dd_from_high_20"] = (c - c.rolling(20).max()) / (c.rolling(20).max() + 1e-9)
    df["dd_from_high_60"] = (c - c.rolling(60).max()) / (c.rolling(60).max() + 1e-9)

    # ── Rolling Sharpe ────────────────────────────────────────────────────────
    df["rolling_sharpe_20"] = (
        log_ret.rolling(20).mean() / (log_ret.rolling(20).std() + 1e-9)
    ) * np.sqrt(252)
    df["rolling_sharpe_60"] = (
        log_ret.rolling(60).mean() / (log_ret.rolling(60).std() + 1e-9)
    ) * np.sqrt(252)

    # ── Autocorrelation (trend persistence proxy, faster than Hurst) ──────────
    # NOTE: despite the column name this is a 20-bar lag-1 autocorrelation of
    # daily returns, not a Hurst exponent over 50 bars.
    df["hurst_50"] = c.pct_change().rolling(20).apply(
        lambda x: x.autocorr(lag=1) if len(x) > 5 else 0, raw=False
    )

    # ── Cross-asset macro features ────────────────────────────────────────────
    try:
        nifty_df = _load_cross_asset("^NSEI", "nifty")
        usdinr_df = _load_cross_asset("USDINR=X", "usdinr")
        crude_df = _load_cross_asset("CL=F", "crude")

        if "date" in df.columns:
            df["date"] = pd.to_datetime(df["date"])
            for cross, prefix in [(nifty_df, "nifty"), (usdinr_df, "usdinr"), (crude_df, "crude")]:
                if cross is not None:
                    merged = df[["date"]].merge(cross, on="date", how="left")
                    if prefix == "nifty":
                        df["nifty_ret_1d"] = merged["ret_1d"].values
                        df["nifty_ret_5d"] = merged["ret_5d"].values
                        # Beta: cov(stock, nifty) / var(nifty)
                        sr = c.pct_change()
                        nr = pd.Series(merged["ret_1d"].values, index=df.index)
                        df["beta_20d"] = sr.rolling(20).cov(nr) / (nr.rolling(20).var() + 1e-9)
                    elif prefix == "usdinr":
                        df["usd_inr_ret"] = merged["ret_1d"].values
                    elif prefix == "crude":
                        df["crude_ret"] = merged["ret_1d"].values
    except Exception as e:
        log.debug(f"Cross-asset features skipped: {e}")
    # Guarantee a fixed schema even when a cross-asset source is unavailable.
    for col in ["nifty_ret_1d", "nifty_ret_5d", "beta_20d", "usd_inr_ret", "crude_ret"]:
        if col not in df.columns:
            df[col] = 0.0

    # ── Returns ───────────────────────────────────────────────────────────────
    for p in [1, 3, 5, 10, 20]:
        df[f"return_{p}d"] = c.pct_change(p)

    # ── Volatility ────────────────────────────────────────────────────────────
    r1 = df["return_1d"]
    for w in [5, 10, 20]:
        df[f"volatility_{w}d"] = r1.rolling(w).std()
    df["vol_ratio"] = df["volatility_5d"] / (df["volatility_20d"] + 1e-9)
    df["skew_20"] = r1.rolling(20).skew()
    df["kurt_20"] = r1.rolling(20).kurt()

    # ── Moving averages ───────────────────────────────────────────────────────
    for w in [5, 10, 20, 50]:
        df[f"ma_{w}"] = c.rolling(w).mean()
        df[f"close_to_ma_{w}"] = (c - df[f"ma_{w}"]) / (df[f"ma_{w}"] + 1e-9)

    # ── EMA alignment ─────────────────────────────────────────────────────────
    ema5 = c.ewm(span=5, adjust=False).mean()
    ema20 = c.ewm(span=20, adjust=False).mean()
    ema50 = c.ewm(span=50, adjust=False).mean()
    df["ema_5_20_ratio"] = ema5 / (ema20 + 1e-9)
    df["ema_20_50_ratio"] = ema20 / (ema50 + 1e-9)
    df["ema_align"] = (ema5 > ema20).astype(int) + (ema20 > ema50).astype(int)

    # ── RSI (7, 14, 21) ───────────────────────────────────────────────────────
    delta = c.diff()
    gain = delta.clip(lower=0)
    loss = (-delta).clip(lower=0)
    for period in [7, 14, 21]:
        ag = gain.ewm(com=period - 1, min_periods=period).mean()
        al = loss.ewm(com=period - 1, min_periods=period).mean()
        df[f"rsi_{period}"] = 100 - 100 / (1 + ag / (al + 1e-9))

    # ── MACD ──────────────────────────────────────────────────────────────────
    ema12 = c.ewm(span=12, adjust=False).mean()
    ema26 = c.ewm(span=26, adjust=False).mean()
    df["macd"] = ema12 - ema26
    df["macd_sig"] = df["macd"].ewm(span=9, adjust=False).mean()
    df["macd_hist"] = df["macd"] - df["macd_sig"]
    df["macd_cross"] = (df["macd"] > df["macd_sig"]).astype(int)
    # MACD cross up: above the signal line today, at-or-below it yesterday.
    df["macd_cross_up"] = (
        (df["macd"] > df["macd_sig"])
        & (df["macd"].shift(1) <= df["macd_sig"].shift(1))
    ).astype(int)

    # ── Bollinger Bands ───────────────────────────────────────────────────────
    for w in [10, 20]:
        mid = c.rolling(w).mean()
        std = c.rolling(w).std()
        bb_u = mid + 2 * std
        bb_l = mid - 2 * std
        df[f"bb_width_{w}"] = (bb_u - bb_l) / (mid + 1e-9)
        df[f"bb_pct_{w}"] = (c - bb_l) / (bb_u - bb_l + 1e-9)

    # ── ATR ───────────────────────────────────────────────────────────────────
    for w in [7, 14]:
        atr = tr.ewm(span=w, adjust=False).mean()
        df[f"atr_{w}"] = atr
        df[f"atr_pct_{w}"] = atr / (c + 1e-9)

    # ── ADX / DI ──────────────────────────────────────────────────────────────
    plus_dm = (h - h.shift(1)).clip(lower=0)
    minus_dm = (l.shift(1) - l).clip(lower=0)
    plus_dm = plus_dm.where(plus_dm > minus_dm, 0)
    minus_dm = minus_dm.where(minus_dm > plus_dm, 0)  # compares against the filtered plus_dm
    atr14 = atr14_raw + 1e-9
    plus_di = 100 * plus_dm.ewm(span=14, adjust=False).mean() / atr14
    minus_di = 100 * minus_dm.ewm(span=14, adjust=False).mean() / atr14
    dx = 100 * (plus_di - minus_di).abs() / (plus_di + minus_di + 1e-9)
    df["adx_14"] = dx.ewm(span=14, adjust=False).mean()
    df["plus_di_14"] = plus_di
    df["minus_di_14"] = minus_di

    # ── Stochastics ───────────────────────────────────────────────────────────
    for k in [5, 14]:
        lo_k = l.rolling(k).min()
        hi_k = h.rolling(k).max()
        stoch_k = 100 * (c - lo_k) / (hi_k - lo_k + 1e-9)
        df[f"stoch_k_{k}"] = stoch_k
        df[f"stoch_d_{k}"] = stoch_k.rolling(3).mean()
    # Stoch cross up: %K(14) above %D(14) today, at-or-below it yesterday.
    df["stoch_cross_up"] = (
        (df["stoch_k_14"] > df["stoch_d_14"])
        & (df["stoch_k_14"].shift(1) <= df["stoch_d_14"].shift(1))
    ).astype(int)

    # ── Donchian channel position ─────────────────────────────────────────────
    for w in [10, 20]:
        dc_hi = h.rolling(w).max()
        dc_lo = l.rolling(w).min()
        df[f"donchian_{w}"] = (c - dc_lo) / (dc_hi - dc_lo + 1e-9)

    # ── Candle structure ──────────────────────────────────────────────────────
    rng = h - l + 1e-9
    df["candle_body"] = (c - o) / rng
    df["upper_shadow"] = (h - c.clip(lower=o)) / rng
    df["lower_shadow"] = (o.clip(upper=c) - l) / rng
    df["gap"] = (o - c.shift(1)) / (c.shift(1) + 1e-9)

    # ── Volume features ───────────────────────────────────────────────────────
    for w in [5, 20]:
        vmean = v.rolling(w).mean()
        vstd = v.rolling(w).std() + 1e-9
        df[f"volume_z_{w}"] = (v - vmean) / vstd
    obv = (np.sign(r1) * v).cumsum()
    df["obv_z"] = (obv - obv.rolling(20).mean()) / (obv.rolling(20).std() + 1e-9)
    df["vpt_5"] = (r1 * v).rolling(5).sum()

    # ── Momentum / ROC ────────────────────────────────────────────────────────
    for p in [3, 5, 10, 20]:
        df[f"roc_{p}"] = c.pct_change(p)
    df["momentum_score"] = (
        df["roc_3"].rank(pct=True) * 0.4
        + df["roc_5"].rank(pct=True) * 0.3
        + df["roc_10"].rank(pct=True) * 0.2
        + df["roc_20"].rank(pct=True) * 0.1
    )

    # ── Market regime features (NIFTYBEES as Nifty50 proxy) ──────────────────
    # Cross-sectional context: is this stock outperforming the market?
    # Loaded once, merged by date, gracefully skipped if unavailable.
    try:
        mkt = _load_niftybees()
        if mkt is not None and "date" in df.columns:
            df["date"] = pd.to_datetime(df["date"])
            df = df.merge(mkt, on="date", how="left")
            # stock vs market relative strength
            df["stock_vs_mkt_5d"] = df["return_5d"] - df["mkt_ret_5d"].fillna(0)
            # 20d market return is approximated as 4 × the 5d market return.
            df["stock_vs_mkt_20d"] = df["return_20d"] - df["mkt_ret_5d"].fillna(0) * 4
            mkt_cols = ["mkt_ret_1d", "mkt_ret_5d", "mkt_vol_20d",
                        "mkt_above_ma20", "stock_vs_mkt_5d", "stock_vs_mkt_20d"]
            df[mkt_cols] = df[mkt_cols].ffill().bfill().fillna(0)
    except Exception as e:
        log.debug(f"Market regime features skipped: {e}")

    # ── Target label (matches existing policy exactly) ───────────────────────
    fwd = c.shift(-HORIZON) / c - 1
    df["future_return"] = fwd
    df[TARGET] = 1                        # neutral default (also for NaN fwd rows)
    df.loc[fwd >= POS_TH, TARGET] = 2     # up
    df.loc[fwd <= NEG_TH, TARGET] = 0     # down

    # Fill feature NaNs so a later dropna doesn't discard the warm-up rows.
    # ffill() carries the last valid value forward through ANY gap (mid-series
    # NaNs included); bfill() then fills the leading warm-up rows with the first
    # valid value, i.e. those rows borrow a slightly later observation.
    feat_cols_now = [col for col in df.columns
                     if col not in {"date", "open", "high", "low", "close", "volume",
                                    "future_return", TARGET}]
    df[feat_cols_now] = df[feat_cols_now].ffill().bfill()
    return df


def feature_columns(df: pd.DataFrame) -> List[str]:
    """Return the model-input columns of ``df``: everything except raw OHLCV,
    identifiers, the forward return and the target label."""
    drop = {"date", "open", "high", "low", "close", "volume",
            "symbol", "exchange", "timestamp", "future_return", TARGET}
    return [col for col in df.columns if col not in drop]


# ─────────────────────────────────────────────────────────────────────────────
# LAYER 1 — BASE MODELS
# ─────────────────────────────────────────────────────────────────────────────
def make_base_models(use_gpu: bool) -> dict:
    """
    Build the unfitted 3-class base classifiers.

    Always returns xgboost, random_forest and extra_trees (in that order);
    lightgbm and catboost are appended only if the packages import.

    Args:
        use_gpu: request CUDA for XGBoost and GPU task type for CatBoost.

    Returns:
        dict mapping model name → estimator (insertion order is the column
        order of the stacked probability matrix built by the trainer).
    """
    from sklearn.ensemble import RandomForestClassifier, ExtraTreesClassifier
    import xgboost as xgb

    gpu_xgb = {"device": "cuda", "tree_method": "hist"} if use_gpu else {}

    models = {
        "xgboost": xgb.XGBClassifier(
            n_estimators=300, max_depth=5, learning_rate=0.05,
            subsample=0.8, colsample_bytree=0.8, min_child_weight=3,
            gamma=0.1, reg_alpha=0.1, reg_lambda=1.0,
            objective="multi:softprob", num_class=3, eval_metric="mlogloss",
            random_state=SEED, verbosity=0, n_jobs=-1, **gpu_xgb,
        ),
        "random_forest": RandomForestClassifier(
            n_estimators=200, max_depth=8, min_samples_leaf=5,
            max_features="sqrt", class_weight="balanced",
            random_state=SEED, n_jobs=-1,
        ),
        "extra_trees": ExtraTreesClassifier(
            n_estimators=200, max_depth=10, min_samples_leaf=3,
            class_weight="balanced", random_state=SEED, n_jobs=-1,
        ),
    }

    try:
        import lightgbm as lgb
        models["lightgbm"] = lgb.LGBMClassifier(
            n_estimators=300, num_leaves=63, learning_rate=0.05,
            subsample=0.8, colsample_bytree=0.8, min_child_samples=10,
            reg_alpha=0.1, reg_lambda=1.0,
            objective="multiclass", num_class=3,
            random_state=SEED, verbose=-1, n_jobs=-1,
        )
        log.info(f" LightGBM ✓")
    except ImportError:
        log.warning(" LightGBM not installed (pip install lightgbm --break-system-packages)")

    try:
        from catboost import CatBoostClassifier
        cb_gpu = {"task_type": "GPU"} if use_gpu else {"task_type": "CPU"}
        models["catboost"] = CatBoostClassifier(
            iterations=300, depth=6, learning_rate=0.05, l2_leaf_reg=3,
            loss_function="MultiClass", classes_count=3,
            random_seed=SEED, verbose=False, **cb_gpu,
        )
        log.info(f" CatBoost ✓")
    except ImportError:
        log.warning(" CatBoost not installed (pip install catboost --break-system-packages)")

    return models


# ─────────────────────────────────────────────────────────────────────────────
# LAYER 2 — PYTORCH META-LEARNER
# ─────────────────────────────────────────────────────────────────────────────
def _meta_net(in_dim: int):
    """Build the meta-learner MLP: in_dim → 64 → 32 → 3 logits."""
    import torch.nn as nn
    return nn.Sequential(
        nn.Linear(in_dim, 64), nn.LayerNorm(64), nn.GELU(), nn.Dropout(0.2),
        nn.Linear(64, 32), nn.GELU(), nn.Dropout(0.1),
        nn.Linear(32, 3),
    )


def train_meta_torch(net, X: np.ndarray, y: np.ndarray, device, epochs: int = 60):
    """
    Train ``net`` full-batch on (X, y) with AdamW + cosine LR schedule.

    Gradients are clipped to norm 1.0 each step. The net is moved to
    ``device``, left in eval mode and returned.
    """
    import torch, torch.nn as nn
    net = net.to(device)
    opt = torch.optim.AdamW(net.parameters(), lr=1e-3, weight_decay=1e-4)
    sched = torch.optim.lr_scheduler.CosineAnnealingLR(opt, T_max=epochs)
    crit = nn.CrossEntropyLoss()
    Xt = torch.FloatTensor(X).to(device)
    yt = torch.LongTensor(y).to(device)
    net.train()
    for _ in range(epochs):
        opt.zero_grad()
        loss = crit(net(Xt), yt)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(net.parameters(), 1.0)
        opt.step()
        sched.step()
    net.eval()
    return net


def predict_meta_torch(net, X: np.ndarray, device) -> np.ndarray:
    """Return softmax class probabilities of ``net`` for X as a numpy array.

    The input is moved to ``device``; the caller must have placed ``net`` on
    the same device.
    """
    import torch
    with torch.no_grad():
        return torch.softmax(net(torch.FloatTensor(X).to(device)), dim=1).cpu().numpy()


# ─────────────────────────────────────────────────────────────────────────────
# GLOBAL MODEL — train on ALL symbols combined (~662K rows)
# This is the right architecture when each symbol only has ~300 rows.
# One model learns market-wide patterns; per-symbol fine-tuning adds specificity.
# ─────────────────────────────────────────────────────────────────────────────
def drop_unlabelled_rows(df: pd.DataFrame, fcols: List[str],
                         target: Optional[str] = None) -> pd.DataFrame:
    """
    Keep only rows usable for supervised training.

    Drops rows with a NaN in any feature column or in the target, rows whose
    target is not one of {0, 1, 2}, and — when a ``future_return`` column is
    present — rows whose forward return is NaN. The last HORIZON rows of every
    symbol fall in that last group: engineer_features leaves them on the
    default "neutral" label even though their future is unknown, so keeping
    them would inject label noise.

    Args:
        df: engineered frame.
        fcols: feature column names that must be non-NaN.
        target: label column name; defaults to the module-level TARGET.

    Returns:
        Filtered copy with a fresh RangeIndex.
    """
    target = TARGET if target is None else target
    required = list(fcols) + [target]
    if "future_return" in df.columns:
        required.append("future_return")
    kept = df.dropna(subset=required)
    return kept[kept[target].isin([0, 1, 2])].reset_index(drop=True)


def build_global_dataset(
    pairs: List[Tuple[str, Path]],
    max_symbols: Optional[int] = None,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, List[str], List]:
    """
    Load and engineer features for all symbols, stack into one big matrix.

    Symbols that look like ETFs / liquid / gilt / commodity funds are filtered
    out by name first; ``max_symbols`` (if truthy) then caps the list. Rows are
    stacked symbol by symbol, each symbol in chronological order.

    Returns:
        (X, y, symbol_ids, feature_cols, dates) where X is float32
        (rows × features), y is int32 class labels, symbol_ids holds the
        per-row symbol, feature_cols is the reference column list taken from
        the first usable symbol, and dates is the per-row date list (None where
        a frame has no date column).

    Raises:
        ValueError: if no symbol produced any usable rows.
    """
    all_X, all_y, all_syms, all_dates = [], [], [], []
    feature_cols_ref = None
    skipped = 0

    # Re-apply equity filter in case pairs came from unfiltered source
    non_equity_tokens = [
        'LIQUID', 'GILT', 'GSEC', 'TBILL', 'CASH', 'GOLD', 'SILVER', 'METAL',
        'BBETF', 'EBBETF', 'BEES', 'NIFTY', 'LIQUIDADD', 'LIQUIDETF',
        'CASHIETF', 'AXISBPS', 'LICNETF',
    ]
    non_equity_suffixes = ('ETF', 'BEES', 'LIQUID', 'ADD', 'IETF')
    pairs = [(s, p) for s, p in pairs
             if not any(x in s.upper() for x in non_equity_tokens)
             and not s.upper().endswith(non_equity_suffixes)]

    if max_symbols:
        pairs = pairs[:max_symbols]

    log.info(f" Loading {len(pairs)} equity symbols...")

    for i, (sym, _) in enumerate(pairs):
        if i % 200 == 0:
            log.info(f" {i}/{len(pairs)} loaded, {sum(len(x) for x in all_X):,} rows so far")
        try:
            raw = load_candles(sym)
            if raw is None:
                skipped += 1
                continue

            df = engineer_features(raw)
            fcols = feature_columns(df)
            # Also removes the trailing rows whose forward return is unknown.
            df = drop_unlabelled_rows(df, fcols)
            if len(df) < MIN_ROWS:
                skipped += 1
                continue

            # Align feature columns across symbols
            if feature_cols_ref is None:
                feature_cols_ref = fcols
            else:
                fcols = feature_cols_ref  # use first symbol's cols as reference

            # Columns missing from this symbol stay at 0.
            X_sym = np.zeros((len(df), len(fcols)), dtype=np.float32)
            for j, fc in enumerate(fcols):
                if fc in df.columns:
                    X_sym[:, j] = df[fc].values.astype(np.float32)

            all_X.append(X_sym)
            all_y.append(df[TARGET].values.astype(np.int32))
            all_syms.extend([sym] * len(df))
            if "date" in df.columns:
                all_dates.extend(df["date"].tolist())
            else:
                all_dates.extend([None] * len(df))

        except Exception as e:
            log.debug(f" [{sym}] skipped: {e}")
            skipped += 1

    log.info(f" Loaded: {len(pairs)-skipped} symbols, {skipped} skipped")

    if not all_X:
        raise ValueError("No data loaded — check CLEAN_DIR path")

    X = np.vstack(all_X)
    y = np.concatenate(all_y)
    sym_ids = np.array(all_syms)

    log.info(f" Combined dataset: {X.shape[0]:,} rows × {X.shape[1]} features")
    cls, cnts = np.unique(y, return_counts=True)
    for c, n in zip(cls, cnts):
        log.info(f" class {c}: {n:,} ({n/len(y)*100:.1f}%)")

    return X, y, sym_ids, feature_cols_ref or [], all_dates


def temporal_global_split(
    sym_ids: np.ndarray,
    all_dates: List,
    test_ratio: float = 0.20,
) -> Tuple[np.ndarray, np.ndarray]:
    """
    For each symbol, put last test_ratio of its rows in test, rest in train.

    Relies on each symbol's rows already being in chronological order within
    ``sym_ids``; ``all_dates`` is accepted for interface symmetry but is not
    consulted.

    Returns:
        (train_mask, test_mask) — complementary boolean arrays of len(sym_ids).
    """
    n = len(sym_ids)
    train_mask = np.ones(n, dtype=bool)
    test_mask = np.zeros(n, dtype=bool)

    for sym in np.unique(sym_ids):
        idx = np.where(sym_ids == sym)[0]
        cut = int(len(idx) * (1 - test_ratio))
        test_mask[idx[cut:]] = True
        train_mask[idx[cut:]] = False

    return train_mask, test_mask


def train_global_model(use_gpu: bool = True, max_symbols: Optional[int] = None) -> dict:
    """
    Train one ensemble on all symbols combined.

    Pipeline: discover symbols → build stacked dataset → per-symbol temporal
    train/test split → RobustScaler → fit every base model → equal-weight
    average of the base-model test probabilities → per-symbol accuracy.

    Side effects:
        Writes scaler, feature list, base models and meta_config.json to
        data/lake/ml/p170/models/GLOBAL/, test predictions to PREDS_DIR and a
        metrics JSON to METRICS_DIR.

    Returns:
        The metrics dict that is also written to disk.
    """
    import joblib
    from sklearn.preprocessing import RobustScaler
    from sklearn.metrics import accuracy_score, classification_report

    log.info(f"\n{'#'*54}")
    log.info(f" GLOBAL MODEL TRAINING")
    log.info(f" ~662K rows across 2150 NSE symbols")
    log.info(f"{'#'*54}\n")
    t0 = time.time()

    # ── Load all data ─────────────────────────────────────────────────────────
    pairs = discover_symbols()
    X, y, sym_ids, fcols, all_dates = build_global_dataset(pairs, max_symbols=max_symbols)

    # ── Temporal split per symbol ─────────────────────────────────────────────
    train_mask, test_mask = temporal_global_split(sym_ids, all_dates, test_ratio=0.20)

    # Use 80% of training rows for train, 20% for val (within the train set).
    # NOTE: train_idx is in stacking order (symbol by symbol), so the val slice
    # is the training rows of the last-loaded symbols, not a later time window.
    train_idx = np.where(train_mask)[0]
    val_cut = int(len(train_idx) * 0.80)
    val_idx = train_idx[val_cut:]
    tr_idx = train_idx[:val_cut]
    te_idx = np.where(test_mask)[0]

    X_tr, y_tr = X[tr_idx], y[tr_idx]
    X_val, y_val = X[val_idx], y[val_idx]
    X_te, y_te = X[te_idx], y[te_idx]

    log.info(f" Train: {len(X_tr):,} Val: {len(X_val):,} Test: {len(X_te):,}")

    # ── Scale ─────────────────────────────────────────────────────────────────
    scaler = RobustScaler()
    X_tr_s = scaler.fit_transform(X_tr)
    X_val_s = scaler.transform(X_val)
    X_te_s = scaler.transform(X_te)

    # ── Layer 1: base models ──────────────────────────────────────────────────
    base = make_base_models(use_gpu)
    nm = len(base)
    # One 3-column probability block per base model, in base-dict order.
    oof_val = np.zeros((len(X_val), nm * 3), dtype=np.float32)
    oof_te = np.zeros((len(X_te), nm * 3), dtype=np.float32)
    scores = {}

    for idx, (name, model) in enumerate(base.items()):
        log.info(f" [{name}] training on {len(X_tr):,} rows...")
        try:
            if name == "catboost":
                model.fit(X_tr_s, y_tr, eval_set=(X_val_s, y_val), verbose=100)
            elif name == "xgboost":
                model.fit(X_tr_s, y_tr, eval_set=[(X_val_s, y_val)], verbose=100)
            elif name == "lightgbm":
                model.fit(X_tr_s, y_tr, eval_set=[(X_val_s, y_val)])
            else:
                model.fit(X_tr_s, y_tr)

            vp = model.predict_proba(X_val_s)
            tp = model.predict_proba(X_te_s)
            oof_val[:, idx*3:(idx+1)*3] = vp
            oof_te[:, idx*3:(idx+1)*3] = tp

            acc = float(accuracy_score(y_val, np.argmax(vp, axis=1)))
            scores[name] = round(acc, 4)
            log.info(f" [{name}] val acc = {acc:.4f}")
        except Exception as e:
            log.error(f" [{name}] failed: {e}")
            log.debug(traceback.format_exc())
            # Uniform block marks the model as inactive for the average below.
            oof_val[:, idx*3:(idx+1)*3] = 1/3
            oof_te[:, idx*3:(idx+1)*3] = 1/3

    # ── Layer 2: simple probability average ─────────────────────────────────
    # Stacking hurts here: val and test cover different time periods per symbol,
    # so weights learned on val are wrong for test. Simple average wins.
    log.info(f" [ensemble] averaging {len(base)} base models...")
    nm_actual = oof_te.shape[1] // 3
    ens_proba = np.zeros((len(oof_te), 3), dtype=np.float32)
    active = 0
    for mi in range(nm_actual):
        block = oof_te[:, mi*3:(mi+1)*3]
        if block.std() > 0.001:  # skip failed models (constant 1/3 block)
            ens_proba += block
            active += 1
    ens_proba = ens_proba / max(active, 1)
    meta_type = "simple_average"
    meta_net_obj = None
    log.info(f" averaged {active} active base models")

    ens_pred = np.argmax(ens_proba, axis=1)
    ens_acc = float(accuracy_score(y_te, ens_pred))
    log.info(f" [ensemble] global test acc = {ens_acc:.4f}")
    log.info(f"\n{classification_report(y_te, ens_pred, target_names=['DOWN','NEUTRAL','UP'])}")

    # ── Per-symbol test accuracy ───────────────────────────────────────────────
    test_syms = sym_ids[te_idx]
    sym_accs = {}
    for sym in np.unique(test_syms):
        mask = test_syms == sym
        if mask.sum() >= 5:
            sym_accs[str(sym)] = round(float(accuracy_score(y_te[mask], ens_pred[mask])), 4)

    # ── Feature importance ────────────────────────────────────────────────────
    top_features = []
    if "xgboost" in base:
        try:
            imp = base["xgboost"].feature_importances_
            top_idx = np.argsort(imp)[-15:][::-1]
            top_features = [[fcols[i], round(float(imp[i]), 4)] for i in top_idx]
        except Exception:
            # Best effort: an unfitted/failed xgboost simply yields no ranking.
            pass

    # ── Save global model ─────────────────────────────────────────────────────
    gdir = MODELS_DIR / "GLOBAL"
    gdir.mkdir(exist_ok=True)
    joblib.dump(scaler, gdir / "scaler.joblib")
    joblib.dump(fcols, gdir / "feature_cols.joblib")
    for name, model in base.items():
        try:
            joblib.dump(model, gdir / f"{name}.joblib")
            log.info(f" saved {name}.joblib")
        except Exception as e:
            log.warning(f" save {name} failed: {e}")

    if meta_net_obj is not None:
        try:
            import torch
            torch.save(meta_net_obj.state_dict(), gdir / "meta_net.pt")
            with open(gdir / "meta_config.json", "w") as f:
                json.dump({"type": "neural_net", "input_dim": oof_val.shape[1]}, f)
        except Exception as e:
            log.warning(f" save meta-learner failed: {e}")
    else:
        # Record the ensemble type so predict_today does not pick up a stale
        # meta_net.pt / meta_config.json left behind by an earlier run.
        try:
            with open(gdir / "meta_config.json", "w") as f:
                json.dump({"type": meta_type}, f)
        except OSError as e:
            log.warning(f" save meta_config failed: {e}")

    # ── Save test predictions ─────────────────────────────────────────────────
    pd.DataFrame({
        "symbol": test_syms,
        "pred_class": ens_pred,
        "prob_down": ens_proba[:, 0],
        "prob_neutral": ens_proba[:, 1],
        "prob_up": ens_proba[:, 2],
        "actual_class": y_te,
    }).to_parquet(PREDS_DIR / "GLOBAL_p170_predictions.parquet", index=False)

    # ── Metrics ───────────────────────────────────────────────────────────────
    elapsed = round(time.time() - t0, 1)
    cls, cnts = np.unique(y_tr, return_counts=True)

    # top/bottom 10 symbols by accuracy
    sorted_syms = sorted(sym_accs.items(), key=lambda x: x[1], reverse=True)
    top10 = sorted_syms[:10]
    bot10 = sorted_syms[-10:]

    metrics = {
        "model": "GLOBAL",
        "status": "success",
        "timestamp": datetime.now().isoformat(),
        "n_symbols": int(len(np.unique(sym_ids))),
        "n_rows_total": int(len(X)),
        "n_train": int(len(X_tr)),
        "n_val": int(len(X_val)),
        "n_test": int(len(X_te)),
        "n_features": int(len(fcols)),
        "base_models": list(base.keys()),
        "meta_learner_type": meta_type,
        "base_val_accuracy": scores,
        "ensemble_test_accuracy": round(ens_acc, 4),
        "top_features": top_features[:10],
        "top10_symbols": top10,
        "bottom10_symbols": bot10,
        "elapsed_seconds": elapsed,
        "class_distribution_train": {str(int(k)): int(v) for k, v in zip(cls, cnts)},
    }
    with open(METRICS_DIR / "GLOBAL_p170_metrics.json", "w") as f:
        json.dump(metrics, f, indent=2)

    log.info(f"\n{'='*54}")
    log.info(f" GLOBAL MODEL DONE {elapsed:.0f}s")
    log.info(f" Test accuracy: {ens_acc:.4f} ({len(X_te):,} rows across {len(sym_accs)} symbols)")
    log.info(f" Top 5 symbols: {top10[:5]}")
    log.info(f"{'='*54}\n")

    return metrics


# ─────────────────────────────────────────────────────────────────────────────
# INFERENCE — predict today using global model
# ─────────────────────────────────────────────────────────────────────────────
def predict_today(symbol: str, use_gpu: bool = True) -> Optional[dict]:
    """
    Load global model and predict on the latest row for a symbol.

    Base-model probabilities are averaged unless meta_config.json declares a
    "neural_net" meta-learner, in which case the saved meta-net is applied to
    the concatenated base probabilities (falling back to the average on any
    error).

    Returns:
        dict with the prediction label, class probabilities, confidence and a
        few indicator values from the latest row; None if no global model is
        saved, the symbol has no usable candles, or no base model could score.
    """
    import joblib

    gdir = MODELS_DIR / "GLOBAL"
    if not (gdir / "scaler.joblib").exists():
        log.error("No global model found. Run: python p170_max_system.py --global-model")
        return None

    raw = load_candles(symbol)
    if raw is None:
        return None

    df = engineer_features(raw)
    fcols = joblib.load(gdir / "feature_cols.joblib")
    scaler = joblib.load(gdir / "scaler.joblib")

    # Latest row; features that are missing or NaN stay at 0.
    last = df.iloc[-1]
    X = np.zeros((1, len(fcols)), dtype=np.float32)
    for i, fc in enumerate(fcols):
        if fc in last.index and not pd.isna(last[fc]):
            X[0, i] = float(last[fc])
    X_s = scaler.transform(X)

    # Collect base model probas in the SAME order make_base_models() builds
    # them, because a meta-net is trained on probability blocks in that order.
    base_order = ("xgboost", "random_forest", "extra_trees", "lightgbm", "catboost")
    base_probas = []
    for name in base_order:
        mp = gdir / f"{name}.joblib"
        if mp.exists():
            try:
                base_probas.append(joblib.load(mp).predict_proba(X_s)[0])
            except Exception as e:
                # e.g. a model that failed to fit during training — skip it.
                log.debug(f" [{name}] inference skipped: {e}")
    if not base_probas:
        return None

    # Default ensemble: equal-weight average (what training evaluates).
    proba = np.mean(base_probas, axis=0)

    cfg_path = gdir / "meta_config.json"
    if cfg_path.exists():
        try:
            with open(cfg_path) as f:
                cfg = json.load(f)
            # Configs written before the "type" key was used for anything
            # other than a neural net are treated as neural-net configs.
            if cfg.get("type", "neural_net") == "neural_net":
                import torch
                device = torch.device("cuda" if use_gpu and torch.cuda.is_available() else "cpu")
                net = _meta_net(cfg["input_dim"])
                net.load_state_dict(torch.load(gdir / "meta_net.pt", map_location="cpu"))
                # predict_meta_torch moves the input to `device`; the net must
                # live there too or the forward pass raises on CUDA.
                net = net.to(device)
                net.eval()
                meta_input = np.concatenate(base_probas).reshape(1, -1).astype(np.float32)
                proba = predict_meta_torch(net, meta_input, device)[0]
        except Exception as e:
            log.debug(f" meta-learner unavailable, using simple average: {e}")
            proba = np.mean(base_probas, axis=0)

    pred = int(np.argmax(proba))
    label = {0: "DOWN", 1: "NEUTRAL", 2: "UP"}[pred]

    return {
        "symbol": symbol,
        "date": str(last.get("date", "latest")),
        "prediction": label,
        "prob_down": round(float(proba[0]), 4),
        "prob_neutral": round(float(proba[1]), 4),
        "prob_up": round(float(proba[2]), 4),
        "confidence": round(float(proba.max()), 4),
        "rsi_14": round(float(last.get("rsi_14", np.nan)), 2),
        "macd_hist": round(float(last.get("macd_hist", np.nan)), 4),
        "adx_14": round(float(last.get("adx_14", np.nan)), 2),
        "ema_align": int(last.get("ema_align", -1)),
    }


# ─────────────────────────────────────────────────────────────────────────────
# LEGACY: per-symbol batch (kept for compatibility, but global model is better)
# ─────────────────────────────────────────────────────────────────────────────
def run_batch(pairs: List[Tuple[str, Path]], use_gpu: bool = True) -> dict:
    """Thin wrapper — just calls train_global_model with all pairs.

    Only ``len(pairs)`` is forwarded (as ``max_symbols``); the trainer
    re-discovers symbols itself. An empty ``pairs`` gives max_symbols=0, which
    the dataset builder treats as "no cap".
    """
    return train_global_model(use_gpu=use_gpu, max_symbols=len(pairs))


# ─────────────────────────────────────────────────────────────────────────────
# DEPENDENCY CHECK
# ─────────────────────────────────────────────────────────────────────────────
def check_deps(use_gpu: bool):
    print("\n=== P170 Dependency Check ===\n")
    for name, mod in [("numpy","numpy"),("pandas","pandas"),
                      ("scikit-learn","sklearn"),("xgboost","xgboost"),("joblib","joblib")]:
        try:
            __import__(mod); print(f" ✓ {name}")
        except:
            print(f" ✗ {name} (pip install {name} --break-system-packages)")
    for name, mod in [("lightgbm","lightgbm"),("catboost","catboost"),("optuna","optuna")]:
        try:
            __import__(mod); print(f" ✓ {name} (optional)")
        except:
            print(f" ~ {name} (optional — pip install {name} --break-system-packages)")
    try:
        import torch
        cuda = torch.cuda.is_available()
        dev = torch.cuda.get_device_name(0) if cuda else "CPU only"
        print(f" ✓ pytorch ({dev})")
        if cuda:
            print(f" GPU memory: {torch.cuda.get_device_properties(0).total_memory/1e9:.1f} GB")
    except ImportError:
        print(" ~ pytorch (meta-learner falls back to LogisticRegression without it)")
    pairs = discover_symbols()
    print(f"\nPaths:")
    print(f" Clean data : {CLEAN_DIR} exists={CLEAN_DIR.exists()}")
print(f" Features : {FEAT_DIR} exists={FEAT_DIR.exists()}") print(f" Output : {OUT}") print(f"\nSymbols found: {len(pairs)}") if pairs: print(" Largest 10 (most data first):") for sym, path in pairs[:10]: print(f" {sym:20s} {path.stat().st_size//1024:6d} KB {path.name}") print() # ───────────────────────────────────────────────────────────────────────────── # CLI # ───────────────────────────────────────────────────────────────────────────── def main(): p = argparse.ArgumentParser(description="P170 MAX SYSTEM") g = p.add_mutually_exclusive_group(required=True) g.add_argument("--global-model", action="store_true", help="Train global model on all 2150 symbols combined (recommended)") g.add_argument("--all-symbols", action="store_true", help="Alias for --global-model") g.add_argument("--predict", type=str, help="Predict today for a symbol using global model") g.add_argument("--check-deps", action="store_true", help="Check dependencies and paths") p.add_argument("--top-n", type=int, default=None, help="Limit to top N symbols by data size (useful for quick tests)") p.add_argument("--no-gpu", action="store_true", help="Force CPU") args = p.parse_args() gpu = not args.no_gpu if args.check_deps: check_deps(gpu) elif args.global_model or args.all_symbols: m = train_global_model(use_gpu=gpu, max_symbols=args.top_n) print(json.dumps(m, indent=2, default=str)) elif args.predict: r = predict_today(args.predict.upper(), use_gpu=gpu) print(json.dumps(r, indent=2) if r else f"No global model found. Run --global-model first.") if __name__ == "__main__": main()