"""V53 framework validation, phase 1.

Runs three diagnostics on the momentum signal used by the V53 strategy:

* Test 1.1 - information coefficient (IC) of the sector-neutral signal.
* Test 1.2 - the same IC split by calendar period and by SPY trend regime.
* Test 1.3 - strategy Sharpe versus equal-weight and random-pick baselines
  that share the strategy's risk-management overlay.
"""
import os
import random
import sys
import warnings

import numpy as np
import pandas as pd
from scipy.stats import spearmanr

warnings.filterwarnings('ignore')

# The project imports below are resolved relative to this file's directory,
# so the path tweak has to happen before them.
sys.path.insert(0, os.path.dirname(__file__))

from backtesting.engines.v30_causal_engine import get_data, evaluate_slice, CAP, V30_PARAMS
from backtesting.engines.v36_research_engine import SECTOR_MAP, SECTORS
from backtesting.audits.v53_causal_audit import run_v53_causal


def _iter_top_bucket_ic(dc, vf, mom_long=175, mom_short=21, horizon=60, top_n=15):
    """Yield ``(row_position, ic)`` for each sampled date.

    The signal is ``price[t - mom_short] / price[t - mom_long] - 1``,
    z-scored inside each sector listed in ``SECTORS``.  On every
    ``horizon``-th row the ``top_n`` highest z-scores are rank-correlated
    (Spearman) with the ``horizon``-row forward return.  Dates with fewer
    than ``top_n`` usable names, or with an undefined correlation, are
    skipped.
    """
    prices = dc[vf]
    m_sig = (prices.shift(mom_short) / prices.shift(mom_long)) - 1
    fwd = prices.pct_change(horizon).shift(-horizon)

    for i in range(mom_long + 10, len(dc) - horizon, horizon):
        sig = m_sig.iloc[i].dropna()
        z = pd.Series(index=sig.index, dtype=float)
        for sector in SECTORS:
            stks = [t for t in sig.index if SECTOR_MAP.get(t) == sector]
            # A single-name sector has no cross-section to standardise on.
            if len(stks) > 1:
                mu, sigma = sig[stks].mean(), sig[stks].std()
                z[stks] = (sig[stks] - mu) / (sigma if sigma > 1e-8 else 1e-8)

        fwd_row = fwd.iloc[i]
        common = z.dropna().index.intersection(fwd_row.dropna().index)
        if len(common) < top_n:
            continue

        top = z[common].nlargest(top_n)
        corr, _ = spearmanr(top.values, fwd_row[top.index].values)
        if not np.isnan(corr):
            yield i, corr


def _t_stat(values):
    """Return the one-sample t-statistic of the mean of *values*.

    Uses the sample standard deviation (``ddof=1``).  Returns NaN when
    there are fewer than two observations or the deviation is zero, so the
    caller never divides by zero.
    """
    arr = np.asarray(values, dtype=float)
    n = arr.size
    if n < 2:
        return float('nan')
    sd = arr.std(ddof=1)
    if not sd > 0:
        return float('nan')
    return float(arr.mean() / (sd / np.sqrt(n)))


def test_1_1_ic_analysis(dc, vf):
    """Print the mean, deviation and t-stat of the signal IC; return the t-stat.

    Args:
        dc: price frame indexed by date, one column per ticker.
        vf: column selection of ``dc`` to analyse.

    Returns:
        The t-statistic of the mean IC (NaN if it cannot be computed).
    """
    print("--- Test 1.1: Information Coefficient (IC) Analysis ---")
    ic_vals = np.array([ic for _, ic in _iter_top_bucket_ic(dc, vf)], dtype=float)

    n = ic_vals.size
    ic_mean = float(ic_vals.mean()) if n else float('nan')
    # Sample deviation, matching the pandas ``.std()`` used in Test 1.2.
    ic_std = float(ic_vals.std(ddof=1)) if n > 1 else float('nan')
    t_stat = _t_stat(ic_vals)

    print(f"Mean IC: {ic_mean:.4f}")
    print(f"Std IC: {ic_std:.4f}")
    print(f"t-stat: {t_stat:.2f}")

    # A NaN t-stat compares False and therefore reports FAIL/WEAK.
    if t_stat > 2.0:
        print("Result: PASS (Statistically significant edge)")
    else:
        print("Result: FAIL/WEAK (Check signal power)")
    return t_stat


def test_1_2_regime_ic_decay(dc, spy, vf):
    """Print the IC t-stat per calendar period and per SPY trend regime.

    Args:
        dc: price frame indexed by date, one column per ticker.
        spy: benchmark price series; read positionally alongside ``dc``.
        vf: column selection of ``dc`` to analyse.
    """
    print("\n--- Test 1.2: Regime-Conditioned IC Decay Analysis ---")
    sma = spy.rolling(200).mean()

    periods = {
        "Period 1 (2008-2012)": ("2008-01-01", "2012-12-31"),
        "Period 2 (2013-2018)": ("2013-01-01", "2018-12-31"),
        "Period 3 (2019-2025)": ("2019-01-01", "2025-12-31"),
    }

    # IC of the top-15 bucket, sampled once every 60 rows (not daily).
    ic_series, regimes, dates = [], [], []
    for i, corr in _iter_top_bucket_ic(dc, vf):
        ic_series.append(corr)
        dates.append(dc.index[i])
        # A NaN moving average compares False and is labelled "OFF".
        regimes.append("ON" if spy.iloc[i] > sma.iloc[i] else "OFF")

    ic_df = pd.DataFrame({'IC': ic_series, 'Regime': regimes}, index=dates)

    for name, (start, end) in periods.items():
        mask = (ic_df.index >= start) & (ic_df.index <= end)
        sub = ic_df[mask]['IC']
        if len(sub) > 0:
            print(f"{name}: t-stat = {_t_stat(sub):.2f} (n={len(sub)})")

    print("\nBy Regime:")
    for reg in ["ON", "OFF"]:
        sub = ic_df[ic_df['Regime'] == reg]['IC']
        if len(sub) > 0:
            print(f"Risk-{reg} days: t-stat = {_t_stat(sub):.2f} (n={len(sub)})")


def run_variant(dc, spy, vf, daily_ret, mode='random', seed=None):
    """Simulate a baseline portfolio under the strategy's risk overlay.

    Stock selection is replaced by a baseline while volatility targeting,
    the risk-off haircut, transaction costs and the drawdown stop are kept.

    Args:
        dc: price frame indexed by date, one column per ticker.
        spy: benchmark price series; read positionally alongside ``dc``.
        vf: column selection of ``dc`` that forms the universe.
        daily_ret: daily return frame; read positionally alongside ``dc``.
        mode: ``'random'`` picks 15 random names at each rebalance;
            ``'equal'`` holds every available name equally weighted.
        seed: optional seed for reproducible random picks.  ``None`` uses
            the global ``random`` state.

    Returns:
        The NAV curve as a Series indexed by ``dc.index[1:]``.

    Raises:
        ValueError: if ``mode`` is not ``'random'`` or ``'equal'``.
    """
    # An unrecognised mode used to refresh the picks without ever assigning
    # weights, which silently produced a zero-return curve.
    if mode not in ('random', 'equal'):
        raise ValueError(f"mode must be 'random' or 'equal', got {mode!r}")
    rng = random if seed is None else random.Random(seed)

    rebal_days, vol_target, riskoff_haircut = 60, 0.18, 0.50
    sma_lookback, mom_long, mom_short, top_n = 200, 175, 21, 15
    txn_frac = 20 / 10000.0

    # Only used to decide which tickers have enough history to be held.
    price_mom = (dc[vf].shift(mom_short) / dc[vf].shift(mom_long)) - 1
    sma = spy.rolling(sma_lookback).mean()

    # ``paper_nav`` is always invested and drives the stop; ``nav`` is the
    # reported curve and is frozen while the stop is active.
    nav, paper_nav = CAP, CAP
    peak_paper_nav, trough_paper_nav = CAP, CAP
    stop_active = False
    pick_tks = []
    current_weights = pd.Series(dtype=float)
    port_rets, hist = [], []
    days = 0

    for i in range(1, len(dc)):
        # Volatility scaling from trailing realised portfolio returns.
        if len(port_rets) >= 21:
            w_window = port_rets[-60:] if len(port_rets) >= 60 else port_rets[-21:]
            vs = vol_target / (np.std(w_window) * np.sqrt(252) + 1e-8)
        else:
            vs = 0.5

        # The regime is read from the previous row, so no same-day data is used.
        sp, sm = spy.values[i - 1], sma.values[i - 1]
        if pd.isna(sm) or sp <= sm:
            vs *= riskoff_haircut
        vs = float(np.clip(vs, 0.05, 1.0))

        day_ret = 0.0
        if pick_tks:
            held = [t for t in pick_tks if t in daily_ret.columns]
            lr = daily_ret.iloc[i][held].dropna()
            if not lr.empty:
                wt = current_weights.reindex(lr.index).fillna(0)
                if wt.sum() > 0:
                    # Renormalise over the names that actually have a return.
                    wt = wt / wt.sum()
                    day_ret = (lr * wt).sum() * vs

        paper_nav *= (1 + day_ret)
        paper_nav -= paper_nav * txn_frac * 2 / rebal_days * vs
        paper_nav = max(paper_nav, 0.01)

        if not stop_active:
            nav *= (1 + day_ret)
            nav -= nav * txn_frac * 2 / rebal_days * vs
            nav = max(nav, 0.01)

        port_rets.append(day_ret)
        hist.append(nav)

        peak_paper_nav = max(peak_paper_nav, paper_nav)
        paper_dd = (paper_nav / peak_paper_nav) - 1.0

        if not stop_active:
            # Stop out after a 15% drawdown of the paper portfolio.
            if paper_dd <= -0.15:
                stop_active = True
                trough_paper_nav = paper_nav
                nav -= nav * txn_frac
        else:
            # Re-enter once the paper portfolio is 5% above its trough.
            trough_paper_nav = min(trough_paper_nav, paper_nav)
            if paper_nav >= trough_paper_nav * 1.05:
                stop_active = False
                peak_paper_nav = paper_nav
                nav -= nav * txn_frac

        days += 1
        if days >= rebal_days:
            days = 0
            avail = price_mom.iloc[i].dropna().index.tolist()
            if not avail:
                continue
            if mode == 'random':
                new_picks = rng.sample(avail, min(top_n, len(avail)))
            else:
                # 'equal': equal-weight every stock in the universe.
                new_picks = avail
            current_weights = pd.Series(1.0 / len(new_picks), index=new_picks)
            pick_tks = new_picks

    return pd.Series(hist, index=dc.index[1:len(hist) + 1])


def test_1_3_null_hypothesis(dc, spy, vf, daily_ret, n_iter=100, seed=None):
    """Compare the strategy Sharpe with equal-weight and random baselines.

    Args:
        dc: price frame indexed by date, one column per ticker.
        spy: benchmark price series.
        vf: column selection of ``dc`` that forms the universe.
        daily_ret: daily return frame.
        n_iter: number of random-pick simulations to run.
        seed: optional base seed; iteration ``k`` uses ``seed + k``.
            ``None`` keeps the runs unseeded.

    Raises:
        ValueError: if ``n_iter`` is smaller than 1.
    """
    if n_iter < 1:
        raise ValueError("n_iter must be at least 1")

    print("\n--- Test 1.3: Null Hypothesis Randomization Test ---")

    # Original
    c_orig = run_v53_causal(dc, spy, vf, daily_ret, **V30_PARAMS)
    m_orig = evaluate_slice(c_orig, "2008-01-01", "2025-12-31")
    orig_sharpe = m_orig['sharpe']
    print(f"Original Strategy Sharpe: {orig_sharpe:.4f}")

    # Equal Weight
    c_ew = run_variant(dc, spy, vf, daily_ret, mode='equal')
    m_ew = evaluate_slice(c_ew, "2008-01-01", "2025-12-31")
    ew_sharpe = m_ew['sharpe']
    print(f"Equal-Weight Universe Sharpe: {ew_sharpe:.4f}")

    # Random (the framework calls for 100 iterations, the default)
    rand_sharpes = []
    print(f"Running {n_iter} Random iterations...", end="", flush=True)
    for k in range(n_iter):
        iter_seed = None if seed is None else seed + k
        c_rand = run_variant(dc, spy, vf, daily_ret, mode='random', seed=iter_seed)
        m_rand = evaluate_slice(c_rand, "2008-01-01", "2025-12-31")
        rand_sharpes.append(m_rand['sharpe'])
        if k % 10 == 0:
            print(".", end="", flush=True)
    print()

    rand_median = np.median(rand_sharpes)
    print(f"Random Picks Median Sharpe: {rand_median:.4f}")
    print(f"Random Picks Max Sharpe: {np.max(rand_sharpes):.4f}")

    excess = orig_sharpe - rand_median
    # The '+' flag prints the real sign; a literal '+' gave '+-0.1234'.
    print(f"\nExcess Sharpe vs Random: {excess:+.4f}")
    if excess > 0.15:
        print("Result: PASS (Signal adds strong value beyond risk management)")
    elif excess > 0.10:
        print("Result: WEAK PASS (Signal adds marginal value)")
    else:
        print("Result: FAIL (Signal has no edge over random/risk management)")


def main():
    """Load the data set and run the three phase-1 validation tests."""
    print("========================================")
    print(" V53 FRAMEWORK VALIDATION - PHASE 1")
    print("========================================")
    dc, spy, vf, daily_ret = get_data()

    test_1_1_ic_analysis(dc, vf)
    test_1_2_regime_ic_decay(dc, spy, vf)
    test_1_3_null_hypothesis(dc, spy, vf, daily_ret)


if __name__ == "__main__":
    main()