P2-ETF-DEEPWAVE-DL / evaluate.py
P2SAMAPA's picture
[auto] Sync code from GitHub
d21da44 verified
# evaluate.py
import json
import os
import shutil
import numpy as np
import pandas as pd
from datetime import datetime
import config
from data_download import load_local
from preprocess import run_preprocessing
import model_a, model_b, model_c
# ─── HF download helper ───────────────────────────────────────────────────────
def download_from_hf_if_needed():
"""Download data + weights from HF Dataset if not present locally."""
try:
from huggingface_hub import HfApi, hf_hub_download
token = config.HF_TOKEN or None
# Data parquets
os.makedirs(config.DATA_DIR, exist_ok=True)
for f in ["etf_price","etf_ret","etf_vol",
"bench_price","bench_ret","bench_vol","macro"]:
local = os.path.join(config.DATA_DIR, f"{f}.parquet")
if not os.path.exists(local):
try:
dl = hf_hub_download(
repo_id=config.HF_DATASET_REPO,
filename=f"data/{f}.parquet",
repo_type="dataset", token=token)
shutil.copy(dl, local)
print(f" Downloaded data/{f}.parquet")
except Exception as e:
print(f" Warning data/{f}: {e}")
# Model weights
os.makedirs(config.MODELS_DIR, exist_ok=True)
api = HfApi(token=token)
files = api.list_repo_files(
repo_id=config.HF_DATASET_REPO,
repo_type="dataset", token=token)
for f in files:
if f.startswith("models/") and f.endswith((".keras",".pkl",".json")):
local = f
if not os.path.exists(local):
os.makedirs(os.path.dirname(local), exist_ok=True)
try:
dl = hf_hub_download(
repo_id=config.HF_DATASET_REPO,
filename=f, repo_type="dataset", token=token)
shutil.copy(dl, local)
print(f" Downloaded {f}")
except Exception as e:
print(f" Warning {f}: {e}")
except Exception as e:
print(f" WARNING: HF download failed: {e}")
# ─── Signal generation ────────────────────────────────────────────────────────
def raw_signals(model, prep, is_dual=False):
X_te = prep["X_te"]
if is_dual:
n = prep["n_etf_features"]
inputs = [X_te[:, :, :n], X_te[:, :, n:]]
else:
inputs = X_te
preds = model.predict(inputs, verbose=0) # (N, 5)
# Diagnostic
pred_std = preds.std(axis=0).mean()
print(f" Raw pred std per ETF: {preds.std(axis=0).round(6)}")
if pred_std < 1e-4:
print(f" WARNING: Near-uniform predictions (std={pred_std:.6f}) β€” possible weight/scaler mismatch")
return preds
def softmax_probs(preds, temperature=1.0):
"""
Softmax probabilities. Models now output softmax directly (classification).
temperature=1.0 = pass-through. Left as parameter for legacy compatibility.
"""
preds = np.array(preds)
# If model already outputs softmax (sums to 1), return as-is
row_sums = preds.sum(axis=1)
if np.allclose(row_sums, 1.0, atol=0.01):
return np.clip(preds, 0, 1)
# Otherwise apply softmax (legacy regression models)
scaled = preds / (temperature + 1e-8)
e = np.exp(scaled - scaled.max(axis=1, keepdims=True))
return e / e.sum(axis=1, keepdims=True)
def compute_z_scores(probs):
"""
Per-row Z-score: how many std devs is the top ETF above the row mean?
Returns array of shape (N,)
"""
top = probs.max(axis=1) # (N,)
mu = probs.mean(axis=1) # (N,)
sigma = probs.std(axis=1) + 1e-8 # (N,)
return (top - mu) / sigma # (N,)
# ─── TSL backtest ─────────────────────────────────────────────────────────────
def backtest(probs, dates, etf_returns, tbill_series,
fee_bps=10, tsl_pct=10.0, z_reentry=1.1):
"""
Day-by-day backtest with TSL + Z-score re-entry.
Key fix: TSL fires end-of-day, re-entry checked next day.
"""
z_scores = compute_z_scores(probs)
records = []
in_cash = False
prev_ret = 0.0
prev2_ret = 0.0
last_signal= None
tsl_days = 0 # days spent in CASH after TSL
for i in range(len(probs)):
date = pd.Timestamp(dates[i])
prob = probs[i]
z = float(z_scores[i])
top_i = int(np.argmax(prob))
etf = config.ETFS[top_i]
conf = float(prob[top_i])
# 2-day cumulative return (previous 2 days)
two_day_cumul_pct = (prev_ret + prev2_ret) * 100
# ── TSL trigger (must be out of CASH to trigger) ───────────────────
if not in_cash and two_day_cumul_pct <= -tsl_pct:
in_cash = True
tsl_days = 0
# ── Z-score re-entry (only after at least 1 full CASH day) ────────
if in_cash and tsl_days >= 1 and z >= z_reentry:
in_cash = False
if in_cash:
tsl_days += 1
# ── Get actual return ─────────────────────────────────────────────
if date in etf_returns.index:
if in_cash:
tbill_rate = float(tbill_series.get(date, 3.6))
gross_ret = (tbill_rate / 100) / 252
mode = "πŸ’΅ CASH"
signal = "CASH"
else:
gross_ret = float(etf_returns.loc[date, etf]) \
if etf in etf_returns.columns else 0.0
fee_cost = (fee_bps / 10000) if etf != last_signal else 0.0
gross_ret -= fee_cost
mode = "πŸ“ˆ ETF"
signal = etf
last_signal= etf
else:
gross_ret = 0.0
mode = "πŸ’΅ CASH" if in_cash else "πŸ“ˆ ETF"
signal = "CASH" if in_cash else etf
records.append(dict(
Date = str(date.date()),
Signal = signal,
Confidence = round(conf, 4),
Z_Score = round(z, 4),
Two_Day_Cumul_Pct = round(two_day_cumul_pct, 2),
Mode = mode,
Net_Return = round(gross_ret, 6),
TSL_Triggered = in_cash,
))
prev2_ret = prev_ret
prev_ret = gross_ret
prev_ret = gross_ret
df = pd.DataFrame(records)
df["Cumulative"] = (1 + df["Net_Return"]).cumprod()
return df
# ─── Performance metrics ──────────────────────────────────────────────────────
def compute_metrics(bt, bench_ret, tbill_series):
rets = bt["Net_Return"].values
dates = pd.to_datetime(bt["Date"])
n_days = len(rets)
total = float((1 + pd.Series(rets)).prod())
ann_ret = (total ** (252 / n_days) - 1) * 100
tbill_daily = tbill_series.reindex(dates).ffill().fillna(3.6) / 100 / 252
excess = rets - tbill_daily.values
sharpe = float((excess.mean() / (excess.std() + 1e-8)) * np.sqrt(252))
cum = np.cumprod(1 + rets)
peak = np.maximum.accumulate(cum)
dd = (cum - peak) / peak
max_dd = float(dd.min()) * 100
max_daily_dd = float(rets.min()) * 100
signs = np.sign(rets)
hit_15 = float(pd.Series(signs).rolling(15).apply(
lambda x: (x > 0).mean()).mean())
# SPY benchmark
spy_dates = bench_ret.index.intersection(dates)
spy_rets = bench_ret.loc[spy_dates, "SPY"].values \
if "SPY" in bench_ret.columns else np.zeros(1)
spy_total = float((1 + pd.Series(spy_rets)).prod())
spy_ann = (spy_total ** (252 / max(len(spy_rets), 1)) - 1) * 100
# CASH days count
cash_days = int((bt["Mode"] == "CASH").sum())
return dict(
ann_return = round(ann_ret, 2),
sharpe = round(sharpe, 3),
hit_ratio_15d = round(hit_15, 3),
max_drawdown = round(max_dd, 2),
max_daily_dd = round(max_daily_dd, 2),
vs_spy = round(ann_ret - spy_ann, 2),
cash_days = cash_days,
)
# ─── AR(1) baseline ───────────────────────────────────────────────────────────
def ar1_backtest(etf_returns, test_dates):
records = []
dates_dt = pd.to_datetime(test_dates)
df = etf_returns[etf_returns.index.isin(dates_dt)].copy()
prev = df.shift(1).fillna(0)
for date, row in df.iterrows():
best = prev.loc[date].idxmax()
records.append(dict(Date=date, Signal=best,
Net_Return=float(row[best])))
out = pd.DataFrame(records)
out["Cumulative"] = (1 + out["Net_Return"]).cumprod()
return out
# ─── Full evaluation ──────────────────────────────────────────────────────────
def run_evaluation(tsl_pct=config.DEFAULT_TSL_PCT,
z_reentry=config.DEFAULT_Z_REENTRY,
fee_bps=10):
print(f"\n{'='*60}")
print(f" Evaluation β€” TSL={tsl_pct}% Z-reentry={z_reentry}Οƒ "
f"Fee={fee_bps}bps")
print(f"{'='*60}")
# Download data + weights from HF if not available locally
download_from_hf_if_needed()
data = load_local()
if not data:
raise RuntimeError("No data. Run data_download.py first.")
# Normalize ETF columns
from preprocess import normalize_etf_columns, flatten_columns
etf_ret = normalize_etf_columns(data["etf_ret"].copy())
etf_ret = etf_ret[[c for c in config.ETFS if c in etf_ret.columns]]
bench_ret= normalize_etf_columns(data["bench_ret"].copy())
# T-bill series
macro = flatten_columns(data["macro"].copy())
tbill = macro["TBILL_3M"] if "TBILL_3M" in macro.columns \
else pd.Series(3.6, index=macro.index)
# Best lookbacks from training summary
summary_path = os.path.join(config.MODELS_DIR, "training_summary.json")
lb_map = {"model_a": 30, "model_b": 30, "model_c": 30}
if os.path.exists(summary_path):
with open(summary_path) as f:
s = json.load(f)
for k in lb_map:
lb_map[k] = s.get(k, {}).get("best_lookback", 30)
results = {}
for tag, module, is_dual in [
("model_a", model_a, False),
("model_b", model_b, False),
("model_c", model_c, True),
]:
lb = lb_map[tag]
print(f"\n Evaluating {tag.upper()} (lb={lb}d)...")
prep = run_preprocessing(data, lb)
try:
m = module.load_model(lb)
except Exception as e:
print(f" Could not load {tag}: {e}")
continue
preds = raw_signals(m, prep, is_dual=is_dual)
probs = softmax_probs(preds)
# Check if model is producing meaningful predictions
prob_std = probs.std(axis=1).mean()
print(f" probs sample (first 3 rows):\n{probs[:3]}")
print(f" z_scores sample: {compute_z_scores(probs[:5])}")
print(f" Mean prob std across ETFs: {prob_std:.4f} "
f"(>0.05 = model discriminating, ~0 = uniform = weights issue)")
if prob_std < 0.01:
print(f" WARNING: Model {tag} outputting near-uniform probabilities!")
print(f" This usually means the model weights are not loaded correctly.")
bt = backtest(probs, prep["d_te"], etf_ret, tbill,
fee_bps=fee_bps, tsl_pct=tsl_pct,
z_reentry=z_reentry)
cash_count = (bt["Mode"] == "CASH").sum()
print(f" CASH days triggered: {cash_count} / {len(bt)}")
print(f" Signals distribution:\n{bt['Signal'].value_counts()}")
metrics = compute_metrics(bt, bench_ret, tbill)
# Extend audit trail with LIVE recent dates beyond test set
# Test set ends at ~10% of total data from end.
# We run inference on the most recent 60 trading days too.
live_records = []
try:
from preprocess import build_features, apply_scaler, load_scaler
features = build_features(data)
scaler = load_scaler(lb)
recent_dates = features.index[-60:] # last 60 trading days
for dt in recent_dates:
if dt in prep["d_te"]:
continue # already in test set
idx = features.index.get_loc(dt)
if idx < lb:
continue
window = features.iloc[idx - lb : idx].values.astype(np.float32)
X_win = apply_scaler(window.reshape(1, lb, -1), scaler)
if is_dual:
n_e = prep["n_etf_features"]
inp = [X_win[:, :, :n_e], X_win[:, :, n_e:]]
else:
inp = X_win
raw = m.predict(inp, verbose=0)
pr = softmax_probs(raw)[0]
zi = float((pr.max() - pr.mean()) / (pr.std() + 1e-8))
ei = int(np.argmax(pr))
etf_name = config.ETFS[ei]
# Get actual return if available
if "etf_ret" in data:
from preprocess import normalize_etf_columns
er = normalize_etf_columns(data["etf_ret"].copy())
ec = [c for c in config.ETFS if c in er.columns]
if etf_name in ec and dt in er.index:
actual_ret = float(er.loc[dt, etf_name])
else:
actual_ret = 0.0
else:
actual_ret = 0.0
live_records.append(dict(
Date = str(dt.date()),
Signal = etf_name,
Confidence = round(float(pr[ei]), 4),
Z_Score = round(zi, 4),
Two_Day_Cumul_Pct = 0.0,
Mode = "ETF",
Net_Return = round(actual_ret, 6),
TSL_Triggered = False,
))
except Exception as ex:
print(f" Live extension warning: {ex}")
# Merge test set + live records, sort, take last 30
all_rows = bt.to_dict(orient="records") + live_records
all_df = pd.DataFrame(all_rows)
all_df["Date"] = pd.to_datetime(all_df["Date"])
all_df = all_df.sort_values("Date").drop_duplicates("Date")
audit_30 = all_df.tail(30).to_dict(orient="records")
results[tag] = dict(
metrics = metrics,
lookback = lb,
audit_tail = audit_30,
all_signals = bt.to_dict(orient="records"),
)
print(f" Ann={metrics['ann_return']}% "
f"Sharpe={metrics['sharpe']} "
f"MaxDD={metrics['max_drawdown']}% "
f"CashDays={metrics['cash_days']}")
# AR(1) baseline
prep30 = run_preprocessing(data, 30)
ar1_bt = ar1_backtest(etf_ret, prep30["d_te"])
ar1_rets = ar1_bt["Net_Return"].values
n = len(ar1_rets)
ar1_ann = ((1 + pd.Series(ar1_rets)).prod() ** (252/n) - 1) * 100
results["ar1_baseline"] = dict(ann_return=round(float(ar1_ann), 2))
# Benchmarks
for bench in config.BENCHMARKS:
test_dates = prep30["d_te"]
b_dates = bench_ret.index.intersection(pd.to_datetime(test_dates))
b_rets = bench_ret.loc[b_dates, bench].values \
if bench in bench_ret.columns else np.zeros(1)
b_total = (1 + pd.Series(b_rets)).prod()
b_ann = (b_total ** (252 / max(len(b_rets),1)) - 1) * 100
b_sh = (b_rets.mean()/(b_rets.std()+1e-8))*np.sqrt(252)
b_cum = np.cumprod(1 + b_rets)
b_peak = np.maximum.accumulate(b_cum)
b_mdd = float(((b_cum-b_peak)/b_peak).min())*100
results[bench] = dict(
ann_return = round(float(b_ann), 2),
sharpe = round(float(b_sh), 3),
max_drawdown = round(float(b_mdd), 2),
)
# Winner
valid = [k for k in ["model_a","model_b","model_c"] if k in results]
if valid:
winner = max(valid,
key=lambda k: results[k]["metrics"]["ann_return"])
results["winner"] = winner
print(f"\n ⭐ WINNER: {winner.upper()} "
f"({results[winner]['metrics']['ann_return']}%)")
results["evaluated_at"] = datetime.now().isoformat()
results["tsl_pct"] = tsl_pct
results["z_reentry"] = z_reentry
with open("evaluation_results.json","w") as f:
json.dump(results, f, indent=2, default=str)
print(f"\n Saved β†’ evaluation_results.json")
# ── Write date-stamped sweep cache if this is a sweep year ────────────────
SWEEP_YEARS = [2008, 2013, 2015, 2017, 2019, 2021]
start_yr = results.get("start_year") or (
results.get(winner, {}).get("start_year") if winner else None)
# Read from training_summary.json
if start_yr is None:
try:
import os as _os
summ_path = _os.path.join(config.MODELS_DIR, "training_summary.json")
if _os.path.exists(summ_path):
with open(summ_path) as _f:
start_yr = json.load(_f).get("start_year")
except Exception:
pass
if start_yr in SWEEP_YEARS and winner and winner in results:
from datetime import datetime as _dt, timezone as _tz, timedelta as _td
_date_tag = (_dt.now(_tz.utc) - _td(hours=5)).strftime("%Y%m%d")
w_metrics = results[winner].get("metrics", {})
# Derive next signal from last row of audit_tail or all_signals
_next_signal = "?"
try:
_audit = results[winner].get("audit_tail") or results[winner].get("all_signals", [])
if _audit:
_last = _audit[-1]
_next_signal = _last.get("Signal_TSL") or _last.get("Signal") or "?"
except Exception:
pass
# Z-score from latest_prediction.json (written by predict.py before evaluate in workflow)
# Fall back to 0 if not available
_z = 0.0
try:
if os.path.exists("latest_prediction.json"):
with open("latest_prediction.json") as _pf:
_pred = json.load(_pf)
_preds = _pred.get("predictions", {})
_z = float(_preds.get(winner, {}).get("z_score", 0.0) or 0.0)
except Exception:
pass
sweep_payload = {
"signal": _next_signal,
"ann_return": round(float(w_metrics.get("ann_return", 0)) / 100, 6),
"z_score": round(_z, 4),
"sharpe": round(float(w_metrics.get("sharpe", 0)), 4),
"max_dd": round(float(w_metrics.get("max_drawdown", 0)) / 100, 6),
"winner_model": winner,
"start_year": start_yr,
"sweep_date": _date_tag,
}
os.makedirs("sweep", exist_ok=True)
_sweep_fname = f"sweep/sweep_{start_yr}_{_date_tag}.json"
with open(_sweep_fname, "w") as _sf:
json.dump(sweep_payload, _sf, indent=2)
print(f" Sweep cache saved β†’ {_sweep_fname} signal={_next_signal} z={_z:.3f}")
return results
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--tsl", type=float, default=config.DEFAULT_TSL_PCT)
parser.add_argument("--z", type=float, default=config.DEFAULT_Z_REENTRY)
parser.add_argument("--fee", type=float, default=10)
args = parser.parse_args()
run_evaluation(tsl_pct=args.tsl, z_reentry=args.z, fee_bps=args.fee)