File size: 8,981 Bytes
1cd56b6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
import sys, os
import numpy as np, pandas as pd
from scipy.stats import spearmanr
import random
import warnings; warnings.filterwarnings('ignore')

sys.path.insert(0, os.path.dirname(__file__))
from backtesting.engines.v30_causal_engine import get_data, evaluate_slice, CAP, V30_PARAMS
from backtesting.engines.v36_research_engine import SECTOR_MAP, SECTORS
from backtesting.audits.v53_causal_audit import run_v53_causal

def test_1_1_ic_analysis(dc, vf):
    """Run Test 1.1: Information Coefficient of the sector-neutral momentum signal.

    Every 60 rows (starting at row 185) one IC sample is produced:

    1. The momentum signal is the price lagged 21 rows divided by the price
       lagged 175 rows, minus 1.
    2. The signal is z-scored within each sector listed in ``SECTORS``
       (tickers are assigned via ``SECTOR_MAP``); sectors with fewer than two
       tickers are left out.
    3. Among tickers that also have a 60-row forward return, the 15 highest
       z-scores are kept and their Spearman rank correlation with the forward
       return is the IC sample. Rows with fewer than 15 usable tickers, or
       with an undefined correlation, contribute nothing.

    The mean, sample standard deviation (``ddof=1``, matching the pandas
    ``.std()`` used by Test 1.2) and t-statistic of the IC samples are
    printed, followed by a PASS / FAIL line (PASS when t-stat > 2.0).

    Args:
        dc: Table indexed by row position whose ``vf`` columns hold the
            series used for both the signal and the forward return
            (presumably closing prices -- confirm against ``get_data``).
        vf: Column labels of ``dc`` to include.

    Returns:
        The t-statistic as a float. NaN when fewer than two IC samples are
        available, because a t-statistic is undefined there (previously a
        single positive sample produced +inf and a spurious PASS).
    """
    print("--- Test 1.1: Information Coefficient (IC) Analysis ---")
    mom_long, mom_short = 175, 21
    prices = dc[vf]
    m_sig = (prices.shift(mom_short) / prices.shift(mom_long)) - 1
    fwd = prices.pct_change(60).shift(-60)
    ic_vals = []

    for i in range(mom_long + 10, len(dc) - 60, 60):
        sig = m_sig.iloc[i].dropna()
        z = pd.Series(index=sig.index, dtype=float)
        for sector in SECTORS:
            stks = [t for t in sig.index if SECTOR_MAP.get(t) == sector]
            if len(stks) > 1:
                mu, sigma = sig[stks].mean(), sig[stks].std()
                # Floor the denominator so a flat sector cannot divide by zero.
                z[stks] = (sig[stks] - mu) / (sigma if sigma > 1e-8 else 1e-8)
        fwd_row = fwd.iloc[i]
        common = z.dropna().index.intersection(fwd_row.dropna().index)
        if len(common) >= 15:
            top = z[common].nlargest(15)
            corr, _ = spearmanr(top.values, fwd_row[top.index].values)
            if not np.isnan(corr):
                ic_vals.append(corr)

    n = len(ic_vals)
    if n < 2:
        # A t-statistic needs a dispersion estimate, i.e. at least two samples.
        print(f"Insufficient IC samples (n={n}); t-stat is undefined")
        ic_mean = float(ic_vals[0]) if n else float("nan")
        ic_std = float("nan")
        t_stat = float("nan")
    else:
        ic_mean = float(np.mean(ic_vals))
        # Sample standard deviation: the standard error of the mean is
        # s / sqrt(n) with s computed using n - 1 degrees of freedom.
        ic_std = float(np.std(ic_vals, ddof=1))
        with np.errstate(divide="ignore", invalid="ignore"):
            t_stat = float(np.float64(ic_mean) / (ic_std / np.sqrt(n)))

    print(f"Mean IC: {ic_mean:.4f}")
    print(f"Std IC:  {ic_std:.4f}")
    print(f"t-stat:  {t_stat:.2f}")
    if t_stat > 2.0:
        print("Result: PASS (Statistically significant edge)")
    else:
        print("Result: FAIL/WEAK (Check signal power)")
    return t_stat

def test_1_2_regime_ic_decay(dc, spy, vf):
    """Run Test 1.2: IC t-statistics split by calendar period and by regime.

    One IC sample is taken every 60 rows starting at row 185: the momentum
    signal (price lagged 21 rows over price lagged 175 rows, minus 1) is
    z-scored within each sector of ``SECTORS``, the 15 highest z-scores among
    tickers with a 60-row forward return are kept, and their Spearman rank
    correlation with that forward return is the sample. Each sample is tagged
    with its ``dc.index`` label and with "ON" when ``spy`` is above its
    200-row rolling mean at that row, otherwise "OFF" (which includes rows
    where the rolling mean is still NaN).

    A t-statistic (mean over standard error, pandas sample std) is printed
    for each of three fixed date windows and for each regime; groups with no
    samples are skipped.

    Args:
        dc: Table whose ``vf`` columns feed the signal and forward return;
            its index labels are compared against date strings.
        spy: Series read positionally at the same row numbers as ``dc``
            (assumes it is aligned with ``dc`` -- confirm with the caller).
        vf: Column labels of ``dc`` to include.

    Returns:
        None; results are printed.
    """
    print("\n--- Test 1.2: Regime-Conditioned IC Decay Analysis ---")
    long_lag, short_lag = 175, 21
    momentum = (dc[vf].shift(short_lag) / dc[vf].shift(long_lag)) - 1
    forward_ret = dc[vf].pct_change(60).shift(-60)
    trend = spy.rolling(200).mean()

    windows = {
        "Period 1 (2008-2012)": ("2008-01-01", "2012-12-31"),
        "Period 2 (2013-2018)": ("2013-01-01", "2018-12-31"),
        "Period 3 (2019-2025)": ("2019-01-01", "2025-12-31")
    }

    def _t_stat(sample):
        # Mean divided by its standard error.
        return sample.mean() / (sample.std() / np.sqrt(len(sample)))

    # One (label, IC, regime) record per sampled row that yields a valid IC.
    records = []
    for pos in range(long_lag + 10, len(dc) - 60, 60):
        raw = momentum.iloc[pos].dropna()
        zscores = pd.Series(index=raw.index, dtype=float)
        for sector in SECTORS:
            members = [tk for tk in raw.index if SECTOR_MAP.get(tk) == sector]
            if len(members) <= 1:
                continue
            centre, spread = raw[members].mean(), raw[members].std()
            zscores[members] = (raw[members] - centre) / (spread if spread > 1e-8 else 1e-8)

        usable = zscores.dropna().index.intersection(forward_ret.iloc[pos].dropna().index)
        if len(usable) < 15:
            continue

        leaders = zscores[usable].nlargest(15)
        ic, _ = spearmanr(leaders.values, forward_ret.iloc[pos][leaders.index].values)
        if np.isnan(ic):
            continue

        regime = "ON" if spy.iloc[pos] > trend.iloc[pos] else "OFF"
        records.append((dc.index[pos], ic, regime))

    ic_df = pd.DataFrame(
        {'IC': [rec[1] for rec in records], 'Regime': [rec[2] for rec in records]},
        index=[rec[0] for rec in records],
    )

    for label, (start, end) in windows.items():
        in_window = (ic_df.index >= start) & (ic_df.index <= end)
        sample = ic_df[in_window]['IC']
        if len(sample) > 0:
            print(f"{label}: t-stat = {_t_stat(sample):.2f} (n={len(sample)})")

    print("\nBy Regime:")
    for reg in ("ON", "OFF"):
        sample = ic_df[ic_df['Regime'] == reg]['IC']
        if len(sample) > 0:
            print(f"Risk-{reg} days: t-stat = {_t_stat(sample):.2f} (n={len(sample)})")

def run_variant(dc, spy, vf, daily_ret, mode='random', seed=None):
    """Simulate a null-hypothesis variant of the strategy and return its NAV path.

    The risk overlay is kept while stock selection is replaced:

    * ``mode='random'``: at each rebalance, 15 tickers (or fewer if the
      universe is smaller) are drawn at random and equally weighted.
    * ``mode='equal'``: every available ticker is held at equal weight.

    "Available" means the ticker has a non-NaN momentum value (price lagged
    21 rows over price lagged 175 rows) on the rebalance row. Rebalances
    happen every 60 rows; new picks take effect from the following row.

    Overlay applied on every row ``i`` (using information up to row ``i-1``):

    * exposure = 0.18 / annualised std of the last 60 (or 21) paper returns,
      0.5 until 21 returns exist;
    * exposure is halved when ``spy`` is at or below its 200-row rolling mean
      or that mean is NaN, then clipped to [0.05, 1.0];
    * a running cost of 20 bps per side per rebalance, spread over the rows;
    * a stop: once the paper NAV is 15% below its peak, the real NAV is
      frozen until the paper NAV recovers 5% from its trough; each switch
      charges 20 bps.

    Args:
        dc: Table whose ``vf`` columns are used to build the momentum signal;
            its index (from the second row on) labels the returned series.
        spy: Series read positionally at the same row numbers as ``dc``
            (assumes it is aligned with ``dc`` -- confirm with the caller).
        vf: Column labels of ``dc`` forming the candidate universe.
        daily_ret: Per-ticker returns, read positionally by row; picks not
            present in its columns are ignored.
        mode: ``'random'`` or ``'equal'``.
        seed: Optional seed for the random picks. ``None`` (default) keeps
            using the module-level ``random`` state as before.

    Returns:
        pd.Series of the NAV recorded after each simulated row, starting from
        ``CAP`` and indexed by ``dc.index[1:]``.

    Raises:
        ValueError: If ``mode`` is not ``'random'`` or ``'equal'``. Previously
            such a mode left the weights empty and silently produced a
            zero-return path.
    """
    if mode not in ('random', 'equal'):
        raise ValueError(f"Unknown mode {mode!r}; expected 'random' or 'equal'")
    rng = random if seed is None else random.Random(seed)

    rebal_days, vol_target, riskoff_haircut = 60, 0.18, 0.50
    sma_lookback, mom_long, mom_short, top_n = 200, 175, 21, 15
    txn_frac = 20 / 10000.0

    price_mom = (dc[vf].shift(mom_short) / dc[vf].shift(mom_long)) - 1
    sma = spy.rolling(sma_lookback).mean()
    spy_vals, sma_vals = spy.values, sma.values

    nav, paper_nav = CAP, CAP
    peak_paper_nav, trough_paper_nav = CAP, CAP
    stop_active = False

    # Current picks restricted to tickers that have a return column; computed
    # once per rebalance instead of on every row.
    held = []
    current_weights = pd.Series(dtype=float)
    port_rets, hist = [], []
    days = 0

    for i in range(1, len(dc)):
        # Volatility targeting on realised paper returns.
        if len(port_rets) >= 21:
            w_window = port_rets[-60:] if len(port_rets) >= 60 else port_rets[-21:]
            vs = vol_target / (np.std(w_window) * np.sqrt(252) + 1e-8)
        else:
            vs = 0.5

        # Regime filter uses the previous row only, so it is known at trade time.
        sp, sm = spy_vals[i - 1], sma_vals[i - 1]
        if pd.isna(sm) or sp <= sm:
            vs *= riskoff_haircut
        vs = float(np.clip(vs, 0.05, 1.0))

        day_ret = 0.0
        if held:
            lr = daily_ret.iloc[i][held].dropna()
            if not lr.empty:
                wt = current_weights.reindex(lr.index).fillna(0)
                # Renormalise over the tickers that actually have a return today.
                if wt.sum() > 0:
                    wt = wt / wt.sum()
                day_ret = (lr * wt).sum() * vs

        # The paper portfolio always trades; it drives the stop logic.
        paper_nav *= (1 + day_ret)
        paper_nav -= paper_nav * txn_frac * 2 / rebal_days * vs
        paper_nav = max(paper_nav, 0.01)

        if not stop_active:
            nav *= (1 + day_ret)
            nav -= nav * txn_frac * 2 / rebal_days * vs
            nav = max(nav, 0.01)

        port_rets.append(day_ret)
        hist.append(nav)

        peak_paper_nav = max(peak_paper_nav, paper_nav)
        paper_dd = (paper_nav / peak_paper_nav) - 1.0

        if not stop_active:
            if paper_dd <= -0.15:
                stop_active = True
                trough_paper_nav = paper_nav
                nav -= nav * txn_frac  # cost of exiting
        else:
            trough_paper_nav = min(trough_paper_nav, paper_nav)
            if paper_nav >= trough_paper_nav * 1.05:
                stop_active = False
                peak_paper_nav = paper_nav  # drawdown is measured afresh
                nav -= nav * txn_frac  # cost of re-entering

        days += 1
        if days >= rebal_days:
            days = 0
            avail = price_mom.iloc[i].dropna().index.tolist()
            if not avail:
                continue

            if mode == 'random':
                new_picks = rng.sample(avail, min(top_n, len(avail)))
            else:
                # 'equal': hold the whole available universe.
                new_picks = avail

            current_weights = pd.Series(1.0 / len(new_picks), index=new_picks)
            held = [t for t in new_picks if t in daily_ret.columns]

    return pd.Series(hist, index=dc.index[1:len(hist) + 1])

def test_1_3_null_hypothesis(dc, spy, vf, daily_ret, n_random=100):
    """Run Test 1.3: compare the strategy's Sharpe with null-hypothesis variants.

    Three Sharpe ratios are computed over 2008-01-01 .. 2025-12-31 via
    ``evaluate_slice`` (its result is read through the ``'sharpe'`` key):

    * the original strategy, from ``run_v53_causal`` with ``V30_PARAMS``;
    * the equal-weight universe, from ``run_variant(mode='equal')``;
    * ``n_random`` random-pick runs, from ``run_variant(mode='random')``,
      summarised by their median and maximum.

    The excess of the original Sharpe over the random median decides the
    verdict: PASS above 0.15, WEAK PASS above 0.10, otherwise FAIL.

    Args:
        dc, spy, vf, daily_ret: Passed through unchanged to
            ``run_v53_causal`` and ``run_variant``.
        n_random: Number of random-pick simulations (default 100).

    Returns:
        None; results are printed.

    Raises:
        ValueError: If ``n_random`` is less than 1.
    """
    if n_random < 1:
        raise ValueError("n_random must be at least 1")

    print("\n--- Test 1.3: Null Hypothesis Randomization Test ---")
    start, end = "2008-01-01", "2025-12-31"

    # Original
    c_orig = run_v53_causal(dc, spy, vf, daily_ret, **V30_PARAMS)
    orig_sharpe = evaluate_slice(c_orig, start, end)['sharpe']
    print(f"Original Strategy Sharpe: {orig_sharpe:.4f}")

    # Equal Weight
    c_ew = run_variant(dc, spy, vf, daily_ret, mode='equal')
    ew_sharpe = evaluate_slice(c_ew, start, end)['sharpe']
    print(f"Equal-Weight Universe Sharpe: {ew_sharpe:.4f}")

    # Random picks: the distribution the original is compared against.
    rand_sharpes = []
    print(f"Running {n_random} Random iterations...", end="", flush=True)
    for run_idx in range(n_random):
        c_rand = run_variant(dc, spy, vf, daily_ret, mode='random')
        rand_sharpes.append(evaluate_slice(c_rand, start, end)['sharpe'])
        if run_idx % 10 == 0:
            print(".", end="", flush=True)  # progress marker every 10 runs
    print()

    rand_median = np.median(rand_sharpes)
    print(f"Random Picks Median Sharpe: {rand_median:.4f}")
    print(f"Random Picks Max Sharpe:    {np.max(rand_sharpes):.4f}")

    excess = orig_sharpe - rand_median
    # The '+' format flag prints the real sign; a literal "+" prefix rendered
    # negative values as "+-0.1234".
    print(f"\nExcess Sharpe vs Random: {excess:+.4f}")

    if excess > 0.15:
        print("Result: PASS (Signal adds strong value beyond risk management)")
    elif excess > 0.10:
        print("Result: WEAK PASS (Signal adds marginal value)")
    else:
        print("Result: FAIL (Signal has no edge over random/risk management)")

if __name__ == "__main__":
    # Script entry point: print the banner, load the data once, then run the
    # three Phase 1 checks in order.
    for banner_line in (
        "========================================",
        " V53 FRAMEWORK VALIDATION - PHASE 1",
        "========================================",
    ):
        print(banner_line)

    prices, benchmark, universe, returns = get_data()

    test_1_1_ic_analysis(prices, universe)
    test_1_2_regime_ic_decay(prices, benchmark, universe)
    test_1_3_null_hypothesis(prices, benchmark, universe, returns)