| """Walk-forward backtest for ML signals (local). |
| |
| Goal: produce *measurable* evidence for whether ML outputs are usable. |
| This script: |
| - fetches historical prices via `data.stock_data_api.get_stock_data_for_api` |
| - runs a walk-forward training/prediction loop (time-series safe) |
| - optionally computes deterministic technical gates (required_ok / technical_signal) |
| - simulates a simple long-only strategy |
| |
| It is intentionally conservative and auditable. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import argparse |
| from dataclasses import dataclass |
| from datetime import datetime |
| from pathlib import Path |
| from typing import Any, Dict, List, Optional, Tuple |
|
|
| import numpy as np |
| import pandas as pd |
| from sklearn.ensemble import ( |
| GradientBoostingClassifier, |
| GradientBoostingRegressor, |
| RandomForestClassifier, |
| RandomForestRegressor, |
| ) |
| from sklearn.metrics import accuracy_score, mean_absolute_error, r2_score |
| from sklearn.preprocessing import StandardScaler |
|
|
| from data.stock_data_api import get_stock_data_for_api |
| from ai.predictions_api import _apply_shrinkage, _compute_confidence, _signal_from_change |
| from analysis.scan_signals_api import compute_scan_signals_for_df |
| from ai.enhanced_features import ( |
| engineer_enhanced_features, ENHANCED_FEATURES, ALL_FEATURES, |
| add_macro_features, flag_corp_action_days, |
| ) |
| from trading.broker_base import SlippageModel |
| from trading.market_registry import DEFAULT_MARKET_ID |
|
|
|
|
| def _parse_date(s: str) -> pd.Timestamp: |
| return pd.to_datetime(str(s).strip()).tz_localize(None) |
|
|
|
|
| def _to_iso(d: Any) -> str: |
| try: |
| return pd.to_datetime(d).strftime("%Y-%m-%d") |
| except Exception: |
| return str(d) |
|
|
|
|
| def _normalize_signal(x: Any) -> str: |
| s = str(x or "").strip().upper() |
| return s if s in {"BUY", "SELL", "HOLD"} else "HOLD" |
|
|
|
|
def _combine_signals(ml_signal: str, tech_signal: str, required_ok: bool) -> str:
    """Merge the ML signal with the technical-gate outcome into a final signal.

    Behaviour:
    - Directly opposing BUY vs SELL between the two sides -> HOLD (conflict).
    - Otherwise any SELL from either side -> SELL.
    - ML BUY confirmed by a tech BUY *or* by required_ok -> BUY.
    - Everything else -> HOLD.
    """
    ml = _normalize_signal(ml_signal)
    tech = _normalize_signal(tech_signal)

    if "SELL" in (ml, tech):
        # A head-on BUY/SELL disagreement is treated as no-trade.
        if {ml, tech} == {"BUY", "SELL"}:
            return "HOLD"
        return "SELL"

    if ml == "BUY" and (tech == "BUY" or required_ok):
        return "BUY"
    return "HOLD"
|
|
|
|
| |
| |
# Feature columns fed to the models (ALL_FEATURES from ai.enhanced_features).
FEATURES = ALL_FEATURES

# Benchmark OHLC frames cached per benchmark symbol so repeated backtests in
# one process fetch the index data only once.
_BENCHMARK_CACHE: Dict[str, pd.DataFrame] = {}
|
|
|
|
def _get_market_benchmark_data(market_id: str = DEFAULT_MARKET_ID) -> Optional[pd.DataFrame]:
    """Fetch a market benchmark once and cache it per benchmark symbol.

    Returns None for unknown markets, empty responses, or any fetch error
    (best-effort by design: callers fall back to an always-uptrend regime).
    """
    symbol_by_market = {
        "bist": "XU100.IS",
        "us": "SPY",
    }
    key = str(market_id or DEFAULT_MARKET_ID).strip().lower()
    benchmark_symbol = symbol_by_market.get(key)
    if not benchmark_symbol:
        return None
    if benchmark_symbol in _BENCHMARK_CACHE:
        return _BENCHMARK_CACHE[benchmark_symbol]
    try:
        fetched = get_stock_data_for_api(benchmark_symbol, period="5y", interval="1d", market_id=market_id)
        if fetched is not None and not fetched.empty:
            # Normalise ordering and strip timezones to match the price frames.
            fetched = fetched.sort_index()
            fetched.index = pd.to_datetime(fetched.index).tz_localize(None)
            _BENCHMARK_CACHE[benchmark_symbol] = fetched
            return _BENCHMARK_CACHE[benchmark_symbol]
    except Exception:
        pass
    return None
|
|
|
|
@dataclass
class Trade:
    """One executed fill in the simulated long-only strategy."""

    # ISO date (YYYY-MM-DD) of the fill.
    date: str
    # Side: "BUY" or "SELL".
    type: str
    # Recorded fill price (cost-adjusted for in-loop fills; raw last close
    # for the forced end-of-period liquidation).
    price: float
    # Shares filled.
    shares: int
    # Cash balance immediately after the fill.
    capital: float
    # Trigger, e.g. "signal_buy", "signal_sell", "stop_loss", "take_profit",
    # "trailing_stop", "time_exit", "end_of_period".
    reason: str = ""
|
|
|
|
| def _max_drawdown(equity: pd.Series) -> float: |
| if equity is None or equity.empty: |
| return 0.0 |
| running_max = equity.cummax() |
| dd = (equity / running_max) - 1.0 |
| return float(abs(dd.min()) * 100.0) |
|
|
|
|
| def _sharpe(daily_returns: pd.Series) -> float: |
| if daily_returns is None or daily_returns.empty: |
| return 0.0 |
| r = daily_returns.dropna() |
| if len(r) < 2: |
| return 0.0 |
| mean_r = float(r.mean()) |
| std_r = float(r.std()) |
| if std_r <= 0: |
| return 0.0 |
| return float((mean_r / std_r) * np.sqrt(252.0)) |
|
|
|
|
| def _cagr_pct(initial_capital: float, final_capital: float, years: float) -> float: |
| if years <= 0 or initial_capital <= 0: |
| return 0.0 |
| if final_capital <= 0: |
| return -100.0 |
| return float(((final_capital / initial_capital) ** (1.0 / years) - 1.0) * 100.0) |
|
|
|
|
| def _years_between(start_iso: str, end_iso: str) -> float: |
| try: |
| a = pd.to_datetime(start_iso) |
| b = pd.to_datetime(end_iso) |
| days = float((b - a).days) |
| return max(0.0, days / 365.25) |
| except Exception: |
| return 0.0 |
|
|
|
|
| def _turnover(total_trade_value: float, avg_equity: float, years: float) -> Dict[str, float]: |
| if avg_equity <= 0: |
| return {"turnover": 0.0, "turnover_annualized": 0.0} |
| t = float(total_trade_value / avg_equity) |
| ann = float(t / years) if years > 0 else 0.0 |
| return {"turnover": t, "turnover_annualized": ann} |
|
|
|
|
| def _parse_optional_int(x: Any) -> Optional[int]: |
| try: |
| if x is None: |
| return None |
| v = int(x) |
| return v if v > 0 else None |
| except Exception: |
| return None |
|
|
|
|
| def _clamp01(x: float) -> float: |
| try: |
| return float(max(0.0, min(1.0, float(x)))) |
| except Exception: |
| return 0.0 |
|
|
|
|
def _position_size_shares(
    equity: float,
    price: float,
    max_position_pct: float,
    max_risk_per_trade_pct: float,
    stop_loss_pct: Optional[float],
) -> int:
    """Integer share count satisfying an allocation cap and an optional risk cap.

    - Allocation cap: deploy at most ``max_position_pct`` of equity.
    - Risk cap (only when a stop-loss fraction is supplied): the loss realised
      at the stop may not exceed ``max_risk_per_trade_pct`` of equity.

    Returns 0 when the inputs make no position possible.
    """
    if equity <= 0 or price <= 0:
        return 0

    alloc_fraction = _clamp01(max_position_pct)
    risk_fraction = _clamp01(max_risk_per_trade_pct)

    shares_by_alloc = int((equity * alloc_fraction) // price)
    if shares_by_alloc <= 0:
        return 0

    # Without a usable stop level the allocation cap is the only constraint.
    if stop_loss_pct is None or stop_loss_pct <= 0:
        return shares_by_alloc

    loss_per_share = price * float(stop_loss_pct)
    if loss_per_share <= 0:
        return shares_by_alloc

    shares_by_risk = int((equity * risk_fraction) // loss_per_share)
    return max(0, min(shares_by_alloc, shares_by_risk))
|
|
|
|
| def _get_vol_data(df_feat: pd.DataFrame, pos: int) -> Tuple[float, float]: |
| """Get 20-day average volume and daily volatility (%) at position.""" |
| try: |
| vol = float(df_feat["_avg_vol_20d"].iloc[pos]) |
| if not (np.isfinite(vol) and vol > 0): |
| vol = float(df_feat["Volume"].iloc[pos]) |
| except Exception: |
| vol = 0.0 |
| try: |
| vol_pct = float(df_feat["vol_20d"].iloc[pos]) * 100.0 |
| vol_pct = vol_pct if np.isfinite(vol_pct) else 2.0 |
| except Exception: |
| vol_pct = 2.0 |
| return vol, vol_pct |
|
|
|
|
| def _dynamic_trade_cost_frac( |
| slippage_model: SlippageModel, |
| close_px: float, |
| shares: int, |
| daily_volume: float, |
| daily_vol_pct: float, |
| ) -> float: |
| """One-way trade cost as fraction of notional (commission + slippage). |
| |
| Uses Almgren-Chriss-like impact model from broker_base.SlippageModel. |
| Returns e.g. 0.0025 for 25 bps total cost. |
| """ |
| comm_frac = slippage_model.commission_rate * (1.0 + slippage_model.bsmv_rate) |
| slip_bps = slippage_model.estimate_slippage_bps(daily_volume, shares, daily_vol_pct) |
| return comm_frac + slip_bps / 10_000.0 |
|
|
|
|
def walk_forward_backtest(
    symbol: str,
    start_date: str,
    end_date: str,
    market_id: str = DEFAULT_MARKET_ID,
    days_ahead: int = 7,
    train_window: int = 504,
    model_type: str = "rf",
    use_technical_gate: bool = True,
    initial_capital: float = 100_000.0,
    commission_bps: float = 10.0,
    slippage_bps: float = 10.0,
    exit_rule: str = "signal",
    max_hold_days: Optional[int] = None,
    stop_loss_pct: Optional[float] = None,
    take_profit_pct: Optional[float] = None,
    trailing_stop_pct: Optional[float] = None,
    max_position_pct: float = 1.0,
    max_risk_per_trade_pct: float = 1.0,
) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """Run a walk-forward (rolling re-train) ML backtest for one symbol.

    For each trading day after the initial ``train_window``, the loop:
    1. re-trains a feature-selected RF+GB classifier ensemble plus a
       regressor on a trailing window, with a ``days_ahead`` embargo gap
       between training data and "today" to avoid target leakage;
    2. validates on the held-out tail of that window and derives an ML
       BUY/SELL/HOLD signal from the ensemble's probability of an up-move
       (predictions are muted to HOLD when recent validation accuracy is low);
    3. optionally confirms the signal via the deterministic technical gate
       (``compute_scan_signals_for_df``) and a benchmark-uptrend filter;
    4. simulates a long-only position with ATR-tightened stop/target levels
       and a dynamic (volume/volatility dependent) cost model.

    Parameters mirror the CLI flags in ``main``; fraction parameters such as
    ``stop_loss_pct`` are expressed as 0.05 == 5%.

    Returns:
        (df_out, metrics): per-day records DataFrame and a summary metrics
        dict (the raw trade list is included under ``metrics["trades"]``).

    Raises:
        ValueError: if ``symbol`` is empty.
        RuntimeError: on missing data, insufficient history for the chosen
            window, or if no clean walk-forward records are produced.
    """
    sym = str(symbol).strip().upper()
    if not sym:
        raise ValueError("symbol is required")

    df = get_stock_data_for_api(sym, period="5y", interval="1d", market_id=market_id)
    if df is None or df.empty:
        raise RuntimeError(f"No data for {sym}")

    # Normalise ordering and strip timezones before any date filtering.
    df = df.sort_index()
    df.index = pd.to_datetime(df.index).tz_localize(None)

    start_dt = _parse_date(start_date)
    end_dt = _parse_date(end_date)

    df = df[(df.index >= start_dt) & (df.index <= end_dt)].copy()
    if df.empty or len(df) < (train_window + days_ahead + 50):
        raise RuntimeError("Not enough data in selected range for walk-forward")

    # Feature engineering + forward-return target (in percent).
    df_feat = engineer_enhanced_features(df)
    df_feat = add_macro_features(df_feat)
    df_feat["target_return"] = (df_feat["Close"].shift(-days_ahead) / df_feat["Close"] - 1) * 100.0

    # Winsorise extreme targets; clip grows with sqrt(horizon).
    _target_clip = 3.5 * float(np.sqrt(max(1, days_ahead)))
    _extreme_mask = df_feat["target_return"].abs() > _target_clip
    df_feat.loc[_extreme_mask, "target_return"] = np.clip(
        df_feat.loc[_extreme_mask, "target_return"], -_target_clip, _target_clip,
    )

    # Poison targets in a +/- days_ahead window around suspected corporate
    # action days (splits/dividends) so the models never train on them.
    _ca_suspect = flag_corp_action_days(df)
    _ca_suspect = _ca_suspect.reindex(df_feat.index).fillna(False)
    _ca_expanded = _ca_suspect.copy()
    for _shift in range(-days_ahead, days_ahead + 1):
        _ca_expanded = _ca_expanded | _ca_suspect.shift(_shift).fillna(False).astype(bool)
    _n_poisoned = int(_ca_expanded.sum())
    if _n_poisoned > 0:
        df_feat.loc[_ca_expanded, "target_return"] = np.nan
        import logging as _log
        _log.getLogger("walk_forward").info(
            "%s: poisoned %d target rows around %d corp-action suspect days",
            sym, _n_poisoned, int(_ca_suspect.sum()),
        )

    # 14-day ATR (true range = max of H-L, |H-prevC|, |L-prevC|), used for
    # ATR-based stop/target levels while in a position.
    _hl = df_feat["High"] - df_feat["Low"]
    _hc = (df_feat["High"] - df_feat["Close"].shift(1)).abs()
    _lc = (df_feat["Low"] - df_feat["Close"].shift(1)).abs()
    df_feat["_atr_14"] = pd.concat([_hl, _hc, _lc], axis=1).max(axis=1).rolling(14).mean()

    # 20-day average volume for the slippage model (falls back to same-day volume).
    df_feat["_avg_vol_20d"] = df_feat["Volume"].rolling(20).mean().fillna(df_feat["Volume"])

    # Market regime filter: only take BUYs while the benchmark looks healthy.
    # Defaults to "always uptrend" when no benchmark is available.
    _market_uptrend = pd.Series(True, index=df_feat.index)
    benchmark_df = _get_market_benchmark_data(market_id)
    if benchmark_df is not None:
        _xu100_close = benchmark_df["Close"].reindex(df_feat.index, method="ffill")
        _xu100_sma50 = _xu100_close.rolling(50).mean()
        _xu100_sma200 = _xu100_close.rolling(200).mean()
        # Uptrend if price >= SMA50 OR SMA50 >= SMA200 (lenient OR on purpose).
        _market_uptrend = (_xu100_close >= _xu100_sma50) | (_xu100_sma50 >= _xu100_sma200)
        _market_uptrend = _market_uptrend.fillna(True)

    records: List[Dict[str, Any]] = []

    # Portfolio / position state carried across the walk-forward loop.
    capital = float(initial_capital)
    shares = 0
    position = False
    entry_price: Optional[float] = None
    entry_date: Optional[str] = None
    days_in_position = 0
    max_close_since_entry: Optional[float] = None
    trades: List[Trade] = []
    total_trade_value = 0.0
    _rolling_accuracies: List[float] = []  # recent validation accuracies
    _current_prob_up: float = 0.5          # latest ensemble P(up), sizes positions
    entry_atr: Optional[float] = None      # ATR captured at entry for exit levels

    # Commission/slippage model shared by all fills in this run.
    _slippage_model = SlippageModel(
        commission_rate=commission_bps / 10_000.0,
        bsmv_rate=0.05,
        min_slippage_bps=max(slippage_bps, 5.0),
        vol_slippage_coeff=0.3,
    )

    for pos_t in range(train_window, len(df_feat) - days_ahead):
        date_t = df_feat.index[pos_t]

        # Skip days where any feature is non-finite (no prediction possible).
        row_t = df_feat.iloc[pos_t]
        if not np.all(np.isfinite(row_t[FEATURES].to_numpy(dtype=float))):
            continue

        # Training window ends days_ahead before "today" (embargo against
        # targets that would overlap the prediction horizon).
        train_end = pos_t - days_ahead
        train_start = max(0, train_end - train_window + 1)

        train_slice = df_feat.iloc[train_start : train_end + 1]
        X_all = train_slice[FEATURES].to_numpy(dtype=float)
        y_all = train_slice["target_return"].to_numpy(dtype=float)

        finite_mask = np.isfinite(y_all) & np.all(np.isfinite(X_all), axis=1)
        X_all = X_all[finite_mask]
        y_all = y_all[finite_mask]

        if len(y_all) < 120:
            continue

        # 80/20 chronological split with a days_ahead gap before validation.
        split_idx = int(len(y_all) * 0.8)
        val_start = split_idx + days_ahead
        if split_idx < 60 or val_start >= len(y_all) or (len(y_all) - val_start) < 10:
            continue

        X_train, X_test = X_all[:split_idx], X_all[val_start:]
        y_train, y_test = y_all[:split_idx], y_all[val_start:]

        scaler = StandardScaler()
        X_train_s = scaler.fit_transform(np.nan_to_num(X_train, nan=0.0, posinf=0.0, neginf=0.0))
        X_test_s = scaler.transform(np.nan_to_num(X_test, nan=0.0, posinf=0.0, neginf=0.0))

        # Quick importance-based feature selection: keep the top 10 features
        # according to a small throwaway random forest.
        _sel_rf = RandomForestRegressor(
            n_estimators=50, max_depth=4, min_samples_leaf=5,
            max_features='sqrt', random_state=42, n_jobs=-1,
        )
        _sel_rf.fit(X_train_s, y_train)
        importances = _sel_rf.feature_importances_
        n_keep = min(10, len(FEATURES))
        top_idx = np.argsort(importances)[-n_keep:]
        X_train_s = X_train_s[:, top_idx]
        X_test_s = X_test_s[:, top_idx]

        # Exponentially decaying sample weights: newest rows weigh ~e x more.
        n_train = len(X_train_s)
        sample_weights = np.exp(np.linspace(-1.0, 0.0, n_train))

        # Binary direction labels (up vs not-up) for the classifier ensemble.
        y_train_cls = (y_train > 0).astype(int)
        y_test_cls = (y_test > 0).astype(int)

        clf_rf = RandomForestClassifier(
            n_estimators=200, max_depth=3, min_samples_split=10,
            min_samples_leaf=5, max_features='sqrt',
            random_state=42, n_jobs=-1, class_weight="balanced",
        )
        clf_gb = GradientBoostingClassifier(
            n_estimators=200, max_depth=3, learning_rate=0.03,
            subsample=0.8, min_samples_split=10,
            random_state=42,
        )
        clf_rf.fit(X_train_s, y_train_cls, sample_weight=sample_weights)
        clf_gb.fit(X_train_s, y_train_cls, sample_weight=sample_weights)

        # Equal-weight probability ensemble; degenerate single-class fits
        # fall back to a neutral 0.5.
        prob_rf = clf_rf.predict_proba(X_test_s)[:, 1] if len(clf_rf.classes_) == 2 else np.full(len(X_test_s), 0.5)
        prob_gb = clf_gb.predict_proba(X_test_s)[:, 1] if len(clf_gb.classes_) == 2 else np.full(len(X_test_s), 0.5)
        prob_test = 0.5 * prob_rf + 0.5 * prob_gb
        y_pred_cls = (prob_test > 0.5).astype(int)

        direction_correct = float(accuracy_score(y_test_cls, y_pred_cls))

        # Track validation accuracy over time for the muting rule below.
        _rolling_accuracies.append(direction_correct)

        # Regressor for the magnitude of the move (model chosen by CLI flag).
        reg_model: Any
        if str(model_type).lower() == "rf":
            reg_model = RandomForestRegressor(
                n_estimators=200, max_depth=3, min_samples_split=10,
                min_samples_leaf=5, max_features='sqrt',
                random_state=42, n_jobs=-1,
            )
        else:
            reg_model = GradientBoostingRegressor(
                n_estimators=200, max_depth=3, learning_rate=0.03,
                subsample=0.8, min_samples_split=10,
                random_state=42,
            )
        reg_model.fit(X_train_s, y_train, sample_weight=sample_weights)
        y_pred_reg = np.asarray(reg_model.predict(X_test_s), dtype=float)
        r2 = float(r2_score(y_test, y_pred_reg))
        mae = float(mean_absolute_error(y_test, y_pred_reg))

        confidence_pct = float(_compute_confidence(r2, direction_correct))

        # Mute the ML signal when the last 5 validation accuracies average
        # below 52% (the model is effectively guessing).
        _rolling_avg_bad = (
            len(_rolling_accuracies) >= 5
            and float(np.mean(_rolling_accuracies[-5:])) < 0.52
        )

        if direction_correct < 0.55 or _rolling_avg_bad:
            # Low-confidence regime: still record a (damped) prediction but
            # force the ML signal to HOLD.
            X_pred_row = scaler.transform(np.nan_to_num(row_t[FEATURES].to_numpy(dtype=float).reshape(1, -1), nan=0.0, posinf=0.0, neginf=0.0))
            X_pred_sel = X_pred_row[:, top_idx]
            reg_pred = float(np.asarray(reg_model.predict(X_pred_sel), dtype=float).ravel()[0])
            reg_pred *= 0.30  # damp the raw regression output
            predicted_change = float(_apply_shrinkage(reg_pred, confidence_pct, days_ahead))
            ml_signal = "HOLD"
            _current_prob_up = 0.5
        else:
            X_pred_row = scaler.transform(np.nan_to_num(row_t[FEATURES].to_numpy(dtype=float).reshape(1, -1), nan=0.0, posinf=0.0, neginf=0.0))
            X_pred_sel = X_pred_row[:, top_idx]

            p_rf = clf_rf.predict_proba(X_pred_sel)[:, 1][0] if len(clf_rf.classes_) == 2 else 0.5
            p_gb = clf_gb.predict_proba(X_pred_sel)[:, 1][0] if len(clf_gb.classes_) == 2 else 0.5
            prob_up = 0.5 * p_rf + 0.5 * p_gb
            _current_prob_up = prob_up

            reg_pred = float(np.asarray(reg_model.predict(X_pred_sel), dtype=float).ravel()[0])
            reg_pred *= 0.30
            predicted_change = float(_apply_shrinkage(reg_pred, confidence_pct, days_ahead))

            # Asymmetric-free probability thresholds: BUY >= 0.62, SELL <= 0.38.
            if prob_up >= 0.62:
                ml_signal = "BUY"
            elif prob_up <= 0.38:
                ml_signal = "SELL"
            else:
                ml_signal = "HOLD"

        # Deterministic technical gate computed on data up to and including
        # today only (no lookahead in the scan window).
        tech_signal = "HOLD"
        required_ok = False
        gate_failed = False
        if use_technical_gate:
            try:
                hist = df.iloc[max(0, pos_t - 600) : pos_t + 1]
                scan = compute_scan_signals_for_df(sym, hist)
                tech_signal = _normalize_signal(scan.technical_signal)
                required_ok = bool((scan.gates or {}).get("required_ok"))
            except Exception:
                # Best-effort: a broken gate defaults to HOLD and blocks BUYs.
                tech_signal = "HOLD"
                required_ok = False
                gate_failed = True

        final_signal = _combine_signals(ml_signal, tech_signal, required_ok) if use_technical_gate else ml_signal
        # If the gate computation itself failed, never allow a BUY through.
        if gate_failed and final_signal == "BUY":
            final_signal = "HOLD"

        # Market regime filter: suppress BUYs while the benchmark is weak.
        if final_signal == "BUY":
            try:
                _mkt_up = bool(_market_uptrend.iloc[pos_t])
            except Exception:
                _mkt_up = True
            if not _mkt_up:
                final_signal = "HOLD"

        close_px = float(df_feat["Close"].iloc[pos_t])

        exit_reason = ""

        # --- Exit logic (evaluated first while holding a position) ---
        if position and shares > 0:
            days_in_position += 1
            max_close_since_entry = close_px if max_close_since_entry is None else max(max_close_since_entry, close_px)

            sl = None
            tp = None
            tr = None

            # ATR-derived exit fractions: 2x ATR stop, 3.5x target, 2.5x trail.
            _atr_val = entry_atr if entry_atr is not None else 0.0
            _atr_sl_pct = (_atr_val * 2.0 / entry_price) if (entry_price and _atr_val > 0) else 0.0
            _atr_tp_pct = (_atr_val * 3.5 / entry_price) if (entry_price and _atr_val > 0) else 0.0
            _atr_tr_pct = (_atr_val * 2.5 / entry_price) if (entry_price and _atr_val > 0) else 0.0

            # Combine user-specified and ATR levels: tightest stop, widest
            # target; ATR trail only fills in when none was configured.
            effective_sl = stop_loss_pct
            if _atr_sl_pct > 0 and (effective_sl is None or _atr_sl_pct < effective_sl):
                effective_sl = _atr_sl_pct
            effective_tp = take_profit_pct
            if _atr_tp_pct > 0:
                effective_tp = _atr_tp_pct if effective_tp is None else max(effective_tp, _atr_tp_pct)
            effective_tr = trailing_stop_pct
            if _atr_tr_pct > 0 and (effective_tr is None or effective_tr <= 0):
                effective_tr = _atr_tr_pct

            if entry_price is not None and effective_sl is not None and effective_sl > 0:
                sl = entry_price * (1.0 - float(effective_sl))
            if entry_price is not None and effective_tp is not None and effective_tp > 0:
                tp = entry_price * (1.0 + float(effective_tp))
            if effective_tr is not None and effective_tr > 0 and max_close_since_entry is not None:
                tr = max_close_since_entry * (1.0 - float(effective_tr))

            # Time-based exit (fixed holding period).
            if str(exit_rule).lower() in {"fixed", "signal_or_fixed"}:
                hold_limit = _parse_optional_int(max_hold_days) or int(days_ahead)
                if days_in_position >= hold_limit:
                    exit_reason = "time_exit"

            # Signal-based exit.
            if str(exit_rule).lower() in {"signal", "signal_or_fixed"} and not exit_reason:
                if final_signal == "SELL":
                    exit_reason = "signal_sell"

            # Protective exits (checked in priority order, first hit wins).
            if not exit_reason and sl is not None and close_px <= sl:
                exit_reason = "stop_loss"
            if not exit_reason and tp is not None and close_px >= tp:
                exit_reason = "take_profit"
            if not exit_reason and tr is not None and close_px <= tr:
                exit_reason = "trailing_stop"

        # --- Entry logic (only when flat) ---
        if (final_signal == "BUY") and (not position):
            equity_now = capital + (shares * close_px if shares > 0 else 0.0)

            # Estimate the one-way cost from today's liquidity/volatility and
            # embed it into the effective buy price.
            _dv, _dvp = _get_vol_data(df_feat, pos_t)
            _est_shares = int(capital // close_px) if close_px > 0 else 0
            cost_in = _dynamic_trade_cost_frac(_slippage_model, close_px, _est_shares, _dv, _dvp)
            buy_px = close_px * (1.0 + cost_in)

            # Conviction-scaled sizing: 30% of the cap at P(up)=0.5 ramping to
            # 100% of the cap at P(up)=1.0.
            conviction = max(0.0, (_current_prob_up - 0.5) * 2.0)
            adjusted_position_pct = max_position_pct * (0.3 + 0.7 * conviction)

            new_shares = _position_size_shares(
                equity=equity_now,
                price=buy_px,
                max_position_pct=adjusted_position_pct,
                max_risk_per_trade_pct=max_risk_per_trade_pct,
                stop_loss_pct=stop_loss_pct,
            )
            # Never buy more than available cash allows.
            new_shares = int(min(new_shares, capital // buy_px))

            if new_shares > 0:
                trade_value = float(new_shares * buy_px)
                capital -= trade_value
                total_trade_value += trade_value
                shares = new_shares
                position = True
                entry_price = float(buy_px)
                entry_date = _to_iso(date_t)
                days_in_position = 0
                max_close_since_entry = close_px
                # Snapshot ATR at entry; drives the ATR exit levels above.
                _raw_atr = df_feat["_atr_14"].iloc[pos_t]
                entry_atr = float(_raw_atr) if np.isfinite(_raw_atr) else None
                trades.append(
                    Trade(date=_to_iso(date_t), type="BUY", price=float(buy_px), shares=shares, capital=float(capital), reason="signal_buy")
                )

        # --- Execute a pending exit (sell at cost-adjusted price) ---
        if exit_reason and position and shares > 0:
            _dv, _dvp = _get_vol_data(df_feat, pos_t)
            cost_out = _dynamic_trade_cost_frac(_slippage_model, close_px, shares, _dv, _dvp)
            sell_px = close_px * (1.0 - cost_out)
            trade_value = float(shares * sell_px)
            capital += trade_value
            total_trade_value += trade_value
            trades.append(
                Trade(date=_to_iso(date_t), type="SELL", price=float(sell_px), shares=shares, capital=float(capital), reason=exit_reason)
            )
            shares = 0
            position = False
            entry_price = None
            entry_date = None
            days_in_position = 0
            max_close_since_entry = None
            entry_atr = None

        # Mark-to-market equity at today's close.
        equity = capital + (shares * close_px if shares > 0 else 0.0)

        # Realised forward return for evaluation only (not visible to models).
        actual_change = float(df_feat["target_return"].iloc[pos_t]) if np.isfinite(df_feat["target_return"].iloc[pos_t]) else np.nan

        # User-configured (non-ATR) levels, recorded for audit purposes.
        sl_level = (entry_price * (1.0 - float(stop_loss_pct))) if (position and entry_price is not None and stop_loss_pct is not None and stop_loss_pct > 0) else np.nan
        tp_level = (entry_price * (1.0 + float(take_profit_pct))) if (position and entry_price is not None and take_profit_pct is not None and take_profit_pct > 0) else np.nan
        tr_level = (
            (max_close_since_entry * (1.0 - float(trailing_stop_pct)))
            if (position and max_close_since_entry is not None and trailing_stop_pct is not None and trailing_stop_pct > 0)
            else np.nan
        )

        records.append(
            {
                "date": _to_iso(date_t),
                "close": close_px,
                "predicted_change_pct": predicted_change,
                "actual_change_pct": actual_change,
                "confidence": confidence_pct,
                "r2": r2,
                "mae": mae,
                "ml_signal": ml_signal,
                "technical_signal": tech_signal,
                "required_ok": required_ok,
                "final_signal": final_signal,
                "position": int(position),
                "shares": int(shares),
                "entry_date": entry_date,
                "entry_price": float(entry_price) if entry_price is not None else np.nan,
                "days_in_position": int(days_in_position) if position else 0,
                "stop_loss_level": sl_level,
                "take_profit_level": tp_level,
                "trailing_stop_level": tr_level,
                "equity": float(equity),
            }
        )

    df_out = pd.DataFrame.from_records(records)
    if df_out.empty:
        raise RuntimeError("No walk-forward records produced (insufficient clean windows)")

    # Force-liquidate any open position at the final recorded close so the
    # final equity is fully realised.
    if position and shares > 0:
        last_close = float(df_out["close"].iloc[-1])
        _dv_end, _dvp_end = _get_vol_data(df_feat, min(pos_t, len(df_feat) - 1))
        cost_out_end = _dynamic_trade_cost_frac(_slippage_model, last_close, shares, _dv_end, _dvp_end)
        capital += shares * last_close * (1.0 - cost_out_end)
        trade_value = float(shares * last_close * (1.0 - cost_out_end))
        total_trade_value += trade_value
        trades.append(
            Trade(
                date=str(df_out["date"].iloc[-1]),
                type="SELL",
                price=float(last_close),
                shares=shares,
                capital=float(capital),
                reason="end_of_period",
            )
        )
        shares = 0
        position = False
        df_out.loc[df_out.index[-1], "equity"] = float(capital)

    # Prediction quality: sign agreement on rows where either side is nonzero.
    valid_eval = df_out[np.isfinite(df_out["actual_change_pct"])].copy()
    _nz = (valid_eval["predicted_change_pct"] != 0) | (valid_eval["actual_change_pct"] != 0)
    dir_acc = float(np.mean(np.sign(valid_eval.loc[_nz, "predicted_change_pct"]) == np.sign(valid_eval.loc[_nz, "actual_change_pct"]))) if _nz.sum() > 0 else 0.5
    pred_mae = float(np.mean(np.abs(valid_eval["predicted_change_pct"] - valid_eval["actual_change_pct"])))

    equity_series = df_out["equity"].astype(float)
    daily_ret = equity_series.pct_change().dropna()

    start_iso = str(df_out["date"].iloc[0])
    end_iso = str(df_out["date"].iloc[-1])
    years = _years_between(start_iso, end_iso)
    avg_equity = float(equity_series.mean()) if len(equity_series) else float(initial_capital)
    turnover_metrics = _turnover(total_trade_value=total_trade_value, avg_equity=avg_equity, years=years)

    metrics: Dict[str, Any] = {
        "symbol": sym,
        "market_id": market_id,
        "days_ahead": int(days_ahead),
        "train_window": int(train_window),
        "model_type": str(model_type),
        "use_technical_gate": bool(use_technical_gate),
        "exit_rule": str(exit_rule),
        "max_hold_days": _parse_optional_int(max_hold_days),
        "stop_loss_pct": float(stop_loss_pct) if stop_loss_pct is not None else None,
        "take_profit_pct": float(take_profit_pct) if take_profit_pct is not None else None,
        "trailing_stop_pct": float(trailing_stop_pct) if trailing_stop_pct is not None else None,
        "max_position_pct": float(max_position_pct),
        "max_risk_per_trade_pct": float(max_risk_per_trade_pct),
        "cost_model": "dynamic_almgren_chriss",
        "records": int(len(df_out)),
        "direction_accuracy": dir_acc,
        "prediction_mae_pct": pred_mae,
        "final_capital": float(equity_series.iloc[-1]),
        "total_return_pct": float((equity_series.iloc[-1] / float(initial_capital) - 1.0) * 100.0),
        "cagr_pct": _cagr_pct(float(initial_capital), float(equity_series.iloc[-1]), years),
        "max_drawdown_pct": _max_drawdown(equity_series),
        "sharpe": _sharpe(daily_ret),
        "trades_count": int(len(trades)),
        "total_trade_value": float(total_trade_value),
        "avg_equity": float(avg_equity),
        **turnover_metrics,
    }

    # FIFO pairing of BUY/SELL fills to compute a simple price-based win rate.
    wins = 0
    buy_prices: List[float] = []
    for t in trades:
        if t.type == "BUY":
            buy_prices.append(t.price)
        elif t.type == "SELL" and buy_prices:
            bp = buy_prices.pop(0)
            if t.price > bp:
                wins += 1
    hit_rate = float((wins / max(1, len([t for t in trades if t.type == "SELL"])) * 100.0))
    metrics["win_rate_pct"] = hit_rate
    metrics["hit_rate_pct"] = hit_rate

    metrics["trades"] = [t.__dict__ for t in trades]

    return df_out, metrics
|
|
|
|
def main() -> int:
    """CLI entry point: run one walk-forward backtest and dump CSV artifacts.

    Parses the CLI flags, delegates to ``walk_forward_backtest``, writes the
    per-day records and the trade list as CSVs under ``--out``, then prints
    the headline metrics in a stable ``key: value`` format.

    Returns:
        Process exit code (0 on success); backtest errors propagate.
    """
    p = argparse.ArgumentParser()
    p.add_argument("--symbol", required=True)
    p.add_argument("--market", choices=["bist", "us"], default=DEFAULT_MARKET_ID)
    p.add_argument("--start", required=True, help="YYYY-MM-DD")
    p.add_argument("--end", required=True, help="YYYY-MM-DD")
    p.add_argument("--days-ahead", type=int, default=7)
    p.add_argument("--train-window", type=int, default=504)
    p.add_argument("--model", choices=["rf", "gbr"], default="rf")
    p.add_argument("--no-tech-gate", action="store_true")
    p.add_argument("--initial", type=float, default=100000.0)
    p.add_argument("--commission-bps", type=float, default=10.0)
    p.add_argument("--slippage-bps", type=float, default=10.0)
    p.add_argument("--exit-rule", choices=["signal", "fixed", "signal_or_fixed"], default="signal")
    p.add_argument("--max-hold-days", type=int, default=0, help="If >0, used by fixed exits; default uses days_ahead")
    p.add_argument("--stop-loss-pct", type=float, default=0.0, help="e.g. 0.05 for 5%%")
    p.add_argument("--take-profit-pct", type=float, default=0.0, help="e.g. 0.10 for 10%%")
    p.add_argument("--trailing-stop-pct", type=float, default=0.0, help="e.g. 0.07 for 7%%")
    p.add_argument("--max-position-pct", type=float, default=1.0, help="Max allocation fraction of equity, 0..1")
    p.add_argument("--max-risk-per-trade-pct", type=float, default=1.0, help="Max stop-loss risk fraction of equity, 0..1")
    p.add_argument("--out", default="walk_forward_out")

    args = p.parse_args()

    out_dir = Path(args.out)
    out_dir.mkdir(parents=True, exist_ok=True)

    # CLI uses 0.0 as a "feature disabled" sentinel -> translate to None.
    df_out, metrics = walk_forward_backtest(
        symbol=args.symbol,
        start_date=args.start,
        end_date=args.end,
        market_id=args.market,
        days_ahead=args.days_ahead,
        train_window=args.train_window,
        model_type=args.model,
        use_technical_gate=not args.no_tech_gate,
        initial_capital=args.initial,
        commission_bps=args.commission_bps,
        slippage_bps=args.slippage_bps,
        exit_rule=args.exit_rule,
        max_hold_days=_parse_optional_int(args.max_hold_days),
        stop_loss_pct=(args.stop_loss_pct if args.stop_loss_pct and args.stop_loss_pct > 0 else None),
        take_profit_pct=(args.take_profit_pct if args.take_profit_pct and args.take_profit_pct > 0 else None),
        trailing_stop_pct=(args.trailing_stop_pct if args.trailing_stop_pct and args.trailing_stop_pct > 0 else None),
        max_position_pct=args.max_position_pct,
        max_risk_per_trade_pct=args.max_risk_per_trade_pct,
    )

    # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC
    # timestamp instead (the strftime output is identical).
    from datetime import timezone
    stamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    base = f"{args.symbol.upper()}_{args.days_ahead}d_{stamp}"

    df_path = out_dir / f"{base}_records.csv"
    df_out.to_csv(df_path, index=False)

    trades_path = out_dir / f"{base}_trades.csv"
    pd.DataFrame.from_records(metrics.get("trades") or []).to_csv(trades_path, index=False)

    # Headline metrics, printed in a fixed order for scripted consumption.
    print("=== Walk-forward ML backtest ===")
    for k in [
        "symbol",
        "days_ahead",
        "train_window",
        "model_type",
        "use_technical_gate",
        "exit_rule",
        "max_hold_days",
        "stop_loss_pct",
        "take_profit_pct",
        "trailing_stop_pct",
        "max_position_pct",
        "max_risk_per_trade_pct",
        "records",
        "direction_accuracy",
        "prediction_mae_pct",
        "total_return_pct",
        "cagr_pct",
        "max_drawdown_pct",
        "sharpe",
        "trades_count",
        "hit_rate_pct",
        "turnover",
        "turnover_annualized",
    ]:
        print(f"{k}: {metrics.get(k)}")

    print(f"records_csv: {df_path}")
    print(f"trades_csv: {trades_path}")

    return 0
|
|
|
|
| if __name__ == "__main__": |
| raise SystemExit(main()) |
|
|