DHAN_AI_DATA / p170_max_system.py
addyAIMLprojects's picture
P170 10yr model + code — 2026-05-15
b22f9ac verified
"""
P170 MAXIMUM SYSTEM — dhan_financial_intelligence
==================================================
Confirmed data layout:
data/lake/clean/market_data/NSE/SYMBOL_real_market_data_v2.parquet
columns: [timestamp, open, high, low, close, volume] ← 2155 symbols
data/lake/ml/features/NSE/SYMBOL_features_ml_dataset_v1.parquet
→ only 5 symbols (HDFCBANK, ICICIBANK, INFY, RELIANCE, TCS) — fallback only
All outputs go to data/lake/ml/p170/ — nothing existing is touched.
Usage:
cd /home/user1/dhan_financial_intelligence
source .venv/bin/activate
pip install lightgbm catboost --break-system-packages # recommended
python p170_max_system.py --check-deps
python p170_max_system.py --symbol RELIANCE
python p170_max_system.py --all-symbols --top-n 50
python p170_max_system.py --all-symbols
python p170_max_system.py --predict RELIANCE
"""
import sys
import json
import time
import logging
import warnings
import argparse
import traceback
from pathlib import Path
from datetime import datetime
from typing import Optional, List, Tuple
import numpy as np
import pandas as pd
# Silence every Python warning process-wide for the lifetime of this module.
warnings.filterwarnings("ignore")
# ── Logging ───────────────────────────────────────────────────────────────────
# The log directory is relative to the current working directory and is created
# at import time so the FileHandler below can open its file.
Path("logs/ml").mkdir(parents=True, exist_ok=True)
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        # Every record goes both to stdout and to a per-run, timestamped file.
        logging.StreamHandler(sys.stdout),
        logging.FileHandler(f"logs/ml/p170_{datetime.now():%Y%m%d_%H%M%S}.log"),
    ],
)
# Module-level logger used by the functions below.
log = logging.getLogger("p170")
# ─────────────────────────────────────────────────────────────────────────────
# PATHS
# ─────────────────────────────────────────────────────────────────────────────
# All paths are relative to the current working directory.
BASE = Path(".")
CLEAN_DIR = BASE / "data/lake/clean/market_data/NSE" # 2155 *_real_market_data_v2.parquet
FEAT_DIR = BASE / "data/lake/ml/features/NSE" # 5 *_features_ml_dataset_v1.parquet
# Everything this script writes lives under OUT.
OUT = BASE / "data/lake/ml/p170"
MODELS_DIR = OUT / "models"
METRICS_DIR = OUT / "metrics"
PREDS_DIR = OUT / "predictions"
REGISTRY_DIR = OUT / "registry"
# Create the output tree at import time so later writes never hit a missing directory.
for d in [MODELS_DIR, METRICS_DIR, PREDS_DIR, REGISTRY_DIR]:
    d.mkdir(parents=True, exist_ok=True)
# ─────────────────────────────────────────────────────────────────────────────
# LABEL CONFIG (matches existing policy exactly)
# ─────────────────────────────────────────────────────────────────────────────
TARGET = "direction_class" # 0=down 1=neutral 2=up
HORIZON = 3 # 3-day forward return (applied as a 3-row shift of the close series)
POS_TH = 0.015 # ≥+1.5% → up (was 0.5% → gave 9% neutral, unusable)
NEG_TH = -0.015 # ≤-1.5% → down (±1.5% gives ~33/33/33 split)
# Split ratios / fold count: not referenced in the section of the file reviewed
# here — presumably used by per-symbol or walk-forward training further down.
TRAIN_R = 0.70
VAL_R = 0.15
TEST_R = 0.15
WF_FOLDS = 5
# Minimum number of rows a symbol needs before it is loaded or trained on.
MIN_ROWS = 80
# Seed shared by NumPy's global RNG and the base models.
SEED = 42
np.random.seed(SEED)
# ─────────────────────────────────────────────────────────────────────────────
# SYMBOL DISCOVERY
# ─────────────────────────────────────────────────────────────────────────────
def discover_symbols() -> List[Tuple[str, Path]]:
    """
    Enumerate every symbol that has a parquet file on disk.

    Tickers are the file stem with the dataset suffix removed, upper-cased.
    Clean market-data files take priority; a legacy feature file only
    contributes a symbol that has no clean file.

    Returns [(symbol, parquet_path), ...] ordered by file size, largest first.
    """
    clean_suffix = "_real_market_data_v2"
    legacy_suffix = "_features_ml_dataset_v1"
    by_symbol: dict = {}

    # Primary source: clean OHLCV files. A later file that maps to the same
    # upper-cased ticker replaces the earlier one.
    for parquet in CLEAN_DIR.glob(f"*{clean_suffix}.parquet"):
        ticker = parquet.stem.replace(clean_suffix, "").upper()
        if ticker:
            by_symbol[ticker] = parquet

    # Fallback source: legacy feature files never override a clean file.
    for parquet in FEAT_DIR.glob(f"*{legacy_suffix}.parquet"):
        ticker = parquet.stem.replace(legacy_suffix, "").upper()
        if ticker:
            by_symbol.setdefault(ticker, parquet)

    # File size is used as a proxy for "most data first".
    return sorted(
        by_symbol.items(),
        key=lambda entry: entry[1].stat().st_size,
        reverse=True,
    )
# ─────────────────────────────────────────────────────────────────────────────
# DATA LOADING
# ─────────────────────────────────────────────────────────────────────────────
def load_candles(symbol: str) -> Optional[pd.DataFrame]:
    """
    Read the OHLCV history for ``symbol`` from the first usable parquet file.

    Files are tried in order: clean market data, plain ``SYMBOL.parquet`` in
    the clean directory, then the legacy feature file. The date column is
    taken from ``date``, else ``timestamp``, else a named index.

    Returns a frame with exactly [date, open, high, low, close, volume],
    sorted by date with a fresh 0..n-1 index, or None when no candidate file
    exists, has full OHLCV, and keeps at least MIN_ROWS rows after dropping
    rows with missing close/volume.
    """
    ohlcv = ["open", "high", "low", "close", "volume"]
    search_order = (
        CLEAN_DIR / f"{symbol}_real_market_data_v2.parquet",   # primary (2155 symbols)
        CLEAN_DIR / f"{symbol}.parquet",                       # alternative naming
        FEAT_DIR / f"{symbol}_features_ml_dataset_v1.parquet", # legacy fallback
    )
    for source in search_order:
        if not source.exists():
            continue
        try:
            frame = pd.read_parquet(source)

            # Normalise whatever carries the timestamps into a 'date' column.
            if "date" not in frame.columns:
                if "timestamp" in frame.columns:
                    frame = frame.rename(columns={"timestamp": "date"})
                elif frame.index.name:
                    index_name = frame.index.name
                    frame = frame.reset_index().rename(columns={index_name: "date"})
                else:
                    continue
            frame["date"] = pd.to_datetime(frame["date"])

            # Feature-only files lack open/high/low — try the next candidate.
            if any(col not in frame.columns for col in ohlcv):
                continue

            frame = frame[["date", *ohlcv]]
            frame = frame.dropna(subset=["close", "volume"])
            frame = frame.sort_values("date").reset_index(drop=True)
            if len(frame) >= MIN_ROWS:
                return frame
        except Exception as exc:
            # Unreadable/corrupt file: note it at debug level and keep looking.
            log.debug(f"Could not load {source}: {exc}")
            continue
    return None
# ─────────────────────────────────────────────────────────────────────────────
# LAYER 0 — FEATURE ENGINEERING (87 features)
# ─────────────────────────────────────────────────────────────────────────────
# ── NIFTYBEES market index cache ──────────────────────────────────────────────
# Single-entry cache: key "df" holds the market frame, or None after a miss/failure.
_NIFTYBEES_CACHE: dict = {}
def _load_niftybees() -> "pd.DataFrame | None":
    """
    Load NIFTYBEES (Nifty50 proxy) and derive market-level features.

    The result (including a None result) is cached after the first call.
    Returns a DataFrame with columns
    [date, mkt_ret_1d, mkt_ret_5d, mkt_vol_20d, mkt_above_ma20],
    or None when the file is missing or cannot be processed.
    """
    if "df" in _NIFTYBEES_CACHE:
        return _NIFTYBEES_CACHE["df"]

    market_features = None
    source = CLEAN_DIR / "NIFTYBEES_real_market_data_v2.parquet"
    if source.exists():
        try:
            market = pd.read_parquet(source)
            if "timestamp" in market.columns:
                market = market.rename(columns={"timestamp": "date"})
            market["date"] = pd.to_datetime(market["date"])
            market = market.sort_values("date").reset_index(drop=True)

            close = market["close"].astype(float)
            market["mkt_ret_1d"] = close.pct_change(1)
            market["mkt_ret_5d"] = close.pct_change(5)
            market["mkt_vol_20d"] = market["mkt_ret_1d"].rolling(20).std()
            market["mkt_ma20"] = close.rolling(20).mean()
            market["mkt_above_ma20"] = (close > market["mkt_ma20"]).astype(int)

            # Only these columns are merged into the per-symbol frames.
            value_cols = ["mkt_ret_1d", "mkt_ret_5d", "mkt_vol_20d", "mkt_above_ma20"]
            selected = market[["date"] + value_cols].copy()
            # Fill rolling-window warm-up NaNs: forward, then backward, then 0.
            selected[value_cols] = selected[value_cols].ffill().bfill().fillna(0)
            market_features = selected
        except Exception as exc:
            log.debug(f"NIFTYBEES load failed: {exc}")
            market_features = None

    _NIFTYBEES_CACHE["df"] = market_features
    return market_features
# ── Cross-asset data cache (yfinance) ─────────────────────────────────────────
# Per-key cache of cross-asset return frames; a None value records a failed load.
_CROSS_ASSET_CACHE: dict = {}
def _load_cross_asset(ticker: str, key: str) -> "pd.DataFrame | None":
    """
    Load daily returns for a cross-asset series.

    ``key`` selects the cache slot and, for "nifty"/"usdinr"/"crude", a local
    parquet under data/lake/clean/market_data/CROSS/. When that file does not
    exist the series is downloaded with yfinance using ``ticker``.

    Returns a DataFrame [date, ret_1d, ret_5d] with timezone-naive dates, or
    None on any failure (the failure is cached too, so it is not retried).
    """
    if key in _CROSS_ASSET_CACHE:
        return _CROSS_ASSET_CACHE[key]

    # Parquet name mapping
    local_name = {"nifty": "NIFTY50", "usdinr": "USDINR", "crude": "CRUDE"}.get(key)
    local_file = None
    if local_name:
        local_file = BASE / f"data/lake/clean/market_data/CROSS/{local_name}.parquet"

    returns = None
    try:
        if local_file and local_file.exists():
            frame = pd.read_parquet(local_file)
            frame["date"] = pd.to_datetime(frame["date"]).dt.tz_localize(None)
            returns = frame[["date", "ret_1d", "ret_5d"]].copy()
            log.debug(f"Cross-asset {key}: loaded {len(returns)} rows from parquet")
        else:
            # Fallback: yfinance (imported lazily; an ImportError is handled below)
            import yfinance as yf
            frame = yf.download(ticker, period="max", progress=False, auto_adjust=True)
            if frame is not None and not frame.empty:
                # Flatten to lower-case field names; for MultiIndex columns
                # the first level is used.
                if isinstance(frame.columns, pd.MultiIndex):
                    frame.columns = [col[0].lower() for col in frame.columns]
                else:
                    frame.columns = [col.lower() for col in frame.columns]
                frame = frame.reset_index()
                frame = frame.rename(columns={frame.columns[0]: "date"})
                frame["date"] = pd.to_datetime(frame["date"]).dt.tz_localize(None)
                frame["ret_1d"] = frame["close"].pct_change(1)
                frame["ret_5d"] = frame["close"].pct_change(5)
                returns = frame[["date", "ret_1d", "ret_5d"]].copy()
    except Exception as exc:
        log.debug(f"Cross-asset {ticker} failed: {exc}")
        returns = None

    _CROSS_ASSET_CACHE[key] = returns
    return returns
def _hurst_rs(prices: np.ndarray) -> float:
    """
    Single-window rescaled-range (R/S) estimate of the Hurst exponent.

    >0.5 = trending, <0.5 = mean-reverting. Returns the neutral value 0.5
    when fewer than 20 points are given, when the series has zero standard
    deviation, or when the computation raises.
    """
    neutral = 0.5
    try:
        size = len(prices)
        if size < 20:
            return neutral
        # Range of the cumulative mean-adjusted series ...
        walk = np.cumsum(prices - np.mean(prices))
        spread = np.max(walk) - np.min(walk)
        # ... rescaled by the sample standard deviation.
        scale = np.std(prices, ddof=1)
        if scale == 0:
            return neutral
        return float(np.log(spread / scale) / np.log(size))
    except Exception:
        return neutral
def engineer_features(raw: pd.DataFrame) -> pd.DataFrame:
    """
    Turn raw OHLCV candles into the model's feature frame plus the target label.

    Input : OHLCV DataFrame with [date, open, high, low, close, volume].
            It is expected to be sorted by date with a default 0..n-1 index
            (what load_candles returns): series built before the market merge
            are assigned back by index afterwards.
    Output: a copy of the input rows with the engineered feature columns
            (the section header cites 87), ``future_return`` and TARGET
            (``direction_class``: 0=down, 1=neutral, 2=up) added.

    Notes:
      * ``macd_cross_up`` / ``stoch_cross_up`` are derived from locally built
        MACD and stochastic series. They used to be looked up in ``df`` before
        those columns were created, which left both features constant 0.
      * The last HORIZON rows have no forward return: ``future_return`` is NaN
        there and TARGET keeps its default of 1 (neutral). Training code must
        filter on ``future_return``; the rows stay so the latest bar can be
        scored.
      * Feature NaNs are forward-filled and then back-filled at the end, so
        mid-series gaps take the previous value and warm-up rows take the
        first valid value of each column.
    """
    df = raw.copy()
    c = df["close"].astype(float)
    h = df["high"].astype(float)
    l = df["low"].astype(float)
    o = df["open"].astype(float)
    # Zero volume is treated as missing.
    v = df["volume"].astype(float).replace(0, np.nan)

    # True range — shared by the Keltner, ATR and ADX sections.
    tr = pd.concat(
        [h - l, (h - c.shift()).abs(), (l - c.shift()).abs()], axis=1
    ).max(axis=1)

    # ── Keltner Channels ─────────────────────────────────────────────────────
    keltner_mid = c.ewm(span=20, adjust=False).mean()
    atr14_k = tr.ewm(span=14, adjust=False).mean()
    df["keltner_width"] = (
        (keltner_mid + 2 * atr14_k - (keltner_mid - 2 * atr14_k)) / (keltner_mid + 1e-9)
    )

    # ── Historical volatility (annualised) ────────────────────────────────────
    log_ret = np.log(c / (c.shift(1) + 1e-9))
    for w in [10, 20, 60]:
        df[f"hist_vol_{w}"] = log_ret.rolling(w).std() * np.sqrt(252)

    # ── Money Flow Index ─────────────────────────────────────────────────────
    typical = (h + l + c) / 3
    raw_mf = typical * v
    pos_mf = raw_mf.where(typical > typical.shift(1), 0).rolling(14).sum()
    neg_mf = raw_mf.where(typical < typical.shift(1), 0).rolling(14).sum()
    df["mfi_14"] = 100 - (100 / (1 + pos_mf / (neg_mf + 1e-9)))

    # ── Chaikin Money Flow ────────────────────────────────────────────────────
    clv = ((c - l) - (h - c)) / (h - l + 1e-9)
    df["cmf_20"] = (clv * v).rolling(20).sum() / (v.rolling(20).sum() + 1e-9)

    # ── VWAP distance (20-bar rolling VWAP of the typical price) ──────────────
    vwap = (typical * v).rolling(20).sum() / (v.rolling(20).sum() + 1e-9)
    df["vwap_dist"] = (c - vwap) / (vwap + 1e-9)

    # ── Amihud illiquidity ────────────────────────────────────────────────────
    df["amihud_illiquidity"] = (c.pct_change().abs() / (v * c + 1e-9)).rolling(20).mean()

    # ── Volume ratios ─────────────────────────────────────────────────────────
    df["vol_ratio_5"] = v / (v.rolling(5).mean() + 1e-9)
    df["vol_ratio_20"] = v / (v.rolling(20).mean() + 1e-9)

    # ── OBV divergence ────────────────────────────────────────────────────────
    obv_raw = (np.sign(c.diff()) * v).cumsum()
    obv_ema = obv_raw.ewm(span=20, adjust=False).mean()
    df["obv_divergence"] = obv_raw - obv_ema

    # ── Williams %R ───────────────────────────────────────────────────────────
    df["williams_r"] = (
        -100 * (h.rolling(14).max() - c)
        / (h.rolling(14).max() - l.rolling(14).min() + 1e-9)
    )

    # ── CCI ───────────────────────────────────────────────────────────────────
    df["cci_20"] = (c - c.rolling(20).mean()) / (0.015 * c.rolling(20).std() + 1e-9)

    # ── MACD cross up ─────────────────────────────────────────────────────────
    # Built from local series so the feature does not depend on the MACD
    # columns, which are only added to df further down. Kept at this position
    # so the output column order is unchanged.
    ema12 = c.ewm(span=12, adjust=False).mean()
    ema26 = c.ewm(span=26, adjust=False).mean()
    macd_line = ema12 - ema26
    macd_signal = macd_line.ewm(span=9, adjust=False).mean()
    df["macd_cross_up"] = (
        (macd_line > macd_signal) & (macd_line.shift(1) <= macd_signal.shift(1))
    ).astype(int)

    # ── Stoch cross up (14-bar %K crossing above its 3-bar %D) ────────────────
    lo14 = l.rolling(14).min()
    hi14 = h.rolling(14).max()
    stoch_k14 = 100 * (c - lo14) / (hi14 - lo14 + 1e-9)
    stoch_d14 = stoch_k14.rolling(3).mean()
    df["stoch_cross_up"] = (
        (stoch_k14 > stoch_d14) & (stoch_k14.shift(1) <= stoch_d14.shift(1))
    ).astype(int)

    # ── Candlestick patterns ─────────────────────────────────────────────────
    body = (c - o).abs()
    full_range = (h - l).replace(0, np.nan)
    upper_sh = (h - pd.concat([c, o], axis=1).max(axis=1)) / full_range
    lower_sh = (pd.concat([c, o], axis=1).min(axis=1) - l) / full_range
    df["gap_up"] = ((o - c.shift(1)) / (c.shift(1) + 1e-9)).clip(lower=0)
    df["gap_down"] = ((c.shift(1) - o) / (c.shift(1) + 1e-9)).clip(lower=0)
    df["inside_bar"] = ((h < h.shift(1)) & (l > l.shift(1))).astype(int)
    df["doji"] = (body / (full_range + 1e-9) < 0.1).astype(int)
    df["hammer"] = (
        (lower_sh > 2 * body / (full_range + 1e-9)) & (upper_sh < 0.3)
    ).astype(int)
    df["shooting_star"] = (
        (upper_sh > 2 * body / (full_range + 1e-9)) & (lower_sh < 0.3)
    ).astype(int)
    df["engulfing_bull"] = (
        (c > o) & (o < c.shift(1)) & (c > o.shift(1)) & (c.shift(1) < o.shift(1))
    ).astype(int)
    df["engulfing_bear"] = (
        (c < o) & (o > c.shift(1)) & (c < o.shift(1)) & (c.shift(1) > o.shift(1))
    ).astype(int)

    # ── Drawdown from recent high ─────────────────────────────────────────────
    df["dd_from_high_20"] = (c - c.rolling(20).max()) / (c.rolling(20).max() + 1e-9)
    df["dd_from_high_60"] = (c - c.rolling(60).max()) / (c.rolling(60).max() + 1e-9)

    # ── Rolling Sharpe (annualised, on log returns) ───────────────────────────
    df["rolling_sharpe_20"] = (
        log_ret.rolling(20).mean() / (log_ret.rolling(20).std() + 1e-9)
    ) * np.sqrt(252)
    df["rolling_sharpe_60"] = (
        log_ret.rolling(60).mean() / (log_ret.rolling(60).std() + 1e-9)
    ) * np.sqrt(252)

    # ── Autocorrelation (trend persistence proxy, faster than Hurst) ──────────
    # NOTE: despite the column name this is the lag-1 autocorrelation of daily
    # returns over a 20-bar window; the name is kept because it is a feature key.
    df["hurst_50"] = c.pct_change().rolling(20).apply(
        lambda x: x.autocorr(lag=1) if len(x) > 5 else 0, raw=False
    )

    # ── Cross-asset macro features ────────────────────────────────────────────
    # Best effort: any failure leaves the missing columns at 0.0 (see below).
    try:
        nifty_df = _load_cross_asset("^NSEI", "nifty")
        usdinr_df = _load_cross_asset("USDINR=X", "usdinr")
        crude_df = _load_cross_asset("CL=F", "crude")
        if "date" in df.columns:
            df["date"] = pd.to_datetime(df["date"])
        for cross, prefix in [(nifty_df, "nifty"), (usdinr_df, "usdinr"), (crude_df, "crude")]:
            if cross is None:
                continue
            merged = df[["date"]].merge(cross, on="date", how="left")
            if prefix == "nifty":
                df["nifty_ret_1d"] = merged["ret_1d"].values
                df["nifty_ret_5d"] = merged["ret_5d"].values
                # Beta: cov(stock, nifty) / var(nifty)
                sr = c.pct_change()
                nr = pd.Series(merged["ret_1d"].values, index=df.index)
                df["beta_20d"] = sr.rolling(20).cov(nr) / (nr.rolling(20).var() + 1e-9)
            elif prefix == "usdinr":
                df["usd_inr_ret"] = merged["ret_1d"].values
            elif prefix == "crude":
                df["crude_ret"] = merged["ret_1d"].values
    except Exception as e:
        log.debug(f"Cross-asset features skipped: {e}")
    for col in ["nifty_ret_1d", "nifty_ret_5d", "beta_20d", "usd_inr_ret", "crude_ret"]:
        if col not in df.columns:
            df[col] = 0.0

    # ── Returns ───────────────────────────────────────────────────────────────
    for p in [1, 3, 5, 10, 20]:
        df[f"return_{p}d"] = c.pct_change(p)

    # ── Volatility ────────────────────────────────────────────────────────────
    r1 = df["return_1d"]
    for w in [5, 10, 20]:
        df[f"volatility_{w}d"] = r1.rolling(w).std()
    df["vol_ratio"] = df["volatility_5d"] / (df["volatility_20d"] + 1e-9)
    df["skew_20"] = r1.rolling(20).skew()
    df["kurt_20"] = r1.rolling(20).kurt()

    # ── Moving averages ───────────────────────────────────────────────────────
    for w in [5, 10, 20, 50]:
        df[f"ma_{w}"] = c.rolling(w).mean()
        df[f"close_to_ma_{w}"] = (c - df[f"ma_{w}"]) / (df[f"ma_{w}"] + 1e-9)

    # ── EMA alignment ─────────────────────────────────────────────────────────
    ema5 = c.ewm(span=5, adjust=False).mean()
    ema20 = c.ewm(span=20, adjust=False).mean()
    ema50 = c.ewm(span=50, adjust=False).mean()
    df["ema_5_20_ratio"] = ema5 / (ema20 + 1e-9)
    df["ema_20_50_ratio"] = ema20 / (ema50 + 1e-9)
    df["ema_align"] = (ema5 > ema20).astype(int) + (ema20 > ema50).astype(int)

    # ── RSI (7, 14, 21) ───────────────────────────────────────────────────────
    delta = c.diff()
    gain = delta.clip(lower=0)
    loss = (-delta).clip(lower=0)
    for period in [7, 14, 21]:
        ag = gain.ewm(com=period - 1, min_periods=period).mean()
        al = loss.ewm(com=period - 1, min_periods=period).mean()
        df[f"rsi_{period}"] = 100 - 100 / (1 + ag / (al + 1e-9))

    # ── MACD (series computed above for the cross-up feature) ─────────────────
    df["macd"] = macd_line
    df["macd_sig"] = macd_signal
    df["macd_hist"] = df["macd"] - df["macd_sig"]
    df["macd_cross"] = (df["macd"] > df["macd_sig"]).astype(int)

    # ── Bollinger Bands ───────────────────────────────────────────────────────
    for w in [10, 20]:
        mid = c.rolling(w).mean()
        std = c.rolling(w).std()
        bb_u = mid + 2 * std
        bb_l = mid - 2 * std
        df[f"bb_width_{w}"] = (bb_u - bb_l) / (mid + 1e-9)
        df[f"bb_pct_{w}"] = (c - bb_l) / (bb_u - bb_l + 1e-9)

    # ── ATR ───────────────────────────────────────────────────────────────────
    for w in [7, 14]:
        atr = tr.ewm(span=w, adjust=False).mean()
        df[f"atr_{w}"] = atr
        df[f"atr_pct_{w}"] = atr / (c + 1e-9)

    # ── ADX / DI ──────────────────────────────────────────────────────────────
    plus_dm = (h - h.shift(1)).clip(lower=0)
    minus_dm = (l.shift(1) - l).clip(lower=0)
    # Order matters: minus_dm is filtered against the already-filtered plus_dm.
    plus_dm = plus_dm.where(plus_dm > minus_dm, 0)
    minus_dm = minus_dm.where(minus_dm > plus_dm, 0)
    atr14 = tr.ewm(span=14, adjust=False).mean() + 1e-9
    plus_di = 100 * plus_dm.ewm(span=14, adjust=False).mean() / atr14
    minus_di = 100 * minus_dm.ewm(span=14, adjust=False).mean() / atr14
    dx = 100 * (plus_di - minus_di).abs() / (plus_di + minus_di + 1e-9)
    df["adx_14"] = dx.ewm(span=14, adjust=False).mean()
    df["plus_di_14"] = plus_di
    df["minus_di_14"] = minus_di

    # ── Stochastics ───────────────────────────────────────────────────────────
    for k in [5, 14]:
        lo_k = l.rolling(k).min()
        hi_k = h.rolling(k).max()
        stoch_k = 100 * (c - lo_k) / (hi_k - lo_k + 1e-9)
        df[f"stoch_k_{k}"] = stoch_k
        df[f"stoch_d_{k}"] = stoch_k.rolling(3).mean()

    # ── Donchian channel position ──────────────────────────────────────────────
    for w in [10, 20]:
        dc_hi = h.rolling(w).max()
        dc_lo = l.rolling(w).min()
        df[f"donchian_{w}"] = (c - dc_lo) / (dc_hi - dc_lo + 1e-9)

    # ── Candle structure ──────────────────────────────────────────────────────
    rng = h - l + 1e-9
    df["candle_body"] = (c - o) / rng
    df["upper_shadow"] = (h - c.clip(lower=o)) / rng
    df["lower_shadow"] = (o.clip(upper=c) - l) / rng
    df["gap"] = (o - c.shift(1)) / (c.shift(1) + 1e-9)

    # ── Volume features ───────────────────────────────────────────────────────
    for w in [5, 20]:
        vmean = v.rolling(w).mean()
        vstd = v.rolling(w).std() + 1e-9
        df[f"volume_z_{w}"] = (v - vmean) / vstd
    obv = (np.sign(r1) * v).cumsum()
    df["obv_z"] = (obv - obv.rolling(20).mean()) / (obv.rolling(20).std() + 1e-9)
    df["vpt_5"] = (r1 * v).rolling(5).sum()

    # ── Momentum / ROC ────────────────────────────────────────────────────────
    for p in [3, 5, 10, 20]:
        df[f"roc_{p}"] = c.pct_change(p)
    # NOTE(review): rank(pct=True) ranks over the whole series, so each row's
    # score depends on later rows as well — confirm this is acceptable.
    df["momentum_score"] = (
        df["roc_3"].rank(pct=True) * 0.4
        + df["roc_5"].rank(pct=True) * 0.3
        + df["roc_10"].rank(pct=True) * 0.2
        + df["roc_20"].rank(pct=True) * 0.1
    )

    # ── Market regime features (NIFTYBEES as Nifty50 proxy) ──────────────────
    # Cross-sectional context: is this stock outperforming the market?
    # Loaded once, merged by date, gracefully skipped if unavailable.
    try:
        mkt = _load_niftybees()
        if mkt is not None and "date" in df.columns:
            df["date"] = pd.to_datetime(df["date"])
            df = df.merge(mkt, on="date", how="left")
            # stock vs market relative strength (20d market return is
            # approximated as 4 × the 5d market return)
            df["stock_vs_mkt_5d"] = df["return_5d"] - df["mkt_ret_5d"].fillna(0)
            df["stock_vs_mkt_20d"] = df["return_20d"] - df["mkt_ret_5d"].fillna(0) * 4
            mkt_cols = ["mkt_ret_1d", "mkt_ret_5d", "mkt_vol_20d",
                        "mkt_above_ma20", "stock_vs_mkt_5d", "stock_vs_mkt_20d"]
            df[mkt_cols] = df[mkt_cols].ffill().bfill().fillna(0)
    except Exception as e:
        log.debug(f"Market regime features skipped: {e}")

    # ── Target label (matches existing policy exactly) ───────────────────────
    fwd = c.shift(-HORIZON) / c - 1
    df["future_return"] = fwd
    # Comparisons against NaN are False, so the last HORIZON rows (no forward
    # return yet) keep the neutral default; future_return is NaN on those rows.
    df[TARGET] = 1                      # neutral default
    df.loc[fwd >= POS_TH, TARGET] = 2   # up
    df.loc[fwd <= NEG_TH, TARGET] = 0   # down

    # Fill feature NaNs so a later dropna does not discard the warm-up rows:
    # ffill carries the previous value into any gap (mid-series ones included),
    # then bfill gives the leading warm-up rows the first valid value.
    feat_cols_now = [col for col in df.columns
                     if col not in {"date", "open", "high", "low", "close", "volume",
                                    "future_return", TARGET}]
    df[feat_cols_now] = df[feat_cols_now].ffill().bfill()
    return df
def feature_columns(df: pd.DataFrame) -> List[str]:
    """
    List the columns of ``df`` that are model inputs, in frame order.

    Raw candle fields, identifier columns, the forward return and the target
    label are excluded; every other column counts as a feature.
    """
    non_features = frozenset((
        "date", "open", "high", "low", "close", "volume",
        "symbol", "exchange", "timestamp", "future_return", TARGET,
    ))
    selected: List[str] = []
    for name in df.columns:
        if name in non_features:
            continue
        selected.append(name)
    return selected
# ─────────────────────────────────────────────────────────────────────────────
# LAYER 1 — BASE MODELS
# ─────────────────────────────────────────────────────────────────────────────
def make_base_models(use_gpu: bool) -> dict:
    """
    Build the (unfitted) layer-1 classifiers for the 3-class direction target.

    XGBoost, RandomForest and ExtraTrees are always created. LightGBM and
    CatBoost are added only when they can be imported; a warning is logged
    otherwise.

    Args:
        use_gpu: when True, XGBoost is configured with device="cuda" and
            CatBoost with task_type="GPU". The other models always use CPU.

    Returns:
        dict mapping model name -> estimator, in insertion order:
        xgboost, random_forest, extra_trees[, lightgbm][, catboost].
    """
    from sklearn.ensemble import RandomForestClassifier, ExtraTreesClassifier
    import xgboost as xgb

    gpu_xgb = {"device": "cuda", "tree_method": "hist"} if use_gpu else {}
    models = {
        "xgboost": xgb.XGBClassifier(
            n_estimators=300, max_depth=5, learning_rate=0.05,
            subsample=0.8, colsample_bytree=0.8,
            min_child_weight=3, gamma=0.1, reg_alpha=0.1, reg_lambda=1.0,
            objective="multi:softprob", num_class=3, eval_metric="mlogloss",
            random_state=SEED, verbosity=0, n_jobs=-1, **gpu_xgb,
        ),
        "random_forest": RandomForestClassifier(
            n_estimators=200, max_depth=8, min_samples_leaf=5,
            max_features="sqrt", class_weight="balanced",
            random_state=SEED, n_jobs=-1,
        ),
        "extra_trees": ExtraTreesClassifier(
            n_estimators=200, max_depth=10, min_samples_leaf=3,
            class_weight="balanced", random_state=SEED, n_jobs=-1,
        ),
    }

    try:
        import lightgbm as lgb
        models["lightgbm"] = lgb.LGBMClassifier(
            n_estimators=300, num_leaves=63, learning_rate=0.05,
            # LightGBM ignores `subsample` (bagging_fraction) unless
            # `subsample_freq` (bagging_freq) is non-zero, so row subsampling
            # must be switched on explicitly to match the XGBoost setting.
            subsample=0.8, subsample_freq=1,
            colsample_bytree=0.8, min_child_samples=10,
            reg_alpha=0.1, reg_lambda=1.0,
            objective="multiclass", num_class=3,
            random_state=SEED, verbose=-1, n_jobs=-1,
        )
        log.info(" LightGBM ✓")
    except ImportError:
        log.warning(" LightGBM not installed (pip install lightgbm --break-system-packages)")

    try:
        from catboost import CatBoostClassifier
        cb_gpu = {"task_type": "GPU"} if use_gpu else {"task_type": "CPU"}
        models["catboost"] = CatBoostClassifier(
            iterations=300, depth=6, learning_rate=0.05, l2_leaf_reg=3,
            loss_function="MultiClass", classes_count=3,
            random_seed=SEED, verbose=False, **cb_gpu,
        )
        log.info(" CatBoost ✓")
    except ImportError:
        log.warning(" CatBoost not installed (pip install catboost --break-system-packages)")

    return models
# ─────────────────────────────────────────────────────────────────────────────
# LAYER 2 — PYTORCH META-LEARNER
# ─────────────────────────────────────────────────────────────────────────────
def _meta_net(in_dim: int):
    """
    Build the small MLP used as the stacking meta-learner.

    Architecture: in_dim -> 64 (LayerNorm, GELU, Dropout 0.2)
                  -> 32 (GELU, Dropout 0.1) -> 3 class logits.
    Layer positions inside the Sequential are fixed so saved state dicts
    keep the same keys.
    """
    from torch import nn

    hidden, bottleneck, n_classes = 64, 32, 3
    layers = [
        nn.Linear(in_dim, hidden),
        nn.LayerNorm(hidden),
        nn.GELU(),
        nn.Dropout(0.2),
        nn.Linear(hidden, bottleneck),
        nn.GELU(),
        nn.Dropout(0.1),
        nn.Linear(bottleneck, n_classes),
    ]
    return nn.Sequential(*layers)
def train_meta_torch(net, X: np.ndarray, y: np.ndarray, device, epochs: int = 60):
    """
    Fit ``net`` on (X, y) with full-batch gradient steps and return it in eval mode.

    Uses AdamW (lr 1e-3, weight decay 1e-4), a cosine-annealed learning rate
    over ``epochs`` steps, cross-entropy loss and gradient-norm clipping at 1.0.
    The whole dataset is moved to ``device`` and used as a single batch.
    """
    import torch
    from torch import nn

    net = net.to(device)
    optimizer = torch.optim.AdamW(net.parameters(), lr=1e-3, weight_decay=1e-4)
    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=epochs)
    loss_fn = nn.CrossEntropyLoss()

    features = torch.FloatTensor(X).to(device)
    labels = torch.LongTensor(y).to(device)

    net.train()
    for _epoch in range(epochs):
        optimizer.zero_grad()
        logits = net(features)
        batch_loss = loss_fn(logits, labels)
        batch_loss.backward()
        # Clip before stepping to keep a single bad batch from blowing up the weights.
        torch.nn.utils.clip_grad_norm_(net.parameters(), 1.0)
        optimizer.step()
        scheduler.step()

    net.eval()
    return net
def predict_meta_torch(net, X: np.ndarray, device) -> np.ndarray:
    """
    Run ``net`` on X without gradients and return row-wise softmax
    probabilities as a NumPy array (one column per class).
    """
    import torch

    with torch.no_grad():
        inputs = torch.FloatTensor(X).to(device)
        logits = net(inputs)
        proba = torch.softmax(logits, dim=1)
    return proba.cpu().numpy()
# ─────────────────────────────────────────────────────────────────────────────
# GLOBAL MODEL — train on ALL symbols combined (~662K rows)
# This is the right architecture when each symbol only has ~300 rows.
# One model learns market-wide patterns; per-symbol fine-tuning adds specificity.
# ─────────────────────────────────────────────────────────────────────────────
def build_global_dataset(
    pairs: List[Tuple[str, Path]],
    max_symbols: Optional[int] = None,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, List[str], list]:
    """
    Load and engineer features for every equity symbol and stack them into
    one matrix.

    Args:
        pairs: [(symbol, path), ...] as returned by discover_symbols(). Only
            the symbol is used; the file is re-resolved by load_candles().
        max_symbols: optional cap applied after the equity filter.

    Returns:
        (X, y, symbol_ids, feature_cols, dates) where
          X            float32 matrix, one row per (symbol, bar)
          y            int32 labels in {0, 1, 2}
          symbol_ids   array with the symbol of each row
          feature_cols column names of X (taken from the first loaded symbol)
          dates        list with the bar date of each row (None if unavailable)
        Rows of one symbol are contiguous and in the order produced by
        engineer_features(). No train/test split is done here.

    Raises:
        ValueError: if no symbol produced any usable rows.

    Rows are dropped when the forward return is unknown (the last HORIZON
    bars, which engineer_features labels neutral by default) or when any
    feature is NaN or infinite.
    """
    # Name fragments / suffixes that mark ETFs, liquid funds, bonds and
    # commodities rather than equities.
    excluded_tokens = (
        'LIQUID', 'GILT', 'GSEC', 'TBILL', 'CASH', 'GOLD', 'SILVER', 'METAL',
        'BBETF', 'EBBETF', 'BEES', 'NIFTY', 'LIQUIDADD', 'LIQUIDETF',
        'CASHIETF', 'AXISBPS', 'LICNETF',
    )
    excluded_suffixes = ('ETF', 'BEES', 'LIQUID', 'ADD', 'IETF')

    all_X, all_y, all_syms, all_dates = [], [], [], []
    feature_cols_ref = None
    skipped = 0
    rows_loaded = 0

    # Re-apply equity filter in case pairs came from unfiltered source
    pairs = [
        (s, p) for s, p in pairs
        if not any(tok in s.upper() for tok in excluded_tokens)
        and not s.upper().endswith(excluded_suffixes)
    ]
    if max_symbols:
        pairs = pairs[:max_symbols]
    log.info(f" Loading {len(pairs)} equity symbols...")

    for i, (sym, _) in enumerate(pairs):
        if i % 200 == 0:
            log.info(f" {i}/{len(pairs)} loaded, {rows_loaded:,} rows so far")
        try:
            raw = load_candles(sym)
            if raw is None:
                skipped += 1
                continue
            df = engineer_features(raw)
            fcols = feature_columns(df)

            # ±inf survives dropna and would make the scaler raise for the
            # whole run, so treat it as missing.
            df[fcols] = df[fcols].replace([np.inf, -np.inf], np.nan)
            # Bars without a known forward return carry a placeholder label.
            if "future_return" in df.columns:
                df = df[df["future_return"].notna()]
            df = df.dropna(subset=fcols + [TARGET])
            df = df[df[TARGET].isin([0, 1, 2])].reset_index(drop=True)
            if len(df) < MIN_ROWS:
                skipped += 1
                continue

            # Align feature columns across symbols: the first loaded symbol
            # defines the layout; columns missing from a later symbol are 0.
            if feature_cols_ref is None:
                feature_cols_ref = fcols
            else:
                fcols = feature_cols_ref
            X_sym = df.reindex(columns=fcols, fill_value=0.0).to_numpy(dtype=np.float32)

            all_X.append(X_sym)
            all_y.append(df[TARGET].values.astype(np.int32))
            all_syms.extend([sym] * len(df))
            if "date" in df.columns:
                all_dates.extend(df["date"].tolist())
            else:
                all_dates.extend([None] * len(df))
            rows_loaded += len(df)
        except Exception as e:
            log.debug(f" [{sym}] skipped: {e}")
            skipped += 1

    log.info(f" Loaded: {len(pairs)-skipped} symbols, {skipped} skipped")
    if not all_X:
        raise ValueError("No data loaded — check CLEAN_DIR path")

    X = np.vstack(all_X)
    y = np.concatenate(all_y)
    sym_ids = np.array(all_syms)
    log.info(f" Combined dataset: {X.shape[0]:,} rows × {X.shape[1]} features")
    cls, cnts = np.unique(y, return_counts=True)
    for c, n in zip(cls, cnts):
        log.info(f" class {c}: {n:,} ({n/len(y)*100:.1f}%)")
    return X, y, sym_ids, feature_cols_ref or [], all_dates
def temporal_global_split(
    sym_ids: np.ndarray,
    all_dates: List,
    test_ratio: float = 0.20,
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Split rows into train/test per symbol: the last ``test_ratio`` of each
    symbol's rows go to test, the rest to train.

    Args:
        sym_ids: symbol label of every row. Rows of a symbol need not be
            contiguous, but they must already be in chronological order —
            the split uses row position only.
        all_dates: accepted for interface compatibility; not used.
        test_ratio: fraction of each symbol's rows assigned to test. The
            train count per symbol is int(n_rows * (1 - test_ratio)).

    Returns:
        (train_mask, test_mask): complementary boolean arrays of len(sym_ids).
    """
    sym_ids = np.asarray(sym_ids)
    n = len(sym_ids)
    test_mask = np.zeros(n, dtype=bool)
    if n == 0:
        return np.ones(0, dtype=bool), test_mask

    # One pass instead of one full-array scan per symbol.
    _, group_of_row, counts = np.unique(
        sym_ids, return_inverse=True, return_counts=True
    )
    group_of_row = np.ravel(group_of_row)
    # Stable sort keeps each symbol's rows in their original (time) order.
    order = np.argsort(group_of_row, kind="stable")
    starts = np.cumsum(counts) - counts
    cuts = (counts * (1 - test_ratio)).astype(np.int64)

    # Position of each (sorted) row inside its symbol's block.
    rank_in_group = np.arange(n) - np.repeat(starts, counts)
    test_mask[order] = rank_in_group >= np.repeat(cuts, counts)
    train_mask = ~test_mask
    return train_mask, test_mask
def train_global_model(use_gpu: bool = True, max_symbols: Optional[int] = None) -> dict:
"""
Train one ensemble on all ~662K rows combined.
Saves models to data/lake/ml/p170/models/GLOBAL/
Returns metrics dict.
"""
import joblib
from sklearn.preprocessing import RobustScaler
from sklearn.metrics import accuracy_score, classification_report
log.info(f"\n{'#'*54}")
log.info(f" GLOBAL MODEL TRAINING")
log.info(f" ~662K rows across 2150 NSE symbols")
log.info(f"{'#'*54}\n")
t0 = time.time()
# ── Load all data ─────────────────────────────────────────────────────────
pairs = discover_symbols()
X, y, sym_ids, fcols, all_dates = build_global_dataset(pairs, max_symbols=max_symbols)
# ── Temporal split per symbol ─────────────────────────────────────────────
train_mask, test_mask = temporal_global_split(sym_ids, all_dates, test_ratio=0.20)
# Use 80% of training rows for train, 20% for val (within the train set)
train_idx = np.where(train_mask)[0]
val_cut = int(len(train_idx) * 0.80)
val_idx = train_idx[val_cut:]
tr_idx = train_idx[:val_cut]
te_idx = np.where(test_mask)[0]
X_tr, y_tr = X[tr_idx], y[tr_idx]
X_val, y_val = X[val_idx], y[val_idx]
X_te, y_te = X[te_idx], y[te_idx]
log.info(f" Train: {len(X_tr):,} Val: {len(X_val):,} Test: {len(X_te):,}")
# ── Scale ─────────────────────────────────────────────────────────────────
scaler = RobustScaler()
X_tr_s = scaler.fit_transform(X_tr)
X_val_s = scaler.transform(X_val)
X_te_s = scaler.transform(X_te)
# ── Layer 1: base models ──────────────────────────────────────────────────
base = make_base_models(use_gpu)
nm = len(base)
oof_val = np.zeros((len(X_val), nm * 3), dtype=np.float32)
oof_te = np.zeros((len(X_te), nm * 3), dtype=np.float32)
scores = {}
for idx, (name, model) in enumerate(base.items()):
log.info(f" [{name}] training on {len(X_tr):,} rows...")
try:
if name == "catboost":
model.fit(X_tr_s, y_tr, eval_set=(X_val_s, y_val), verbose=100)
elif name == "xgboost":
model.fit(X_tr_s, y_tr, eval_set=[(X_val_s, y_val)],
verbose=100)
elif name == "lightgbm":
model.fit(X_tr_s, y_tr, eval_set=[(X_val_s, y_val)])
else:
model.fit(X_tr_s, y_tr)
vp = model.predict_proba(X_val_s)
tp = model.predict_proba(X_te_s)
oof_val[:, idx*3:(idx+1)*3] = vp
oof_te[:, idx*3:(idx+1)*3] = tp
acc = float(accuracy_score(y_val, np.argmax(vp, axis=1)))
scores[name] = round(acc, 4)
log.info(f" [{name}] val acc = {acc:.4f}")
except Exception as e:
log.error(f" [{name}] failed: {e}")
log.debug(traceback.format_exc())
oof_val[:, idx*3:(idx+1)*3] = 1/3
oof_te[:, idx*3:(idx+1)*3] = 1/3
# ── Layer 2: simple probability average ─────────────────────────────────
# Stacking hurts here: val and test cover different time periods per symbol,
# so weights learned on val are wrong for test. Simple average wins.
log.info(f" [ensemble] averaging {len(base)} base models...")
nm_actual = oof_te.shape[1] // 3
ens_proba = np.zeros((len(oof_te), 3), dtype=np.float32)
active = 0
for mi in range(nm_actual):
block = oof_te[:, mi*3:(mi+1)*3]
if block.std() > 0.001:
ens_proba += block
active += 1
ens_proba = ens_proba / max(active, 1)
meta_type = "simple_average"
meta_net_obj = None
log.info(f" averaged {active} active base models")
ens_pred = np.argmax(ens_proba, axis=1)
ens_acc = float(accuracy_score(y_te, ens_pred))
log.info(f" [ensemble] global test acc = {ens_acc:.4f}")
log.info(f"\n{classification_report(y_te, ens_pred, target_names=['DOWN','NEUTRAL','UP'])}")
# ── Per-symbol test accuracy ───────────────────────────────────────────────
test_syms = sym_ids[te_idx]
sym_accs = {}
for sym in np.unique(test_syms):
mask = test_syms == sym
if mask.sum() >= 5:
sym_accs[sym] = round(float(accuracy_score(y_te[mask], ens_pred[mask])), 4)
# ── Feature importance ────────────────────────────────────────────────────
top_features = []
if "xgboost" in base:
try:
imp = base["xgboost"].feature_importances_
top_idx = np.argsort(imp)[-15:][::-1]
top_features = [[fcols[i], round(float(imp[i]), 4)] for i in top_idx]
except Exception:
pass
# ── Save global model ─────────────────────────────────────────────────────
gdir = MODELS_DIR / "GLOBAL"
gdir.mkdir(exist_ok=True)
joblib.dump(scaler, gdir / "scaler.joblib")
joblib.dump(fcols, gdir / "feature_cols.joblib")
for name, model in base.items():
try:
joblib.dump(model, gdir / f"{name}.joblib")
log.info(f" saved {name}.joblib")
except Exception as e:
log.warning(f" save {name} failed: {e}")
if meta_net_obj is not None:
try:
import torch
torch.save(meta_net_obj.state_dict(), gdir / "meta_net.pt")
with open(gdir / "meta_config.json", "w") as f:
json.dump({"type": "neural_net", "input_dim": oof_val.shape[1]}, f)
except Exception:
pass
# ── Save test predictions ─────────────────────────────────────────────────
pd.DataFrame({
"symbol": test_syms,
"pred_class": ens_pred,
"prob_down": ens_proba[:, 0],
"prob_neutral": ens_proba[:, 1],
"prob_up": ens_proba[:, 2],
"actual_class": y_te,
}).to_parquet(PREDS_DIR / "GLOBAL_p170_predictions.parquet", index=False)
# ── Metrics ───────────────────────────────────────────────────────────────
elapsed = round(time.time() - t0, 1)
cls, cnts = np.unique(y_tr, return_counts=True)
# top/bottom 10 symbols by accuracy
sorted_syms = sorted(sym_accs.items(), key=lambda x: x[1], reverse=True)
top10 = sorted_syms[:10]
bot10 = sorted_syms[-10:]
metrics = {
"model": "GLOBAL",
"status": "success",
"timestamp": datetime.now().isoformat(),
"n_symbols": int(len(np.unique(sym_ids))),
"n_rows_total": int(len(X)),
"n_train": int(len(X_tr)),
"n_val": int(len(X_val)),
"n_test": int(len(X_te)),
"n_features": int(len(fcols)),
"base_models": list(base.keys()),
"meta_learner_type": meta_type,
"base_val_accuracy": scores,
"ensemble_test_accuracy": round(ens_acc, 4),
"top_features": top_features[:10],
"top10_symbols": top10,
"bottom10_symbols": bot10,
"elapsed_seconds": elapsed,
"class_distribution_train": {str(int(k)): int(v) for k, v in zip(cls, cnts)},
}
with open(METRICS_DIR / "GLOBAL_p170_metrics.json", "w") as f:
json.dump(metrics, f, indent=2)
log.info(f"\n{'='*54}")
log.info(f" GLOBAL MODEL DONE {elapsed:.0f}s")
log.info(f" Test accuracy: {ens_acc:.4f} ({len(X_te):,} rows across {len(sym_accs)} symbols)")
log.info(f" Top 5 symbols: {top10[:5]}")
log.info(f"{'='*54}\n")
return metrics
# ─────────────────────────────────────────────────────────────────────────────
# INFERENCE — predict today using global model
# ─────────────────────────────────────────────────────────────────────────────
def predict_today(symbol: str, use_gpu: bool = True) -> Optional[dict]:
    """Predict the next move for ``symbol`` from its most recent feature row.

    Loads the artifacts stored under ``MODELS_DIR / "GLOBAL"`` (scaler, feature
    column list and whichever base-model ``.joblib`` files exist), builds a
    single-row feature vector from the last row returned by
    ``engineer_features`` and combines the base-model class probabilities.

    Args:
        symbol: Symbol handed to ``load_candles``.
        use_gpu: Run the optional meta-network on CUDA when it is available.

    Returns:
        A dict holding the predicted label (``DOWN``/``NEUTRAL``/``UP``), the
        three class probabilities, the confidence (largest probability) and a
        few indicator values copied from the latest row. ``None`` when the
        global model is missing, no candles could be loaded, the feature frame
        is empty, or no base model produced a prediction.
    """
    import joblib

    gdir = MODELS_DIR / "GLOBAL"
    if not (gdir / "scaler.joblib").exists():
        log.error("No global model found. Run: python p170_max_system.py --global-model")
        return None

    raw = load_candles(symbol)
    if raw is None:
        return None

    df = engineer_features(raw)
    # Guard the `.iloc[-1]` below: an empty frame would raise IndexError.
    if df is None or len(df) == 0:
        log.error(f"No feature rows available for {symbol}; cannot predict")
        return None

    fcols = joblib.load(gdir / "feature_cols.joblib")
    scaler = joblib.load(gdir / "scaler.joblib")

    # Latest row; features that are absent or NaN stay at 0.0 (pre-scaling).
    last = df.iloc[-1]
    X = np.zeros((1, len(fcols)), dtype=np.float32)
    for i, fc in enumerate(fcols):
        if fc in last.index and not pd.isna(last[fc]):
            X[0, i] = float(last[fc])
    X_s = scaler.transform(X)

    # Collect class probabilities from every base model that loads and runs.
    base_probas = []
    for name in ["xgboost", "lightgbm", "catboost", "random_forest", "extra_trees"]:
        mp = gdir / f"{name}.joblib"
        if not mp.exists():
            continue
        try:
            base_probas.append(joblib.load(mp).predict_proba(X_s)[0])
        except Exception as e:
            # Best-effort: one broken model must not block the others,
            # but the failure should be visible in the log.
            log.warning(f"  [{name}] skipped during inference: {e}")
    if not base_probas:
        log.error(f"No usable base model found in {gdir}")
        return None

    # Optional meta-network; any failure falls back to the plain average.
    proba = None
    cfg_path = gdir / "meta_config.json"
    if cfg_path.exists():
        try:
            import torch

            with open(cfg_path) as f:
                cfg = json.load(f)
            meta_input = np.concatenate(base_probas).reshape(1, -1).astype(np.float32)
            net = _meta_net(cfg["input_dim"])
            net.load_state_dict(torch.load(gdir / "meta_net.pt", map_location="cpu"))
            net.eval()
            device = torch.device("cuda" if use_gpu and torch.cuda.is_available() else "cpu")
            proba = predict_meta_torch(net, meta_input, device)[0]
        except Exception as e:
            log.warning(f"  meta-net unavailable ({e}); using simple average")
            proba = None
    if proba is None:
        proba = np.mean(base_probas, axis=0)

    pred = int(np.argmax(proba))
    label = {0: "DOWN", 1: "NEUTRAL", 2: "UP"}[pred]

    # int(NaN) raises ValueError, so treat a missing/NaN value as -1.
    ema_raw = last.get("ema_align", -1)
    ema_align = -1 if pd.isna(ema_raw) else int(ema_raw)

    return {
        "symbol": symbol, "date": str(last.get("date", "latest")),
        "prediction": label,
        "prob_down": round(float(proba[0]), 4),
        "prob_neutral": round(float(proba[1]), 4),
        "prob_up": round(float(proba[2]), 4),
        "confidence": round(float(proba.max()), 4),
        "rsi_14": round(float(last.get("rsi_14", np.nan)), 2),
        "macd_hist": round(float(last.get("macd_hist", np.nan)), 4),
        "adx_14": round(float(last.get("adx_14", np.nan)), 2),
        "ema_align": ema_align,
    }
# ─────────────────────────────────────────────────────────────────────────────
# LEGACY: per-symbol batch (kept for compatibility, but global model is better)
# ─────────────────────────────────────────────────────────────────────────────
def run_batch(pairs: List[Tuple[str, Path]], use_gpu: bool = True) -> dict:
    """Legacy entry point that delegates to ``train_global_model``.

    Only the number of ``pairs`` is used: it is forwarded as ``max_symbols``.
    The individual (symbol, path) entries are not passed on.
    """
    symbol_limit = len(pairs)
    return train_global_model(use_gpu=use_gpu, max_symbols=symbol_limit)
# ─────────────────────────────────────────────────────────────────────────────
# DEPENDENCY CHECK
# ─────────────────────────────────────────────────────────────────────────────
def check_deps(use_gpu: bool):
    """Print a report of installed dependencies, data paths and found symbols.

    Required and optional packages are probed by importing them. An import
    that fails for any reason is reported as missing instead of aborting the
    report. PyTorch/CUDA details are printed when torch imports cleanly.

    Args:
        use_gpu: Accepted for symmetry with the other CLI actions; the report
            itself does not consult it.
    """
    import importlib

    print("\n=== P170 Dependency Check ===\n")

    required = [("numpy", "numpy"), ("pandas", "pandas"),
                ("scikit-learn", "sklearn"), ("xgboost", "xgboost"), ("joblib", "joblib")]
    for name, mod in required:
        try:
            importlib.import_module(mod)
        except Exception:
            # Some packages fail with OSError (missing shared libraries)
            # rather than ImportError. Ctrl-C must still propagate, hence
            # no bare `except:`.
            print(f" ✗ {name} (pip install {name} --break-system-packages)")
        else:
            print(f" ✓ {name}")

    optional = [("lightgbm", "lightgbm"), ("catboost", "catboost"), ("optuna", "optuna")]
    for name, mod in optional:
        try:
            importlib.import_module(mod)
        except Exception:
            print(f" ~ {name} (optional — pip install {name} --break-system-packages)")
        else:
            print(f" ✓ {name} (optional)")

    try:
        import torch
        cuda = torch.cuda.is_available()
        dev = torch.cuda.get_device_name(0) if cuda else "CPU only"
        print(f" ✓ pytorch ({dev})")
        if cuda:
            print(f" GPU memory: {torch.cuda.get_device_properties(0).total_memory/1e9:.1f} GB")
    except ImportError:
        print(" ~ pytorch (meta-learner falls back to LogisticRegression without it)")
    except Exception as e:
        # A broken install or driver probe should not abort the rest of the report.
        print(f" ~ pytorch (import or CUDA probe failed: {e})")

    pairs = discover_symbols()
    print("\nPaths:")
    print(f" Clean data : {CLEAN_DIR} exists={CLEAN_DIR.exists()}")
    print(f" Features : {FEAT_DIR} exists={FEAT_DIR.exists()}")
    print(f" Output : {OUT}")
    print(f"\nSymbols found: {len(pairs)}")
    if pairs:
        print(" Largest 10 (most data first):")
        for sym, path in pairs[:10]:
            print(f" {sym:20s} {path.stat().st_size//1024:6d} KB {path.name}")
    print()
# ─────────────────────────────────────────────────────────────────────────────
# CLI
# ─────────────────────────────────────────────────────────────────────────────
def main(argv: Optional[List[str]] = None):
    """Parse the command line and dispatch to the selected P170 action.

    Exactly one of ``--global-model``, ``--all-symbols``, ``--predict`` or
    ``--check-deps`` must be given; argparse exits with status 2 otherwise.

    Args:
        argv: Argument list to parse. ``None`` (the default) makes argparse
            read ``sys.argv[1:]``, which is the behaviour of a plain
            ``main()`` call.
    """
    p = argparse.ArgumentParser(description="P170 MAX SYSTEM")
    g = p.add_mutually_exclusive_group(required=True)
    g.add_argument("--global-model", action="store_true", help="Train global model on all 2150 symbols combined (recommended)")
    g.add_argument("--all-symbols", action="store_true", help="Alias for --global-model")
    g.add_argument("--predict", type=str, help="Predict today for a symbol using global model")
    g.add_argument("--check-deps", action="store_true", help="Check dependencies and paths")
    p.add_argument("--top-n", type=int, default=None, help="Limit to top N symbols by data size (useful for quick tests)")
    p.add_argument("--no-gpu", action="store_true", help="Force CPU")
    args = p.parse_args(argv)

    gpu = not args.no_gpu
    if args.check_deps:
        check_deps(gpu)
    elif args.global_model or args.all_symbols:
        m = train_global_model(use_gpu=gpu, max_symbols=args.top_n)
        # default=str keeps non-JSON-native values (e.g. numpy scalars) printable.
        print(json.dumps(m, indent=2, default=str))
    elif args.predict:
        symbol = args.predict.upper()
        r = predict_today(symbol, use_gpu=gpu)
        if r:
            print(json.dumps(r, indent=2))
        else:
            # predict_today returns None for several reasons, not only a
            # missing global model, so the message must not single one out.
            print(
                f"Prediction failed for {symbol}: the global model, candle data "
                "or base-model files may be missing. See the log output above; "
                "run --global-model first if no model has been trained."
            )
# Script entry point: run the CLI only when executed directly, not on import.
if __name__ == "__main__":
    main()