from __future__ import annotations import argparse import json import sys from datetime import datetime, timedelta, timezone from pathlib import Path import MetaTrader5 as mt5 import numpy as np import pandas as pd from sklearn.ensemble import RandomForestClassifier from sklearn.metrics import accuracy_score ROOT = Path(__file__).resolve().parents[1] if str(ROOT) not in sys.path: sys.path.insert(0, str(ROOT)) from afml.cache import setup_production_cache from afml.production.model_export import complete_export_workflow TIMEFRAMES = { "M5": mt5.TIMEFRAME_M5, "M15": mt5.TIMEFRAME_M15, "H1": mt5.TIMEFRAME_H1, } def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( description="Build production ONNX models from live MT5 bars and run latest inference." ) parser.add_argument("--symbols", nargs="+", default=["BTCUSD", "XAUUSD"]) parser.add_argument("--timeframe", default="M15", choices=sorted(TIMEFRAMES)) parser.add_argument("--days", type=int, default=180) parser.add_argument("--horizon", type=int, default=3) parser.add_argument("--out", default="production_models/live_mt5") return parser.parse_args() def initialize_production() -> dict: return setup_production_cache( enable_mlflow=True, mlflow_experiment="afml_live_production_build", mlflow_uri=None, max_cache_size_mb=1000, ) def fetch_rates(symbol: str, timeframe: str, start: datetime, end: datetime) -> pd.DataFrame: if not mt5.symbol_select(symbol, True): raise RuntimeError(f"Symbol unavailable in MT5 Market Watch: {symbol}") rates = mt5.copy_rates_range(symbol, TIMEFRAMES[timeframe], start, end) if rates is None or len(rates) < 300: raise RuntimeError(f"Not enough bars for {symbol} {timeframe}; last_error={mt5.last_error()}") data = pd.DataFrame(rates) data["time"] = pd.to_datetime(data["time"], unit="s", utc=True) data.set_index("time", inplace=True) if "tick_volume" in data.columns and "volume" not in data.columns: data["volume"] = data["tick_volume"] return data.sort_index() def rsi(close: pd.Series, window: int = 14) -> pd.Series: delta = close.diff() gain = delta.clip(lower=0).ewm(alpha=1 / window, adjust=False).mean() loss = -delta.clip(upper=0).ewm(alpha=1 / window, adjust=False).mean() rs = gain / loss.replace(0, np.nan) return 100 - (100 / (1 + rs)) def atr(data: pd.DataFrame, window: int = 14) -> pd.Series: high_low = data["high"] - data["low"] high_close = (data["high"] - data["close"].shift()).abs() low_close = (data["low"] - data["close"].shift()).abs() tr = pd.concat([high_low, high_close, low_close], axis=1).max(axis=1) return tr.ewm(alpha=1 / window, adjust=False).mean() def build_features(data: pd.DataFrame) -> pd.DataFrame: close = data["close"] features = pd.DataFrame(index=data.index) features["ret_1"] = close.pct_change(1) features["ret_3"] = close.pct_change(3) features["ret_6"] = close.pct_change(6) features["close_open"] = (data["close"] / data["open"]) - 1.0 features["range_hl"] = (data["high"] - data["low"]) / close.replace(0, np.nan) features["ma_10_rel"] = close / close.rolling(10).mean() - 1.0 features["ma_30_rel"] = close / close.rolling(30).mean() - 1.0 features["mom_12"] = close.pct_change(12) features["vol_20"] = close.pct_change().rolling(20).std() features["rsi_14"] = rsi(close, 14) / 100.0 features["atr_14_rel"] = atr(data, 14) / close.replace(0, np.nan) features["volume_z20"] = ( (data["volume"] - data["volume"].rolling(20).mean()) / data["volume"].rolling(20).std().replace(0, np.nan) ) return features.replace([np.inf, -np.inf], np.nan).dropna() def build_labels(data: pd.DataFrame, horizon: int) -> pd.Series: future_return = data["close"].shift(-horizon) / data["close"] - 1.0 return (future_return > 0).astype(int) def fit_model(features: pd.DataFrame, labels: pd.Series) -> tuple[RandomForestClassifier, dict]: common_index = features.index.intersection(labels.index) X = features.loc[common_index] y = labels.loc[common_index] split = int(len(X) * 0.8) if split < 100 or len(X) - split < 50: raise RuntimeError(f"Insufficient training/test split after feature build: {len(X)} rows") X_train, X_test = X.iloc[:split], X.iloc[split:] y_train, y_test = y.iloc[:split], y.iloc[split:] model = RandomForestClassifier( n_estimators=200, max_depth=8, min_samples_leaf=5, random_state=42, n_jobs=-1, ) model.fit(X_train, y_train) train_pred = model.predict(X_train) test_pred = model.predict(X_test) test_prob = model.predict_proba(X_test)[:, 1] metrics = { "rows": int(len(X)), "train_rows": int(len(X_train)), "test_rows": int(len(X_test)), "train_accuracy": float(accuracy_score(y_train, train_pred)), "test_accuracy": float(accuracy_score(y_test, test_pred)), "test_probability_mean": float(np.mean(test_prob)), "class_balance": float(y.mean()), } return model, metrics def latest_inference(model: RandomForestClassifier, features: pd.DataFrame) -> dict: latest = features.iloc[[-1]] prob_up = float(model.predict_proba(latest)[0, 1]) signal = "BUY" if prob_up >= 0.55 else "SELL" if prob_up <= 0.45 else "WAIT" return { "latest_feature_time_utc": latest.index[-1].isoformat(), "prob_up": prob_up, "signal": signal, } def run_symbol(symbol: str, timeframe: str, days: int, horizon: int, out_dir: Path) -> dict: end = datetime.now(timezone.utc) start = end - timedelta(days=days) bars = fetch_rates(symbol, timeframe, start, end) features = build_features(bars) labels = build_labels(bars, horizon) model, metrics = fit_model(features, labels) inference = latest_inference(model, features) export_dir = out_dir / symbol / timeframe export_path = complete_export_workflow( model, feature_names=list(features.columns), output_dir=str(export_dir), model_name=f"{symbol.lower()}_{timeframe.lower()}", ) if not export_path: raise RuntimeError(f"ONNX export failed for {symbol} {timeframe}") summary = { "symbol": symbol, "timeframe": timeframe, "bars": int(len(bars)), "feature_count": int(features.shape[1]), "features": list(features.columns), "metrics": metrics, "latest": inference, "onnx_path": export_path, "generated_at_utc": datetime.now(timezone.utc).isoformat(), } summary_path = export_dir / "summary.json" summary_path.write_text(json.dumps(summary, indent=2), encoding="utf-8") return summary def main() -> int: args = parse_args() out_dir = ROOT / args.out out_dir.mkdir(parents=True, exist_ok=True) initialize_production() if not mt5.initialize(): raise RuntimeError(f"MT5 initialization failed: {mt5.last_error()}") try: results = [run_symbol(symbol, args.timeframe, args.days, args.horizon, out_dir) for symbol in args.symbols] finally: mt5.shutdown() report_path = out_dir / "production_run_report.json" report_path.write_text(json.dumps(results, indent=2), encoding="utf-8") frame = pd.DataFrame( { "symbol": item["symbol"], "timeframe": item["timeframe"], "bars": item["bars"], "train_accuracy": item["metrics"]["train_accuracy"], "test_accuracy": item["metrics"]["test_accuracy"], "prob_up": item["latest"]["prob_up"], "signal": item["latest"]["signal"], "onnx_path": item["onnx_path"], } for item in results ) print(frame.to_string(index=False)) print(f"\nSaved production report to: {report_path}") return 0 if __name__ == "__main__": raise SystemExit(main())