Spaces:
Running
Running
| # evaluate.py | |
| import json | |
| import os | |
| import shutil | |
| import numpy as np | |
| import pandas as pd | |
| from datetime import datetime | |
| import config | |
| from data_download import load_local | |
| from preprocess import run_preprocessing | |
| import model_a, model_b, model_c | |
# ─── HF download helper ──────────────────────────────────────────────────────
def download_from_hf_if_needed():
    """Fetch data parquets and model weights from the HF dataset repo when absent locally."""
    try:
        from huggingface_hub import HfApi, hf_hub_download
        token = config.HF_TOKEN or None
        # Data parquets: pull each expected frame only if the local copy is missing.
        os.makedirs(config.DATA_DIR, exist_ok=True)
        frame_names = ("etf_price", "etf_ret", "etf_vol",
                       "bench_price", "bench_ret", "bench_vol", "macro")
        for name in frame_names:
            target = os.path.join(config.DATA_DIR, f"{name}.parquet")
            if os.path.exists(target):
                continue
            try:
                fetched = hf_hub_download(
                    repo_id=config.HF_DATASET_REPO,
                    filename=f"data/{name}.parquet",
                    repo_type="dataset", token=token)
                shutil.copy(fetched, target)
                print(f" Downloaded data/{name}.parquet")
            except Exception as e:
                print(f" Warning data/{name}: {e}")
        # Model weights: mirror every models/*.{keras,pkl,json} file from the repo.
        os.makedirs(config.MODELS_DIR, exist_ok=True)
        api = HfApi(token=token)
        repo_files = api.list_repo_files(
            repo_id=config.HF_DATASET_REPO,
            repo_type="dataset", token=token)
        weight_files = [p for p in repo_files
                        if p.startswith("models/")
                        and p.endswith((".keras", ".pkl", ".json"))]
        for path in weight_files:
            # Repo-relative path doubles as the local path.
            if os.path.exists(path):
                continue
            os.makedirs(os.path.dirname(path), exist_ok=True)
            try:
                fetched = hf_hub_download(
                    repo_id=config.HF_DATASET_REPO,
                    filename=path, repo_type="dataset", token=token)
                shutil.copy(fetched, path)
                print(f" Downloaded {path}")
            except Exception as e:
                print(f" Warning {path}: {e}")
    except Exception as e:
        # Best-effort: evaluation may still work with local files.
        print(f" WARNING: HF download failed: {e}")
# ─── Signal generation ───────────────────────────────────────────────────────
def raw_signals(model, prep, is_dual=False):
    """Run the model over the test windows and return raw per-ETF predictions (N, 5)."""
    X_te = prep["X_te"]
    if is_dual:
        # Dual-branch models take (etf features, macro features) separately.
        split = prep["n_etf_features"]
        model_inputs = [X_te[:, :, :split], X_te[:, :, split:]]
    else:
        model_inputs = X_te
    preds = model.predict(model_inputs, verbose=0)  # (N, 5)
    # Diagnostic: near-zero spread across samples suggests bad weights/scaler.
    per_etf_std = preds.std(axis=0)
    print(f" Raw pred std per ETF: {per_etf_std.round(6)}")
    mean_std = per_etf_std.mean()
    if mean_std < 1e-4:
        print(f" WARNING: Near-uniform predictions (std={mean_std:.6f}) — possible weight/scaler mismatch")
    return preds
def softmax_probs(preds, temperature=1.0):
    """
    Return per-row probabilities.

    Classification models already emit softmax rows (summing to ~1); those
    pass through unchanged (clipped to [0, 1]). Otherwise a temperature-scaled
    softmax is applied — the legacy path for regression-style outputs.
    """
    arr = np.array(preds)
    # Pass-through when rows already look like probability distributions.
    if np.allclose(arr.sum(axis=1), 1.0, atol=0.01):
        return np.clip(arr, 0, 1)
    # Legacy path: numerically-stable softmax (shift by row max before exp).
    logits = arr / (temperature + 1e-8)
    exps = np.exp(logits - logits.max(axis=1, keepdims=True))
    return exps / exps.sum(axis=1, keepdims=True)
def compute_z_scores(probs):
    """
    Per-row Z-score: how many row std-devs the top probability sits above
    the row mean. Returns an array of shape (N,).
    """
    best = probs.max(axis=1)
    centre = probs.mean(axis=1)
    spread = probs.std(axis=1) + 1e-8  # epsilon guards a uniform row
    return (best - centre) / spread
# ─── TSL backtest ────────────────────────────────────────────────────────────
def backtest(probs, dates, etf_returns, tbill_series,
             fee_bps=10, tsl_pct=10.0, z_reentry=1.1):
    """
    Day-by-day backtest with a trailing-stop-loss (TSL) and Z-score re-entry.

    Parameters
    ----------
    probs        : (N, n_etf) probability matrix, one row per test day.
    dates        : sequence of N dates aligned with ``probs``.
    etf_returns  : DataFrame of daily ETF returns indexed by date.
    tbill_series : Series of annualized T-bill rates (percent) by date.
    fee_bps      : switching fee in basis points, charged when the held ETF changes.
    tsl_pct      : 2-day cumulative loss (percent) that moves the book to CASH.
    z_reentry    : Z-score threshold to re-enter after at least one full CASH day.

    Returns a DataFrame with one record per day plus a "Cumulative" column.
    The TSL fires end-of-day; re-entry is evaluated from the next day onward.
    """
    z_scores = compute_z_scores(probs)
    records = []
    in_cash = False
    prev_ret = 0.0      # yesterday's realized return
    prev2_ret = 0.0     # return from two days ago
    last_signal = None  # last ETF held; fee is charged only on a change
    tsl_days = 0        # full days spent in CASH since the TSL fired
    for i in range(len(probs)):
        date = pd.Timestamp(dates[i])
        prob = probs[i]
        z = float(z_scores[i])
        top_i = int(np.argmax(prob))
        etf = config.ETFS[top_i]
        conf = float(prob[top_i])
        # 2-day cumulative return over the previous two sessions, in percent
        two_day_cumul_pct = (prev_ret + prev2_ret) * 100
        # ── TSL trigger (only while invested) ─────────────────────────────
        if not in_cash and two_day_cumul_pct <= -tsl_pct:
            in_cash = True
            tsl_days = 0
        # ── Z-score re-entry (requires at least 1 full CASH day first) ────
        if in_cash and tsl_days >= 1 and z >= z_reentry:
            in_cash = False
        if in_cash:
            tsl_days += 1
        # ── Realized return for the day ───────────────────────────────────
        if date in etf_returns.index:
            if in_cash:
                # Park in T-bills; 3.6% annualized fallback when missing.
                tbill_rate = float(tbill_series.get(date, 3.6))
                gross_ret = (tbill_rate / 100) / 252
                mode = "💵 CASH"
                signal = "CASH"
            else:
                gross_ret = float(etf_returns.loc[date, etf]) \
                    if etf in etf_returns.columns else 0.0
                fee_cost = (fee_bps / 10000) if etf != last_signal else 0.0
                gross_ret -= fee_cost
                mode = "📈 ETF"
                signal = etf
                last_signal = etf
        else:
            # Non-trading date: flat return, keep the current stance.
            gross_ret = 0.0
            mode = "💵 CASH" if in_cash else "📈 ETF"
            signal = "CASH" if in_cash else etf
        records.append(dict(
            Date = str(date.date()),
            Signal = signal,
            Confidence = round(conf, 4),
            Z_Score = round(z, 4),
            Two_Day_Cumul_Pct = round(two_day_cumul_pct, 2),
            Mode = mode,
            Net_Return = round(gross_ret, 6),
            TSL_Triggered = in_cash,
        ))
        # Roll the 2-day window. (FIX: the original assigned
        # `prev_ret = gross_ret` twice in a row — dead duplicate removed.)
        prev2_ret = prev_ret
        prev_ret = gross_ret
    df = pd.DataFrame(records)
    df["Cumulative"] = (1 + df["Net_Return"]).cumprod()
    return df
# ─── Performance metrics ─────────────────────────────────────────────────────
def compute_metrics(bt, bench_ret, tbill_series):
    """
    Summary performance metrics for a backtest frame.

    Parameters
    ----------
    bt           : backtest() output with "Net_Return", "Date", "Signal" columns.
    bench_ret    : benchmark daily-return DataFrame (uses the "SPY" column if present).
    tbill_series : annualized T-bill rates (percent) indexed by date.

    Returns a dict of rounded scalar metrics.
    """
    rets = bt["Net_Return"].values
    dates = pd.to_datetime(bt["Date"])
    n_days = len(rets)
    # Annualized return from the total compounded return
    total = float((1 + pd.Series(rets)).prod())
    ann_ret = (total ** (252 / n_days) - 1) * 100
    # Sharpe on daily excess over T-bill (3.6% annualized fallback)
    tbill_daily = tbill_series.reindex(dates).ffill().fillna(3.6) / 100 / 252
    excess = rets - tbill_daily.values
    sharpe = float((excess.mean() / (excess.std() + 1e-8)) * np.sqrt(252))
    # Drawdowns from the running equity curve
    cum = np.cumprod(1 + rets)
    peak = np.maximum.accumulate(cum)
    dd = (cum - peak) / peak
    max_dd = float(dd.min()) * 100
    max_daily_dd = float(rets.min()) * 100
    # Rolling 15-day hit ratio (share of up days); NaN when fewer than 15 rows
    signs = np.sign(rets)
    hit_15 = float(pd.Series(signs).rolling(15).apply(
        lambda x: (x > 0).mean()).mean())
    # SPY benchmark over the overlapping dates
    spy_dates = bench_ret.index.intersection(dates)
    spy_rets = bench_ret.loc[spy_dates, "SPY"].values \
        if "SPY" in bench_ret.columns else np.zeros(1)
    spy_total = float((1 + pd.Series(spy_rets)).prod())
    spy_ann = (spy_total ** (252 / max(len(spy_rets), 1)) - 1) * 100
    # FIX: "Mode" values carry an emoji prefix (e.g. "💵 CASH"), so the old
    # equality test `bt["Mode"] == "CASH"` never matched and always returned 0.
    # The "Signal" column holds the plain "CASH" label — count that instead.
    cash_days = int((bt["Signal"] == "CASH").sum())
    return dict(
        ann_return = round(ann_ret, 2),
        sharpe = round(sharpe, 3),
        hit_ratio_15d = round(hit_15, 3),
        max_drawdown = round(max_dd, 2),
        max_daily_dd = round(max_daily_dd, 2),
        vs_spy = round(ann_ret - spy_ann, 2),
        cash_days = cash_days,
    )
# ─── AR(1) baseline ──────────────────────────────────────────────────────────
def ar1_backtest(etf_returns, test_dates):
    """Naive AR(1) baseline: each day, hold whichever ETF won the previous day."""
    dates_dt = pd.to_datetime(test_dates)
    window = etf_returns[etf_returns.index.isin(dates_dt)].copy()
    lagged = window.shift(1).fillna(0)  # first day sees all-zero "previous" returns
    rows = []
    for date, actual in window.iterrows():
        pick = lagged.loc[date].idxmax()
        rows.append(dict(Date=date, Signal=pick,
                         Net_Return=float(actual[pick])))
    result = pd.DataFrame(rows)
    result["Cumulative"] = (1 + result["Net_Return"]).cumprod()
    return result
# ─── Full evaluation ─────────────────────────────────────────────────────────
def run_evaluation(tsl_pct=config.DEFAULT_TSL_PCT,
                   z_reentry=config.DEFAULT_Z_REENTRY,
                   fee_bps=10):
    """
    Evaluate all three models on the held-out test window, compare against an
    AR(1) baseline and the configured benchmarks, pick a winner, and write
    everything to evaluation_results.json (plus a per-year sweep cache when
    this run's start year is a sweep year).

    Parameters mirror backtest(): TSL percent, Z re-entry threshold, fee bps.
    Returns the full results dict.
    """
    print(f"\n{'='*60}")
    print(f" Evaluation — TSL={tsl_pct}% Z-reentry={z_reentry}σ "
          f"Fee={fee_bps}bps")
    print(f"{'='*60}")
    # Pull data + weights from the HF dataset repo when missing locally
    download_from_hf_if_needed()
    data = load_local()
    if not data:
        raise RuntimeError("No data. Run data_download.py first.")
    # Normalize ETF columns and keep only the configured universe
    from preprocess import normalize_etf_columns, flatten_columns
    etf_ret = normalize_etf_columns(data["etf_ret"].copy())
    etf_ret = etf_ret[[c for c in config.ETFS if c in etf_ret.columns]]
    bench_ret = normalize_etf_columns(data["bench_ret"].copy())
    # T-bill series (flat 3.6% fallback when the macro feed lacks it)
    macro = flatten_columns(data["macro"].copy())
    tbill = macro["TBILL_3M"] if "TBILL_3M" in macro.columns \
        else pd.Series(3.6, index=macro.index)
    # Best lookbacks from the training summary (default 30d)
    summary_path = os.path.join(config.MODELS_DIR, "training_summary.json")
    lb_map = {"model_a": 30, "model_b": 30, "model_c": 30}
    if os.path.exists(summary_path):
        with open(summary_path) as f:
            s = json.load(f)
        for k in lb_map:
            lb_map[k] = s.get(k, {}).get("best_lookback", 30)
    results = {}
    for tag, module, is_dual in [
        ("model_a", model_a, False),
        ("model_b", model_b, False),
        ("model_c", model_c, True),
    ]:
        lb = lb_map[tag]
        print(f"\n Evaluating {tag.upper()} (lb={lb}d)...")
        prep = run_preprocessing(data, lb)
        try:
            m = module.load_model(lb)
        except Exception as e:
            print(f" Could not load {tag}: {e}")
            continue
        preds = raw_signals(m, prep, is_dual=is_dual)
        probs = softmax_probs(preds)
        # Sanity-check that the model discriminates between ETFs
        prob_std = probs.std(axis=1).mean()
        print(f" probs sample (first 3 rows):\n{probs[:3]}")
        print(f" z_scores sample: {compute_z_scores(probs[:5])}")
        print(f" Mean prob std across ETFs: {prob_std:.4f} "
              f"(>0.05 = model discriminating, ~0 = uniform = weights issue)")
        if prob_std < 0.01:
            print(f" WARNING: Model {tag} outputting near-uniform probabilities!")
            print(f" This usually means the model weights are not loaded correctly.")
        bt = backtest(probs, prep["d_te"], etf_ret, tbill,
                      fee_bps=fee_bps, tsl_pct=tsl_pct,
                      z_reentry=z_reentry)
        # FIX: "Mode" values carry an emoji prefix, so the old equality test
        # against "CASH" always counted 0 — use the plain Signal label.
        cash_count = (bt["Signal"] == "CASH").sum()
        print(f" CASH days triggered: {cash_count} / {len(bt)}")
        print(f" Signals distribution:\n{bt['Signal'].value_counts()}")
        metrics = compute_metrics(bt, bench_ret, tbill)
        # Extend the audit trail with LIVE inference on the most recent
        # 60 trading days — the test set stops short of "today".
        live_records = []
        try:
            from preprocess import build_features, apply_scaler, load_scaler
            features = build_features(data)
            scaler = load_scaler(lb)
            # FIX: hoist the loop-invariant return-frame normalization out of
            # the per-date loop (the original recomputed it for every date).
            er = normalize_etf_columns(data["etf_ret"].copy()) \
                if "etf_ret" in data else None
            recent_dates = features.index[-60:]  # last 60 trading days
            for dt in recent_dates:
                if dt in prep["d_te"]:
                    continue  # already covered by the test backtest
                idx = features.index.get_loc(dt)
                if idx < lb:
                    continue  # not enough history for a full window
                window = features.iloc[idx - lb : idx].values.astype(np.float32)
                X_win = apply_scaler(window.reshape(1, lb, -1), scaler)
                if is_dual:
                    n_e = prep["n_etf_features"]
                    inp = [X_win[:, :, :n_e], X_win[:, :, n_e:]]
                else:
                    inp = X_win
                raw = m.predict(inp, verbose=0)
                pr = softmax_probs(raw)[0]
                zi = float((pr.max() - pr.mean()) / (pr.std() + 1e-8))
                ei = int(np.argmax(pr))
                etf_name = config.ETFS[ei]
                # Actual realized return when available (0.0 otherwise)
                actual_ret = 0.0
                if er is not None and etf_name in er.columns and dt in er.index:
                    actual_ret = float(er.loc[dt, etf_name])
                live_records.append(dict(
                    Date = str(dt.date()),
                    Signal = etf_name,
                    Confidence = round(float(pr[ei]), 4),
                    Z_Score = round(zi, 4),
                    Two_Day_Cumul_Pct = 0.0,
                    Mode = "ETF",
                    Net_Return = round(actual_ret, 6),
                    TSL_Triggered = False,
                ))
        except Exception as ex:
            print(f" Live extension warning: {ex}")
        # Merge test + live rows, de-dupe by date, keep the last 30 for audit
        all_df = pd.DataFrame(bt.to_dict(orient="records") + live_records)
        all_df["Date"] = pd.to_datetime(all_df["Date"])
        all_df = all_df.sort_values("Date").drop_duplicates("Date")
        audit_30 = all_df.tail(30).to_dict(orient="records")
        results[tag] = dict(
            metrics = metrics,
            lookback = lb,
            audit_tail = audit_30,
            all_signals = bt.to_dict(orient="records"),
        )
        print(f" Ann={metrics['ann_return']}% "
              f"Sharpe={metrics['sharpe']} "
              f"MaxDD={metrics['max_drawdown']}% "
              f"CashDays={metrics['cash_days']}")
    # AR(1) baseline on the 30-day-lookback test dates
    prep30 = run_preprocessing(data, 30)
    ar1_bt = ar1_backtest(etf_ret, prep30["d_te"])
    ar1_rets = ar1_bt["Net_Return"].values
    n = len(ar1_rets)
    ar1_ann = ((1 + pd.Series(ar1_rets)).prod() ** (252 / n) - 1) * 100
    results["ar1_baseline"] = dict(ann_return=round(float(ar1_ann), 2))
    # Benchmarks (buy-and-hold over the same test dates)
    for bench in config.BENCHMARKS:
        test_dates = prep30["d_te"]
        b_dates = bench_ret.index.intersection(pd.to_datetime(test_dates))
        b_rets = bench_ret.loc[b_dates, bench].values \
            if bench in bench_ret.columns else np.zeros(1)
        b_total = (1 + pd.Series(b_rets)).prod()
        b_ann = (b_total ** (252 / max(len(b_rets), 1)) - 1) * 100
        b_sh = (b_rets.mean() / (b_rets.std() + 1e-8)) * np.sqrt(252)
        b_cum = np.cumprod(1 + b_rets)
        b_peak = np.maximum.accumulate(b_cum)
        b_mdd = float(((b_cum - b_peak) / b_peak).min()) * 100
        results[bench] = dict(
            ann_return = round(float(b_ann), 2),
            sharpe = round(float(b_sh), 3),
            max_drawdown = round(float(b_mdd), 2),
        )
    # Winner — FIX: initialize first, so the sweep section below cannot hit
    # a NameError when no model could be loaded (valid list empty).
    winner = None
    valid = [k for k in ["model_a", "model_b", "model_c"] if k in results]
    if valid:
        winner = max(valid,
                     key=lambda k: results[k]["metrics"]["ann_return"])
        results["winner"] = winner
        print(f"\n → WINNER: {winner.upper()} "
              f"({results[winner]['metrics']['ann_return']}%)")
    results["evaluated_at"] = datetime.now().isoformat()
    results["tsl_pct"] = tsl_pct
    results["z_reentry"] = z_reentry
    with open("evaluation_results.json", "w") as f:
        json.dump(results, f, indent=2, default=str)
    print(f"\n Saved → evaluation_results.json")
    # ── Write date-stamped sweep cache if this is a sweep year ────────────────
    SWEEP_YEARS = [2008, 2013, 2015, 2017, 2019, 2021]
    start_yr = results.get("start_year") or (
        results.get(winner, {}).get("start_year") if winner else None)
    if start_yr is None:
        # Fall back to training_summary.json
        try:
            summ_path = os.path.join(config.MODELS_DIR, "training_summary.json")
            if os.path.exists(summ_path):
                with open(summ_path) as _f:
                    start_yr = json.load(_f).get("start_year")
        except Exception:
            pass
    if start_yr in SWEEP_YEARS and winner and winner in results:
        # Date tag at UTC-5 (fixed offset — presumably US/Eastern, ignoring DST)
        from datetime import datetime as _dt, timezone as _tz, timedelta as _td
        _date_tag = (_dt.now(_tz.utc) - _td(hours=5)).strftime("%Y%m%d")
        w_metrics = results[winner].get("metrics", {})
        # Next signal from the last audit row (falls back to all_signals)
        _next_signal = "?"
        try:
            _audit = results[winner].get("audit_tail") or results[winner].get("all_signals", [])
            if _audit:
                _last = _audit[-1]
                _next_signal = _last.get("Signal_TSL") or _last.get("Signal") or "?"
        except Exception:
            pass
        # Z-score from latest_prediction.json (written by predict.py earlier
        # in the workflow); 0 when unavailable.
        _z = 0.0
        try:
            if os.path.exists("latest_prediction.json"):
                with open("latest_prediction.json") as _pf:
                    _pred = json.load(_pf)
                _preds = _pred.get("predictions", {})
                _z = float(_preds.get(winner, {}).get("z_score", 0.0) or 0.0)
        except Exception:
            pass
        sweep_payload = {
            "signal": _next_signal,
            "ann_return": round(float(w_metrics.get("ann_return", 0)) / 100, 6),
            "z_score": round(_z, 4),
            "sharpe": round(float(w_metrics.get("sharpe", 0)), 4),
            "max_dd": round(float(w_metrics.get("max_drawdown", 0)) / 100, 6),
            "winner_model": winner,
            "start_year": start_yr,
            "sweep_date": _date_tag,
        }
        os.makedirs("sweep", exist_ok=True)
        _sweep_fname = f"sweep/sweep_{start_yr}_{_date_tag}.json"
        with open(_sweep_fname, "w") as _sf:
            json.dump(sweep_payload, _sf, indent=2)
        print(f" Sweep cache saved → {_sweep_fname} signal={_next_signal} z={_z:.3f}")
    return results
| if __name__ == "__main__": | |
| import argparse | |
| parser = argparse.ArgumentParser() | |
| parser.add_argument("--tsl", type=float, default=config.DEFAULT_TSL_PCT) | |
| parser.add_argument("--z", type=float, default=config.DEFAULT_Z_REENTRY) | |
| parser.add_argument("--fee", type=float, default=10) | |
| args = parser.parse_args() | |
| run_evaluation(tsl_pct=args.tsl, z_reentry=args.z, fee_bps=args.fee) | |