# AFML / scripts / run_production_build.py
# akshayboora's picture
# Upload 940 files
# 669d6a1 verified
from __future__ import annotations
import argparse
import json
import sys
from datetime import datetime, timedelta, timezone
from pathlib import Path
import MetaTrader5 as mt5
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
# Directory one level above the folder that contains this script.
ROOT = Path(__file__).resolve().parents[1]
# Prepend ROOT to the import path (once) before the `afml` imports that follow.
# NOTE(review): presumably this lets the script run without installing the
# package — confirm against the repository layout.
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))
from afml.cache import setup_production_cache
from afml.production.model_export import complete_export_workflow
# Timeframe labels accepted by --timeframe, each mapped to the MetaTrader5
# module constant of the same name (e.g. "M15" -> mt5.TIMEFRAME_M15).
TIMEFRAMES = {
    label: getattr(mt5, f"TIMEFRAME_{label}")
    for label in ("M5", "M15", "H1")
}
def _positive_int(text: str) -> int:
    """argparse ``type=`` converter that accepts only integers >= 1."""
    try:
        value = int(text)
    except ValueError:
        raise argparse.ArgumentTypeError(f"expected an integer, got {text!r}") from None
    if value < 1:
        raise argparse.ArgumentTypeError(f"must be a positive integer, got {value}")
    return value


def parse_args() -> argparse.Namespace:
    """Parse the command-line options for the production build.

    Options:
        --symbols    one or more symbols to build (default: BTCUSD XAUUSD)
        --timeframe  one of the keys of ``TIMEFRAMES`` (default: M15)
        --days       length of the history window in days (default: 180)
        --horizon    label look-ahead in bars (default: 3)
        --out        output directory (default: production_models/live_mt5)

    ``--days`` and ``--horizon`` must be positive: a horizon of zero or less
    would produce constant or backward-looking labels, and a non-positive
    day count yields an empty or inverted date range. Invalid values make
    argparse print a usage error and exit.
    """
    parser = argparse.ArgumentParser(
        description="Build production ONNX models from live MT5 bars and run latest inference."
    )
    parser.add_argument("--symbols", nargs="+", default=["BTCUSD", "XAUUSD"])
    parser.add_argument("--timeframe", default="M15", choices=sorted(TIMEFRAMES))
    parser.add_argument("--days", type=_positive_int, default=180)
    parser.add_argument("--horizon", type=_positive_int, default=3)
    parser.add_argument("--out", default="production_models/live_mt5")
    return parser.parse_args()
def initialize_production() -> dict:
    """Call ``setup_production_cache`` with this script's fixed settings.

    Returns whatever ``setup_production_cache`` returns.
    """
    cache_settings = {
        "enable_mlflow": True,
        "mlflow_experiment": "afml_live_production_build",
        # NOTE(review): None presumably selects the default tracking URI — confirm.
        "mlflow_uri": None,
        "max_cache_size_mb": 1000,
    }
    return setup_production_cache(**cache_settings)
def fetch_rates(symbol: str, timeframe: str, start: datetime, end: datetime) -> pd.DataFrame:
    """Download bars for *symbol* between *start* and *end* from MT5.

    Args:
        symbol: Instrument name; it is selected via ``mt5.symbol_select`` first.
        timeframe: Key of ``TIMEFRAMES`` naming the bar size.
        start: Beginning of the requested range.
        end: End of the requested range.

    Returns:
        The bars as a DataFrame indexed by UTC timestamp in ascending order.
        When only ``tick_volume`` is present it is copied into ``volume``.

    Raises:
        RuntimeError: If the symbol cannot be selected, or fewer than 300
            bars (or nothing at all) come back.
    """
    if not mt5.symbol_select(symbol, True):
        raise RuntimeError(f"Symbol unavailable in MT5 Market Watch: {symbol}")

    raw = mt5.copy_rates_range(symbol, TIMEFRAMES[timeframe], start, end)
    too_few = raw is None or len(raw) < 300
    if too_few:
        raise RuntimeError(f"Not enough bars for {symbol} {timeframe}; last_error={mt5.last_error()}")

    frame = pd.DataFrame(raw)
    # The "time" field is interpreted as epoch seconds and made timezone-aware.
    frame["time"] = pd.to_datetime(frame["time"], unit="s", utc=True)
    frame = frame.set_index("time")

    columns = frame.columns
    if "volume" not in columns and "tick_volume" in columns:
        frame["volume"] = frame["tick_volume"]
    return frame.sort_index()
def rsi(close: pd.Series, window: int = 14) -> pd.Series:
    """Return the relative strength index of *close* on a 0-100 scale.

    Gains and losses are smoothed with an exponentially weighted mean using
    ``alpha = 1 / window`` and ``adjust=False``.

    Args:
        close: Price series.
        window: Smoothing length; the EWM alpha is ``1 / window``.

    Returns:
        A series aligned with *close*. The first value is NaN (no previous
        price to difference against). When the smoothed loss is zero and
        the smoothed gain is positive the result is 100; when both are zero
        (a completely flat history) the ratio is undefined and stays NaN.
    """
    delta = close.diff()
    alpha = 1 / window
    avg_gain = delta.clip(lower=0).ewm(alpha=alpha, adjust=False).mean()
    # Negate before clipping so the smoothed loss is a non-negative magnitude
    # (and never -0.0, which would flip the sign of the ratio below).
    avg_loss = (-delta).clip(lower=0).ewm(alpha=alpha, adjust=False).mean()
    # Deliberately no zero -> NaN substitution: gain / 0 evaluates to +inf,
    # which maps to RSI 100 below instead of discarding the row; 0 / 0 is NaN.
    rs = avg_gain / avg_loss
    return 100 - (100 / (1 + rs))
def atr(data: pd.DataFrame, window: int = 14) -> pd.Series:
    """Return the exponentially smoothed true range of *data*.

    The true range of a bar is the largest of: high minus low, the absolute
    distance from the high to the previous close, and the absolute distance
    from the low to the previous close. It is smoothed with an EWM using
    ``alpha = 1 / window`` and ``adjust=False``.

    Args:
        data: Frame with ``high``, ``low`` and ``close`` columns.
        window: Smoothing length.

    Returns:
        A series aligned with ``data.index``.
    """
    high, low = data["high"], data["low"]
    prev_close = data["close"].shift()
    candidates = [
        high - low,
        (high - prev_close).abs(),
        (low - prev_close).abs(),
    ]
    # Row-wise max skips NaN, so the first bar (no previous close) falls back
    # to its plain high-low range.
    true_range = pd.concat(candidates, axis=1).max(axis=1)
    smoothed = true_range.ewm(alpha=1 / window, adjust=False)
    return smoothed.mean()
def build_features(data: pd.DataFrame) -> pd.DataFrame:
    """Derive the model's feature matrix from OHLC and volume bars.

    Args:
        data: Frame with ``open``, ``high``, ``low``, ``close`` and
            ``volume`` columns.

    Returns:
        A frame on (a subset of) ``data.index`` with twelve columns: 1/3/6
        and 12-bar returns, close-vs-open change, high-low range relative to
        close, close relative to its 10- and 30-bar means, 20-bar return
        volatility, RSI(14) scaled by 1/100, ATR(14) relative to close, and
        a 20-bar volume z-score. Infinite values are turned into NaN and
        every row containing a NaN is dropped.
    """
    close = data["close"]
    volume = data["volume"]
    # A zero close would otherwise divide to +/-inf; make it NaN up front.
    safe_close = close.replace(0, np.nan)
    one_bar_return = close.pct_change(1)
    volume_window = volume.rolling(20)

    columns = {
        "ret_1": one_bar_return,
        "ret_3": close.pct_change(3),
        "ret_6": close.pct_change(6),
        "close_open": (close / data["open"]) - 1.0,
        "range_hl": (data["high"] - data["low"]) / safe_close,
        "ma_10_rel": close / close.rolling(10).mean() - 1.0,
        "ma_30_rel": close / close.rolling(30).mean() - 1.0,
        "mom_12": close.pct_change(12),
        "vol_20": one_bar_return.rolling(20).std(),
        "rsi_14": rsi(close, 14) / 100.0,
        "atr_14_rel": atr(data, 14) / safe_close,
        "volume_z20": (volume - volume_window.mean())
        / volume_window.std().replace(0, np.nan),
    }
    features = pd.DataFrame(columns, index=data.index)
    return features.replace([np.inf, -np.inf], np.nan).dropna()
def build_labels(data: pd.DataFrame, horizon: int) -> pd.Series:
    """Label each bar 1 if the close *horizon* bars later is higher, else 0.

    Bars whose future close is not available (the final *horizon* rows, or
    rows where either close is NaN) are omitted instead of being labelled:
    comparing a NaN return with ``> 0`` is False, which would otherwise
    record a made-up 0 for bars whose outcome is unknown.

    Args:
        data: Frame with a ``close`` column.
        horizon: Number of bars to look ahead; must be at least 1.

    Returns:
        An integer series of 0/1 labels indexed by the bars that have a
        known future return.

    Raises:
        ValueError: If *horizon* is less than 1.
    """
    if horizon < 1:
        raise ValueError(f"horizon must be a positive number of bars, got {horizon}")
    close = data["close"]
    future_return = close.shift(-horizon) / close - 1.0
    known = future_return.dropna()
    return (known > 0).astype(int)
def fit_model(features: pd.DataFrame, labels: pd.Series) -> tuple[RandomForestClassifier, dict]:
    """Train a random forest on the first 80% of rows and score it on the rest.

    Rows are restricted to index values present in both inputs and split by
    position (no shuffling).

    Args:
        features: Feature matrix.
        labels: Target series.

    Returns:
        The fitted classifier and a metrics dict with row counts, train/test
        accuracy, the mean predicted probability of column 1 on the test
        rows, and the mean of the labels (``class_balance``).

    Raises:
        RuntimeError: If the split leaves fewer than 100 training rows or
            fewer than 50 test rows.
    """
    shared = features.index.intersection(labels.index)
    X, y = features.loc[shared], labels.loc[shared]

    split = int(len(X) * 0.8)
    held_out = len(X) - split
    if not (split >= 100 and held_out >= 50):
        raise RuntimeError(f"Insufficient training/test split after feature build: {len(X)} rows")

    X_train, y_train = X.iloc[:split], y.iloc[:split]
    X_test, y_test = X.iloc[split:], y.iloc[split:]

    forest_params = {
        "n_estimators": 200,
        "max_depth": 8,
        "min_samples_leaf": 5,
        "random_state": 42,
        "n_jobs": -1,
    }
    model = RandomForestClassifier(**forest_params)
    model.fit(X_train, y_train)

    train_pred = model.predict(X_train)
    test_pred = model.predict(X_test)
    test_prob = model.predict_proba(X_test)[:, 1]

    # Cast everything to plain Python numbers.
    metrics = {
        "rows": int(len(X)),
        "train_rows": int(len(X_train)),
        "test_rows": int(len(X_test)),
        "train_accuracy": float(accuracy_score(y_train, train_pred)),
        "test_accuracy": float(accuracy_score(y_test, test_pred)),
        "test_probability_mean": float(np.mean(test_prob)),
        "class_balance": float(y.mean()),
    }
    return model, metrics
def latest_inference(model: RandomForestClassifier, features: pd.DataFrame) -> dict:
    """Score the last feature row and translate the probability into a signal.

    Args:
        model: Fitted classifier exposing ``predict_proba``.
        features: Feature matrix; only its final row is used.

    Returns:
        A dict with the ISO-formatted index label of that row, ``prob_up``
        (column 1 of ``predict_proba``) and ``signal``: "BUY" at 0.55 or
        above, "SELL" at 0.45 or below, otherwise "WAIT".
    """
    # Double brackets keep the row as a one-row DataFrame.
    last_row = features.iloc[[-1]]
    prob_up = float(model.predict_proba(last_row)[0, 1])

    if prob_up >= 0.55:
        signal = "BUY"
    elif prob_up <= 0.45:
        signal = "SELL"
    else:
        signal = "WAIT"

    return {
        "latest_feature_time_utc": last_row.index[-1].isoformat(),
        "prob_up": prob_up,
        "signal": signal,
    }
def run_symbol(symbol: str, timeframe: str, days: int, horizon: int, out_dir: Path) -> dict:
    """Run the full pipeline for one symbol and write its summary.

    Fetches bars for the last *days* days, builds features and labels, fits
    the model, scores the most recent feature row, exports the model via
    ``complete_export_workflow`` and writes ``summary.json`` under
    ``out_dir / symbol / timeframe``.

    Args:
        symbol: Instrument to process.
        timeframe: Key of ``TIMEFRAMES``.
        days: Length of the history window, counted back from now (UTC).
        horizon: Label look-ahead in bars.
        out_dir: Root directory for exports.

    Returns:
        The summary dict that was written to ``summary.json``.

    Raises:
        RuntimeError: If the export workflow returns a falsy path (plus
            anything raised by the fetch / fit steps).
    """
    end = datetime.now(timezone.utc)
    start = end - timedelta(days=days)
    bars = fetch_rates(symbol, timeframe, start, end)
    features = build_features(bars)
    labels = build_labels(bars, horizon)
    model, metrics = fit_model(features, labels)
    inference = latest_inference(model, features)

    export_dir = out_dir / symbol / timeframe
    export_path = complete_export_workflow(
        model,
        feature_names=list(features.columns),
        output_dir=str(export_dir),
        model_name=f"{symbol.lower()}_{timeframe.lower()}",
    )
    if not export_path:
        raise RuntimeError(f"ONNX export failed for {symbol} {timeframe}")

    summary = {
        "symbol": symbol,
        "timeframe": timeframe,
        "bars": int(len(bars)),
        "feature_count": int(features.shape[1]),
        "features": list(features.columns),
        "metrics": metrics,
        "latest": inference,
        # str() keeps the summary JSON-serializable even if the export
        # workflow hands back a path object rather than a plain string.
        "onnx_path": str(export_path),
        "generated_at_utc": datetime.now(timezone.utc).isoformat(),
    }

    # Do not rely on the export workflow having created the directory.
    export_dir.mkdir(parents=True, exist_ok=True)
    summary_path = export_dir / "summary.json"
    summary_path.write_text(json.dumps(summary, indent=2), encoding="utf-8")
    return summary
def main() -> int:
    """Build every requested symbol, save the combined report and print a table.

    Returns:
        0 on success.

    Raises:
        RuntimeError: If the MT5 terminal cannot be initialized. Errors from
            an individual symbol propagate after MT5 has been shut down.
    """
    args = parse_args()
    out_dir = ROOT / args.out
    out_dir.mkdir(parents=True, exist_ok=True)

    initialize_production()
    if not mt5.initialize():
        raise RuntimeError(f"MT5 initialization failed: {mt5.last_error()}")

    results = []
    try:
        for symbol in args.symbols:
            results.append(run_symbol(symbol, args.timeframe, args.days, args.horizon, out_dir))
    finally:
        # Always release the terminal connection, even when a symbol fails.
        mt5.shutdown()

    report_path = out_dir / "production_run_report.json"
    report_path.write_text(json.dumps(results, indent=2), encoding="utf-8")

    # One flat row per symbol for the console table.
    rows = []
    for item in results:
        metrics, latest = item["metrics"], item["latest"]
        rows.append(
            {
                "symbol": item["symbol"],
                "timeframe": item["timeframe"],
                "bars": item["bars"],
                "train_accuracy": metrics["train_accuracy"],
                "test_accuracy": metrics["test_accuracy"],
                "prob_up": latest["prob_up"],
                "signal": latest["signal"],
                "onnx_path": item["onnx_path"],
            }
        )
    frame = pd.DataFrame(rows)
    print(frame.to_string(index=False))
    print(f"\nSaved production report to: {report_path}")
    return 0
# Run the build when executed as a script; main()'s return value becomes the
# process exit status.
if __name__ == "__main__":
    sys.exit(main())