Spaces:
No application file
| from __future__ import annotations | |
| import argparse | |
| import json | |
| import sys | |
| from datetime import datetime, timedelta, timezone | |
| from pathlib import Path | |
| import MetaTrader5 as mt5 | |
| import numpy as np | |
| import pandas as pd | |
| from sklearn.ensemble import RandomForestClassifier | |
| from sklearn.metrics import accuracy_score | |
# ROOT is the parent of the directory containing this file. It is put at the
# front of ``sys.path`` (once) so that the project-local ``afml`` imports that
# follow can resolve when this file is executed directly.
ROOT = Path(__file__).resolve().parents[1]
_ROOT_STR = str(ROOT)
if _ROOT_STR not in sys.path:
    sys.path.insert(0, _ROOT_STR)
| from afml.cache import setup_production_cache | |
| from afml.production.model_export import complete_export_workflow | |
# CLI timeframe codes (the ``--timeframe`` choices) mapped to the MetaTrader5
# constants that are passed to ``mt5.copy_rates_range``.
TIMEFRAMES = dict(
    M5=mt5.TIMEFRAME_M5,
    M15=mt5.TIMEFRAME_M15,
    H1=mt5.TIMEFRAME_H1,
)
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse command-line options for the live MT5 production build.

    Args:
        argv: Arguments to parse. ``None`` (the default) parses
            ``sys.argv[1:]`` exactly as before; an explicit list allows the
            parser to be exercised without touching ``sys.argv``.

    Returns:
        Namespace with ``symbols``, ``timeframe``, ``days``, ``horizon`` and
        ``out`` attributes.

    Raises:
        SystemExit: raised by ``argparse`` on invalid input, including a
            non-positive ``--days`` or ``--horizon``.
    """

    def positive_int(text: str) -> int:
        # argparse converts ValueError / ArgumentTypeError into a usage error.
        value = int(text)
        if value <= 0:
            raise argparse.ArgumentTypeError(f"expected a positive integer, got {text!r}")
        return value

    parser = argparse.ArgumentParser(
        description="Build production ONNX models from live MT5 bars and run latest inference."
    )
    parser.add_argument(
        "--symbols",
        nargs="+",
        default=["BTCUSD", "XAUUSD"],
        help="MT5 symbols to build models for.",
    )
    parser.add_argument(
        "--timeframe",
        default="M15",
        choices=sorted(TIMEFRAMES),
        help="Bar timeframe.",
    )
    # A horizon/days of 0 or less would yield an empty history or labels that
    # look backwards (leakage), so reject them at the CLI boundary.
    parser.add_argument(
        "--days",
        type=positive_int,
        default=180,
        help="Days of bar history to download.",
    )
    parser.add_argument(
        "--horizon",
        type=positive_int,
        default=3,
        help="Label look-ahead, in bars.",
    )
    parser.add_argument(
        "--out",
        default="production_models/live_mt5",
        help="Output directory (relative paths are resolved against ROOT).",
    )
    return parser.parse_args(argv)
def initialize_production() -> dict:
    """Configure the ``afml`` production cache with MLflow tracking switched on.

    Returns:
        The value returned by ``setup_production_cache`` (annotated ``dict``).
    """
    cache_options = {
        "enable_mlflow": True,
        "mlflow_experiment": "afml_live_production_build",
        # NOTE(review): None presumably means "use the default tracking URI" —
        # confirm in afml.cache.setup_production_cache.
        "mlflow_uri": None,
        "max_cache_size_mb": 1000,
    }
    return setup_production_cache(**cache_options)
def fetch_rates(
    symbol: str,
    timeframe: str,
    start: datetime,
    end: datetime,
    min_bars: int = 300,
) -> pd.DataFrame:
    """Download bars for *symbol* between *start* and *end* from the MT5 terminal.

    Args:
        symbol: MT5 symbol name; it is selected into Market Watch first.
        timeframe: Key of ``TIMEFRAMES`` (e.g. ``"M15"``).
        start: Range start passed to ``mt5.copy_rates_range``.
        end: Range end passed to ``mt5.copy_rates_range``.
        min_bars: Minimum number of bars required. Defaults to 300, the
            previously hard-coded threshold.

    Returns:
        DataFrame indexed by the bar ``time`` (converted to tz-aware, labelled
        UTC) and sorted by it. It contains the fields MT5 returns, plus a
        ``volume`` column copied from ``tick_volume`` when MT5 does not supply
        ``volume`` itself.

    Raises:
        RuntimeError: if the symbol cannot be selected, or MT5 returns no
            data or fewer than *min_bars* bars.
        KeyError: if *timeframe* is not a key of ``TIMEFRAMES``.
    """
    if not mt5.symbol_select(symbol, True):
        raise RuntimeError(f"Symbol unavailable in MT5 Market Watch: {symbol}")
    rates = mt5.copy_rates_range(symbol, TIMEFRAMES[timeframe], start, end)
    if rates is None or len(rates) < min_bars:
        bar_count = 0 if rates is None else len(rates)
        raise RuntimeError(
            f"Not enough bars for {symbol} {timeframe} ({bar_count} < {min_bars}); "
            f"last_error={mt5.last_error()}"
        )
    data = pd.DataFrame(rates)
    # NOTE(review): MT5 bar times are often broker *server* time; labelling
    # them UTC may be off by the server offset — confirm for the broker used.
    data["time"] = pd.to_datetime(data["time"], unit="s", utc=True)
    data = data.set_index("time")
    if "tick_volume" in data.columns and "volume" not in data.columns:
        # Downstream features read ``volume``; fall back to tick volume.
        data["volume"] = data["tick_volume"]
    return data.sort_index()
def rsi(close: pd.Series, window: int = 14) -> pd.Series:
    """Relative Strength Index on a 0..100 scale.

    Average gains and losses are exponentially smoothed with
    ``alpha = 1 / window`` (``adjust=False``). The first element is NaN
    because the first price difference is undefined.

    When the smoothed loss is exactly zero and the smoothed gain is positive
    (only up-moves so far), the RSI is 100. Previously this case produced
    NaN. When both are zero (flat prices) the value stays NaN.

    Args:
        close: Price series.
        window: Smoothing window; ``alpha = 1 / window``.

    Returns:
        RSI series aligned with *close*.
    """
    delta = close.diff()
    gain = delta.clip(lower=0).ewm(alpha=1 / window, adjust=False).mean()
    loss = -delta.clip(upper=0).ewm(alpha=1 / window, adjust=False).mean()
    rs = gain / loss.replace(0, np.nan)
    result = 100 - (100 / (1 + rs))
    # Zero average loss with positive average gain is the saturated case: RSI = 100.
    return result.mask((loss == 0) & (gain > 0), 100.0)
def atr(data: pd.DataFrame, window: int = 14) -> pd.Series:
    """Average True Range, exponentially smoothed with ``alpha = 1 / window``.

    The true range of each bar is the largest of:

    - high minus low;
    - |high minus previous close|;
    - |low minus previous close|.

    On the first bar there is no previous close, so only high - low counts,
    because NaNs are skipped by the row-wise max.

    Args:
        data: Frame with ``high``, ``low`` and ``close`` columns.
        window: Smoothing window; ``alpha = 1 / window`` with ``adjust=False``.

    Returns:
        Smoothed true-range series aligned with *data*.
    """
    previous_close = data["close"].shift()
    candidates = pd.DataFrame(
        {
            "high_low": data["high"] - data["low"],
            "high_prev_close": (data["high"] - previous_close).abs(),
            "low_prev_close": (data["low"] - previous_close).abs(),
        }
    )
    true_range = candidates.max(axis=1)
    smoother = true_range.ewm(alpha=1 / window, adjust=False)
    return smoother.mean()
def build_features(data: pd.DataFrame) -> pd.DataFrame:
    """Compute the model's feature matrix from OHLCV bars.

    Reads the ``open``, ``high``, ``low``, ``close`` and ``volume`` columns of
    *data* and produces these columns, in this order:

    - returns over 1/3/6 bars;
    - candle body (close/open - 1) and range relative to close;
    - price relative to its 10- and 30-bar means;
    - 12-bar momentum and 20-bar return volatility;
    - RSI(14) divided by 100, and ATR(14) relative to close;
    - a 20-bar volume z-score.

    ±inf values are turned into NaN, and every row with a NaN in any column
    (e.g. rolling-window warm-up) is dropped.
    """
    close = data["close"]
    # Zero prices / zero volume spread would divide by zero; map them to NaN.
    nonzero_close = close.replace(0, np.nan)
    volume = data["volume"]
    volume_mean_20 = volume.rolling(20).mean()
    volume_std_20 = volume.rolling(20).std().replace(0, np.nan)

    columns = {
        "ret_1": close.pct_change(1),
        "ret_3": close.pct_change(3),
        "ret_6": close.pct_change(6),
        "close_open": (close / data["open"]) - 1.0,
        "range_hl": (data["high"] - data["low"]) / nonzero_close,
        "ma_10_rel": close / close.rolling(10).mean() - 1.0,
        "ma_30_rel": close / close.rolling(30).mean() - 1.0,
        "mom_12": close.pct_change(12),
        "vol_20": close.pct_change().rolling(20).std(),
        "rsi_14": rsi(close, 14) / 100.0,
        "atr_14_rel": atr(data, 14) / nonzero_close,
        "volume_z20": (volume - volume_mean_20) / volume_std_20,
    }
    # ``assign`` inserts columns in dict order, aligned to data's index.
    table = pd.DataFrame(index=data.index).assign(**columns)
    cleaned = table.replace([np.inf, -np.inf], np.nan)
    return cleaned.dropna()
def build_labels(data: pd.DataFrame, horizon: int) -> pd.Series:
    """Label each bar 1 if ``close`` is higher *horizon* bars later, else 0.

    The last *horizon* bars have no future close to compare against. They are
    dropped rather than labelled, so the result has ``len(data) - horizon``
    rows. Without the drop, ``NaN > 0`` evaluates to False and those bars
    would be trained on as fake "down" labels. A future return of exactly
    zero is labelled 0.

    Args:
        data: Frame with a ``close`` column.
        horizon: Look-ahead in bars; must be >= 1.

    Returns:
        Integer 0/1 Series indexed like the labelled rows of *data*.

    Raises:
        ValueError: if *horizon* < 1 (a non-positive look-ahead would be
            degenerate or look backwards).
    """
    if horizon < 1:
        raise ValueError(f"horizon must be >= 1, got {horizon}")
    future_return = data["close"].shift(-horizon) / data["close"] - 1.0
    future_return = future_return.dropna()
    return (future_return > 0).astype(int)
def fit_model(
    features: pd.DataFrame,
    labels: pd.Series,
    purge: int = 0,
) -> tuple[RandomForestClassifier, dict]:
    """Fit a random-forest direction classifier on a chronological 80/20 split.

    Only timestamps present in both *features* and *labels* are used. The
    first 80% (in index order) trains the model; the remaining 20% is held
    out for the reported test metrics. The returned model is the one fitted
    on the training slice only.

    Args:
        features: Feature matrix.
        labels: Labels aligned by index to *features* (0/1 expected).
        purge: Rows to drop from the end of the training slice. A label that
            looks ``h`` bars ahead overlaps the test period for the last ``h``
            training rows; ``purge=h`` removes that overlap. Defaults to 0,
            which reproduces the unpurged split.

    Returns:
        ``(model, metrics)``. ``metrics`` has ``rows``, ``train_rows``,
        ``test_rows``, ``train_accuracy``, ``test_accuracy``,
        ``test_probability_mean`` (mean predicted probability of class 1 on
        the test slice) and ``class_balance`` (mean label over all aligned
        rows).

    Raises:
        ValueError: if *purge* is negative.
        RuntimeError: if fewer than 100 training or 50 test rows remain, or
            the training labels contain a single class.
    """
    if purge < 0:
        raise ValueError(f"purge must be >= 0, got {purge}")
    # Sort explicitly: the split is chronological and must not depend on the
    # order ``Index.intersection`` happens to return.
    common_index = features.index.intersection(labels.index).sort_values()
    X = features.loc[common_index]
    y = labels.loc[common_index]

    split = int(len(X) * 0.8)
    train_end = split - purge
    if train_end < 100 or len(X) - split < 50:
        raise RuntimeError(f"Insufficient training/test split after feature build: {len(X)} rows")
    X_train, X_test = X.iloc[:train_end], X.iloc[split:]
    y_train, y_test = y.iloc[:train_end], y.iloc[split:]
    # A single-class training set makes predict_proba return one column, which
    # previously surfaced as an opaque IndexError below.
    if y_train.nunique() < 2:
        raise RuntimeError(
            f"Training labels contain a single class {sorted(y_train.unique())}; cannot fit direction model"
        )

    model = RandomForestClassifier(
        n_estimators=200,
        max_depth=8,
        min_samples_leaf=5,
        random_state=42,
        n_jobs=-1,
    )
    model.fit(X_train, y_train)
    train_pred = model.predict(X_train)
    test_pred = model.predict(X_test)
    # predict_proba columns follow model.classes_; look up class 1 explicitly.
    up_column = list(model.classes_).index(1)
    test_prob = model.predict_proba(X_test)[:, up_column]
    metrics = {
        "rows": int(len(X)),
        "train_rows": int(len(X_train)),
        "test_rows": int(len(X_test)),
        "train_accuracy": float(accuracy_score(y_train, train_pred)),
        "test_accuracy": float(accuracy_score(y_test, test_pred)),
        "test_probability_mean": float(np.mean(test_prob)),
        "class_balance": float(y.mean()),
    }
    return model, metrics
def latest_inference(
    model: RandomForestClassifier,
    features: pd.DataFrame,
    buy_threshold: float = 0.55,
    sell_threshold: float = 0.45,
) -> dict:
    """Score the most recent feature row and map it to a trading signal.

    Args:
        model: Fitted classifier exposing ``classes_`` and ``predict_proba``.
        features: Feature matrix; only its last row is scored. Its index is
            expected to be tz-aware UTC timestamps (the output key says
            ``_utc``) — not checked here.
        buy_threshold: ``prob_up >= buy_threshold`` gives ``"BUY"``.
        sell_threshold: ``prob_up <= sell_threshold`` gives ``"SELL"``;
            anything in between gives ``"WAIT"``.

    Returns:
        Dict with ``latest_feature_time_utc`` (ISO string of the last index),
        ``prob_up`` (probability of class 1; 0.0 if the model never saw
        class 1) and ``signal``.

    Raises:
        RuntimeError: if *features* is empty.
        ValueError: if ``sell_threshold > buy_threshold``.
    """
    if features.empty:
        raise RuntimeError("No feature rows available for inference")
    if sell_threshold > buy_threshold:
        raise ValueError(
            f"sell_threshold ({sell_threshold}) must not exceed buy_threshold ({buy_threshold})"
        )
    latest = features.iloc[[-1]]
    classes = list(model.classes_)
    probabilities = model.predict_proba(latest)[0]
    # Columns of predict_proba follow model.classes_; don't assume class 1 is column 1.
    prob_up = float(probabilities[classes.index(1)]) if 1 in classes else 0.0
    if prob_up >= buy_threshold:
        signal = "BUY"
    elif prob_up <= sell_threshold:
        signal = "SELL"
    else:
        signal = "WAIT"
    return {
        "latest_feature_time_utc": latest.index[-1].isoformat(),
        "prob_up": prob_up,
        "signal": signal,
    }
def run_symbol(symbol: str, timeframe: str, days: int, horizon: int, out_dir: Path) -> dict:
    """Build, evaluate, export and summarise the model for one symbol.

    Downloads the last *days* of *timeframe* bars, builds features and
    *horizon*-bar labels, and fits the classifier. It then scores the most
    recent feature row, exports the model via ``complete_export_workflow``
    into ``out_dir/<symbol>/<timeframe>``, and writes ``summary.json`` there.

    Args:
        symbol: MT5 symbol name.
        timeframe: Key of ``TIMEFRAMES``.
        days: Days of history to request, ending now (UTC).
        horizon: Label look-ahead in bars.
        out_dir: Base output directory.

    Returns:
        The summary dict that is also written to ``summary.json``.

    Raises:
        RuntimeError: if the export workflow returns a falsy result; errors
            from the fetch/feature/fit helpers propagate unchanged.
    """
    end = datetime.now(timezone.utc)
    start = end - timedelta(days=days)
    bars = fetch_rates(symbol, timeframe, start, end)
    # NOTE(review): the range ends "now", so the final bar may still be
    # forming; latest_inference scores it as if complete — consider dropping it.
    features = build_features(bars)
    labels = build_labels(bars, horizon)
    model, metrics = fit_model(features, labels)
    inference = latest_inference(model, features)

    export_dir = out_dir / symbol / timeframe
    # summary.json is written here below; don't rely on the exporter creating it.
    export_dir.mkdir(parents=True, exist_ok=True)
    export_path = complete_export_workflow(
        model,
        feature_names=list(features.columns),
        output_dir=str(export_dir),
        model_name=f"{symbol.lower()}_{timeframe.lower()}",
    )
    if not export_path:
        raise RuntimeError(f"ONNX export failed for {symbol} {timeframe}")
    # json.dumps cannot serialise Path objects; normalise in case one is returned.
    onnx_path = str(export_path) if isinstance(export_path, Path) else export_path

    summary = {
        "symbol": symbol,
        "timeframe": timeframe,
        "bars": int(len(bars)),
        "feature_count": int(features.shape[1]),
        "features": list(features.columns),
        "metrics": metrics,
        "latest": inference,
        "onnx_path": onnx_path,
        "generated_at_utc": datetime.now(timezone.utc).isoformat(),
    }
    summary_path = export_dir / "summary.json"
    summary_path.write_text(json.dumps(summary, indent=2), encoding="utf-8")
    return summary
def main() -> int:
    """Script entry point: build, export and report models for each symbol.

    Parses the CLI and creates the output directory under ROOT. It then
    initialises the production cache, connects to MT5 and runs ``run_symbol``
    for every requested symbol. The MT5 connection is always shut down. All
    summaries are written to ``production_run_report.json``, and a compact
    table is printed.

    Returns:
        ``0`` on success. Any failure, including a single symbol failing,
        propagates as an exception.
    """
    args = parse_args()
    out_dir = ROOT / args.out
    out_dir.mkdir(parents=True, exist_ok=True)
    initialize_production()

    if not mt5.initialize():
        raise RuntimeError(f"MT5 initialization failed: {mt5.last_error()}")
    results = []
    try:
        for symbol in args.symbols:
            results.append(run_symbol(symbol, args.timeframe, args.days, args.horizon, out_dir))
    finally:
        # Release the terminal connection even when a symbol fails.
        mt5.shutdown()

    report_path = out_dir / "production_run_report.json"
    report_path.write_text(json.dumps(results, indent=2), encoding="utf-8")

    rows = []
    for item in results:
        item_metrics = item["metrics"]
        item_latest = item["latest"]
        rows.append(
            {
                "symbol": item["symbol"],
                "timeframe": item["timeframe"],
                "bars": item["bars"],
                "train_accuracy": item_metrics["train_accuracy"],
                "test_accuracy": item_metrics["test_accuracy"],
                "prob_up": item_latest["prob_up"],
                "signal": item_latest["signal"],
                "onnx_path": item["onnx_path"],
            }
        )
    table = pd.DataFrame(rows)
    print(table.to_string(index=False))
    print(f"\nSaved production report to: {report_path}")
    return 0
# Run only when executed as a script; main()'s return value becomes the exit status.
if __name__ == "__main__":
    sys.exit(main())