Spaces:
Running
Running
| import sys, os | |
| import numpy as np, pandas as pd | |
| from scipy.stats import spearmanr | |
| import random | |
| import warnings; warnings.filterwarnings('ignore') | |
| sys.path.insert(0, os.path.dirname(__file__)) | |
| from backtesting.engines.v30_causal_engine import get_data, evaluate_slice, CAP, V30_PARAMS | |
| from backtesting.engines.v36_research_engine import SECTOR_MAP, SECTORS | |
| from backtesting.audits.v53_causal_audit import run_v53_causal | |
def test_1_1_ic_analysis(dc, vf):
    """Run Test 1.1: significance of the momentum signal's rank IC.

    The signal is ``price[t - 21] / price[t - 175] - 1`` computed on
    ``dc[vf]``.  Every 60 rows the signal is z-scored within each sector
    (sectors with fewer than two names are left as NaN), the 15 names with
    the highest z-score are selected, and the Spearman rank correlation
    between their z-scores and their next-60-row return is recorded as one
    IC sample.  A one-sample t-statistic of the IC series is printed and
    returned.

    Args:
        dc: Price table; presumably a date-indexed DataFrame with one
            column per ticker (TODO confirm against ``get_data``).
        vf: Column selector applied as ``dc[vf]``; presumably the list of
            valid tickers.

    Returns:
        The t-statistic ``mean(IC) / (std(IC) / sqrt(n))``.  NaN when fewer
        than two IC samples exist or the samples have no dispersion.
    """
    print("--- Test 1.1: Information Coefficient (IC) Analysis ---")
    mom_long, mom_short = 175, 21
    horizon = 60  # forward-return window; also the sampling step, so windows do not overlap
    top_n = 15

    prices = dc[vf]
    m_sig = (prices.shift(mom_short) / prices.shift(mom_long)) - 1
    fwd = prices.pct_change(horizon).shift(-horizon)

    ic_vals = []
    for i in range(mom_long + 10, len(dc) - horizon, horizon):
        sig = m_sig.iloc[i].dropna()
        fwd_row = fwd.iloc[i]

        # Sector-neutral z-score of the raw momentum signal.
        z = pd.Series(index=sig.index, dtype=float)
        for sector in SECTORS:
            stks = [t for t in sig.index if SECTOR_MAP.get(t) == sector]
            if len(stks) > 1:
                mu, sigma = sig[stks].mean(), sig[stks].std()
                z[stks] = (sig[stks] - mu) / (sigma if sigma > 1e-8 else 1e-8)

        common = z.dropna().index.intersection(fwd_row.dropna().index)
        if len(common) >= top_n:
            top = z[common].nlargest(top_n)
            corr, _ = spearmanr(top.values, fwd_row[top.index].values)
            if not np.isnan(corr):
                ic_vals.append(corr)

    n = len(ic_vals)
    nan = float("nan")
    ic_mean = float(np.mean(ic_vals)) if n else nan
    # Sample standard deviation (ddof=1): the t-statistic needs the unbiased
    # estimator, and this matches the pandas ``.std()`` used in Test 1.2.
    ic_std = float(np.std(ic_vals, ddof=1)) if n > 1 else nan
    if n > 1 and ic_std > 0:
        t_stat = ic_mean / (ic_std / np.sqrt(n))
    else:
        # Too few samples or zero dispersion: the statistic is undefined.
        t_stat = nan

    print(f"Mean IC: {ic_mean:.4f}")
    print(f"Std IC: {ic_std:.4f}")
    print(f"t-stat: {t_stat:.2f}")
    if t_stat > 2.0:
        print("Result: PASS (Statistically significant edge)")
    else:
        print("Result: FAIL/WEAK (Check signal power)")
    return t_stat
def test_1_2_regime_ic_decay(dc, spy, vf):
    """Run Test 1.2: IC t-statistics split by calendar period and by regime.

    IC samples are built exactly as in Test 1.1: every 60 rows, the
    21/175-row momentum signal is z-scored within sector, the top 15 names
    are taken, and the Spearman correlation with their next-60-row return
    is recorded.  Each sample is tagged with its date (``dc.index[i]``) and
    a regime label: "ON" when ``spy`` is above its 200-row simple moving
    average at that row, otherwise "OFF" (including rows where the average
    is still NaN).

    Args:
        dc: Price table; presumably date-indexed with one column per
            ticker (TODO confirm against ``get_data``).
        spy: Benchmark price series, indexed positionally in step with
            ``dc`` (assumed aligned row-for-row; verify against caller).
        vf: Column selector applied as ``dc[vf]``.

    Returns:
        DataFrame of the IC samples with columns ``IC`` and ``Regime``,
        indexed by sample date.  The per-period and per-regime t-statistics
        are printed, not returned.
    """
    print("\n--- Test 1.2: Regime-Conditioned IC Decay Analysis ---")
    mom_long, mom_short = 175, 21
    horizon = 60
    top_n = 15

    def t_statistic(values):
        # mean / standard error; undefined (NaN) for n < 2 or zero dispersion.
        count = len(values)
        if count < 2:
            return float("nan")
        spread = values.std()
        if not spread > 0:
            return float("nan")
        return values.mean() / (spread / np.sqrt(count))

    prices = dc[vf]
    m_sig = (prices.shift(mom_short) / prices.shift(mom_long)) - 1
    fwd = prices.pct_change(horizon).shift(-horizon)
    sma = spy.rolling(200).mean()
    periods = {
        "Period 1 (2008-2012)": ("2008-01-01", "2012-12-31"),
        "Period 2 (2013-2018)": ("2013-01-01", "2018-12-31"),
        "Period 3 (2019-2025)": ("2019-01-01", "2025-12-31")
    }

    # One top-15 IC sample per 60-row step (not one per day).
    ic_series = []
    regimes = []
    dates = []
    for i in range(mom_long + 10, len(dc) - horizon, horizon):
        sig = m_sig.iloc[i].dropna()
        fwd_row = fwd.iloc[i]

        # Sector-neutral z-score of the raw momentum signal.
        z = pd.Series(index=sig.index, dtype=float)
        for sector in SECTORS:
            stks = [t for t in sig.index if SECTOR_MAP.get(t) == sector]
            if len(stks) > 1:
                mu, sigma = sig[stks].mean(), sig[stks].std()
                z[stks] = (sig[stks] - mu) / (sigma if sigma > 1e-8 else 1e-8)

        common = z.dropna().index.intersection(fwd_row.dropna().index)
        if len(common) >= top_n:
            top = z[common].nlargest(top_n)
            corr, _ = spearmanr(top.values, fwd_row[top.index].values)
            if not np.isnan(corr):
                ic_series.append(corr)
                dates.append(dc.index[i])
                regimes.append("ON" if spy.iloc[i] > sma.iloc[i] else "OFF")

    ic_df = pd.DataFrame({'IC': ic_series, 'Regime': regimes}, index=dates)

    for name, (start, end) in periods.items():
        mask = (ic_df.index >= start) & (ic_df.index <= end)
        sub = ic_df[mask]['IC']
        if len(sub) > 0:
            print(f"{name}: t-stat = {t_statistic(sub):.2f} (n={len(sub)})")

    print("\nBy Regime:")
    for reg in ["ON", "OFF"]:
        sub = ic_df[ic_df['Regime'] == reg]['IC']
        if len(sub) > 0:
            print(f"Risk-{reg} days: t-stat = {t_statistic(sub):.2f} (n={len(sub)})")

    return ic_df
def run_variant(dc, spy, vf, daily_ret, mode='random', seed=None):
    """Simulate a baseline portfolio under the strategy's risk overlay.

    The stock-selection step is replaced by a naive rule while the risk
    management stays in place: volatility targeting, a risk-off haircut
    when ``spy`` is at or below its 200-row moving average, amortised
    transaction costs, and a drawdown stop driven by a shadow ("paper")
    NAV.  Both NAVs start at ``CAP``.

    Args:
        dc: Price table; presumably date-indexed with one column per
            ticker (TODO confirm against ``get_data``).
        spy: Benchmark price series, read positionally (assumed aligned
            row-for-row with ``dc``; verify against caller).
        vf: Column selector applied as ``dc[vf]``.
        daily_ret: Per-row return table, read positionally with
            ``daily_ret.iloc[i]`` (assumed aligned with ``dc``).
        mode: ``'random'`` picks up to 15 available names at random on each
            rebalance; ``'equal'`` holds every available name.  Both are
            equally weighted.
        seed: Optional seed for the random picks.  When ``None`` (the
            default) picks are drawn from the global ``random`` state, as
            before.

    Returns:
        Series of the realised NAV, one value per row of ``dc`` from the
        second row onward.

    Raises:
        ValueError: If ``mode`` is not ``'random'`` or ``'equal'``.
    """
    # Reject unknown modes up front: they used to leave the weights empty,
    # which silently produced a zero-return NAV that only decayed by costs.
    if mode not in ('random', 'equal'):
        raise ValueError(f"unknown mode {mode!r}; expected 'random' or 'equal'")
    rng = random if seed is None else random.Random(seed)

    rebal_days, vol_target, riskoff_haircut = 60, 0.18, 0.50
    sma_lookback, mom_long, mom_short, top_n = 200, 175, 21, 15
    txn_frac = 20 / 10000.0
    stop_drawdown = -0.15   # paper drawdown that triggers the stop
    reentry_gain = 1.05     # paper rebound off the trough that lifts the stop

    # Only the NaN pattern of the momentum table is used here: it defines
    # which names are available on a rebalance date.
    price_mom = (dc[vf].shift(mom_short) / dc[vf].shift(mom_long)) - 1
    sma = spy.rolling(sma_lookback).mean()

    nav, paper_nav = CAP, CAP
    peak_paper_nav, trough_paper_nav = CAP, CAP
    stop_active = False
    pick_tks = []
    held = []  # pick_tks restricted to columns present in daily_ret
    current_weights = pd.Series(dtype=float)
    port_rets, hist = [], []
    days = 0

    for i in range(1, len(dc)):
        # Exposure scalar: target vol over trailing realised vol, using the
        # last 60 portfolio returns once available, otherwise the last 21.
        if len(port_rets) >= 21:
            w_window = port_rets[-60:] if len(port_rets) >= 60 else port_rets[-21:]
            vs = vol_target / (np.std(w_window) * np.sqrt(252) + 1e-8)
        else:
            vs = 0.5

        # Regime check uses the previous row's values.
        sp, sm = spy.values[i - 1], sma.values[i - 1]
        if pd.isna(sm) or sp <= sm:
            vs *= riskoff_haircut
        vs = float(np.clip(vs, 0.05, 1.0))

        day_ret = 0.0
        if pick_tks:
            lr = daily_ret.iloc[i][held].dropna()
            if not lr.empty:
                # Renormalise over the names that actually have a return today.
                wt = current_weights.reindex(lr.index).fillna(0)
                if wt.sum() > 0:
                    wt = wt / wt.sum()
                day_ret = (lr * wt).sum() * vs

        # The paper NAV always tracks the portfolio; the real NAV is frozen
        # while the stop is active.
        paper_nav *= (1 + day_ret)
        paper_nav -= paper_nav * txn_frac * 2 / rebal_days * vs
        paper_nav = max(paper_nav, 0.01)
        if not stop_active:
            nav *= (1 + day_ret)
            nav -= nav * txn_frac * 2 / rebal_days * vs
            nav = max(nav, 0.01)
        port_rets.append(day_ret)
        hist.append(nav)

        peak_paper_nav = max(peak_paper_nav, paper_nav)
        paper_dd = (paper_nav / peak_paper_nav) - 1.0
        if not stop_active:
            if paper_dd <= stop_drawdown:
                stop_active = True
                trough_paper_nav = paper_nav
                nav -= nav * txn_frac  # cost of exiting
        else:
            trough_paper_nav = min(trough_paper_nav, paper_nav)
            if paper_nav >= trough_paper_nav * reentry_gain:
                stop_active = False
                peak_paper_nav = paper_nav
                nav -= nav * txn_frac  # cost of re-entering

        days += 1
        if days >= rebal_days:
            days = 0
            avail = price_mom.iloc[i].dropna().index.tolist()
            if not avail:
                continue
            if mode == 'random':
                new_picks = rng.sample(avail, min(top_n, len(avail)))
            else:
                # 'equal': equal-weight every available stock in the universe.
                new_picks = avail
            current_weights = pd.Series(1.0 / len(new_picks), index=new_picks)
            pick_tks = new_picks
            held = [t for t in pick_tks if t in daily_ret.columns]

    return pd.Series(hist, index=dc.index[1:len(hist) + 1])
def test_1_3_null_hypothesis(dc, spy, vf, daily_ret, n_iter=100, seed=None):
    """Run Test 1.3: compare the strategy's Sharpe with naive baselines.

    Prints the Sharpe ratio of the original strategy
    (``run_v53_causal`` with ``V30_PARAMS``), of an equal-weight-universe
    baseline, and the median and maximum Sharpe over ``n_iter``
    random-pick baselines.  All are evaluated with ``evaluate_slice`` over
    2008-01-01 to 2025-12-31.  The verdict is based on the strategy's
    excess Sharpe over the random median: above 0.15 is a pass, above 0.10
    a weak pass, otherwise a fail.

    Args:
        dc, spy, vf, daily_ret: Market data passed through unchanged to
            ``run_v53_causal`` and ``run_variant``.
        n_iter: Number of random-pick simulations (default 100).
        seed: Optional seed.  When given, the global ``random`` state is
            seeded before the random simulations so the run is
            reproducible; note this mutates global state.

    Raises:
        ValueError: If ``n_iter`` is less than 1.
    """
    if n_iter < 1:
        raise ValueError("n_iter must be at least 1")

    print("\n--- Test 1.3: Null Hypothesis Randomization Test ---")
    start, end = "2008-01-01", "2025-12-31"

    # Original strategy
    c_orig = run_v53_causal(dc, spy, vf, daily_ret, **V30_PARAMS)
    m_orig = evaluate_slice(c_orig, start, end)
    orig_sharpe = m_orig['sharpe']
    print(f"Original Strategy Sharpe: {orig_sharpe:.4f}")

    # Equal-weight universe baseline
    c_ew = run_variant(dc, spy, vf, daily_ret, mode='equal')
    m_ew = evaluate_slice(c_ew, start, end)
    ew_sharpe = m_ew['sharpe']
    print(f"Equal-Weight Universe Sharpe: {ew_sharpe:.4f}")

    # Random-pick baselines
    if seed is not None:
        random.seed(seed)
    rand_sharpes = []
    print(f"Running {n_iter} Random iterations...", end="", flush=True)
    for iteration in range(n_iter):
        c_rand = run_variant(dc, spy, vf, daily_ret, mode='random')
        m_rand = evaluate_slice(c_rand, start, end)
        rand_sharpes.append(m_rand['sharpe'])
        if iteration % 10 == 0:
            print(".", end="", flush=True)  # progress marker
    print()

    rand_median = np.median(rand_sharpes)
    print(f"Random Picks Median Sharpe: {rand_median:.4f}")
    print(f"Random Picks Max Sharpe: {np.max(rand_sharpes):.4f}")

    excess = orig_sharpe - rand_median
    # The "+" format flag prints the real sign; a literal "+" prefix
    # rendered negative values as "+-0.1234".
    print(f"\nExcess Sharpe vs Random: {excess:+.4f}")
    if excess > 0.15:
        print("Result: PASS (Signal adds strong value beyond risk management)")
    elif excess > 0.10:
        print("Result: WEAK PASS (Signal adds marginal value)")
    else:
        print("Result: FAIL (Signal has no edge over random/risk management)")
if __name__ == "__main__":
    # Script entry point: print the banner, load the data once, then run
    # the three Phase 1 validation tests in order.
    rule = "========================================"
    for line in (rule, " V53 FRAMEWORK VALIDATION - PHASE 1", rule):
        print(line)

    dc, spy, vf, daily_ret = get_data()

    test_1_1_ic_analysis(dc, vf)
    test_1_2_regime_ic_decay(dc, spy, vf)
    test_1_3_null_hypothesis(dc, spy, vf, daily_ret)