"""Run the actionable steps from the guides in afml/cache/guides and write a JSON report.

Each step executes through run_step(), which records a pass/fail row instead of raising,
so one failing guide step does not abort the remaining checks.
"""
from __future__ import annotations

import argparse
import json
import os
import socket
import subprocess
import sys
import time
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Callable

import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier

# Make the repository root importable when the script is run directly.
ROOT = Path(__file__).resolve().parents[1]
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))


def _guide_expensive_calculation(data: pd.DataFrame) -> pd.DataFrame:
    # Deliberately slow helper used to demonstrate the cache speedup.
    time.sleep(0.15)
    return data.rolling(5).mean().fillna(0)


def row(results: list[dict[str, Any]], guide: str, step: str, status: str, **details: Any) -> None:
    item = {"guide": guide, "step": step, "status": status}
    item.update(details)
    results.append(item)


def run_step(results: list[dict[str, Any]], guide: str, step: str, func: Callable[[], Any]) -> Any:
    """Run one guide step, recording a pass row on success or a fail row with the exception."""
    try:
        out = func()
        row(results, guide, step, "pass", output=out)
        return out
    except Exception as exc:
        row(results, guide, step, "fail", error_type=type(exc).__name__, error=str(exc))
        return None


def cache_setup() -> dict[str, Any]:
    from afml.cache import get_comprehensive_cache_status, initialize_cache_system, setup_production_cache

    initialize_cache_system()
    components = setup_production_cache(enable_mlflow=True, max_cache_size_mb=500)
    status = get_comprehensive_cache_status()
    return {
        "core_cache": bool(components.get("core_cache")),
        "backtest_cache": bool(components.get("backtest_cache")),
        "monitor": bool(components.get("monitor")),
        "tracked_functions": status["core"]["functions_tracked"],
    }


def cache_speedup() -> dict[str, Any]:
    from afml.cache import robust_cacheable

    expensive_calculation = robust_cacheable(_guide_expensive_calculation)
    data = pd.DataFrame(np.random.default_rng(42).normal(size=(500, 4)))

    t0 = time.perf_counter()
    first = expensive_calculation(data)
    first_seconds = time.perf_counter() - t0

    # The second call should be served from the cache and return much faster.
    t0 = time.perf_counter()
    second = expensive_calculation(data)
    second_seconds = time.perf_counter() - t0

    return {
        "first_seconds": round(first_seconds, 4),
        "second_seconds": round(second_seconds, 4),
        "same_output": first.equals(second),
    }


def mql5_bridge_smoke() -> dict[str, Any]:
    from afml.cache.mql5_bridge import MQL5Bridge, SignalPacket

    # port=0 lets the OS pick a free port; read the bound port back from the server socket.
    bridge = MQL5Bridge(host="127.0.0.1", port=0, mode="backtest")
    bridge.start_server()
    port = bridge.server_socket.getsockname()[1]
    try:
        with socket.create_connection(("127.0.0.1", port), timeout=3):
            connected = True
        signal = SignalPacket(
            timestamp=datetime.now(timezone.utc).isoformat(),
            symbol="XAUUSD",
            signal_type="BUY",
            entry_price=2400.0,
            stop_loss=2390.0,
            take_profit=2420.0,
            position_size=0.01,
            strategy_name="guide_smoke",
            confidence=0.75,
        )
        queued = bridge.send_signal(signal)
        stats = bridge.get_performance_stats()
        return {"connected": connected, "queued": queued, "stats": stats}
    finally:
        bridge.stop()


def gold_btc_mt5_smoke(days: int) -> dict[str, Any]:
    import MetaTrader5 as mt5

    # Live MT5 credentials are optional; skip cleanly when they are not configured.
    required = ["MT5_ACCOUNT_LIVE_LOGIN", "MT5_ACCOUNT_LIVE_PASSWORD", "MT5_ACCOUNT_LIVE_SERVER"]
    if not all(os.environ.get(name) for name in required):
        return {"skipped": "MT5_ACCOUNT_LIVE_* env vars are not set"}

    from afml.mt5.load_data import login_mt5

    if not login_mt5("LIVE", verbose=False):
        raise RuntimeError("MT5 login failed")
    try:
        end = datetime.now(timezone.utc)
        start = end - timedelta(days=days)
        counts = {}
        for symbol in ("XAUUSD", "BTCUSD"):
            mt5.symbol_select(symbol, True)
            rates = mt5.copy_rates_range(symbol, mt5.TIMEFRAME_M5, start, end)
            counts[symbol] = 0 if rates is None else len(rates)
        return counts
    finally:
        mt5.shutdown()

def onnx_export_smoke() -> dict[str, Any]:
    from afml.production.model_export import export_model_to_onnx

    # Train a small classifier on synthetic data purely to exercise the ONNX export path.
    out = ROOT / "diagnostics" / "guide_model.onnx"
    rng = np.random.default_rng(42)
    X = rng.normal(size=(200, 4)).astype(np.float32)
    y = (X[:, 0] + X[:, 1] * 0.25 > 0).astype(int)
    model = RandomForestClassifier(n_estimators=20, random_state=42).fit(X, y)
    ok = export_model_to_onnx(
        model,
        feature_names=["f0", "f1", "f2", "f3"],
        output_path=str(out),
        metadata={"source": "scripts/run_guides_steps.py"},
    )
    return {"exported": ok, "path": str(out), "exists": out.exists(), "size": out.stat().st_size if out.exists() else 0}


def user_guide_workflow(timeout: int) -> dict[str, Any]:
    cmd = [sys.executable, str(ROOT / "afml" / "cache" / "guides" / "user_guide.py")]
    env = os.environ.copy()
    env["PYTHONPATH"] = str(ROOT) + os.pathsep + env.get("PYTHONPATH", "")
    # Force UTF-8 so the guide's console output decodes cleanly regardless of platform code page.
    env["PYTHONUTF8"] = "1"
    env["PYTHONIOENCODING"] = "utf-8"
    proc = subprocess.run(
        cmd,
        cwd=ROOT,
        env=env,
        capture_output=True,
        text=True,
        timeout=timeout,
        encoding="utf-8",
        errors="replace",
    )
    return {
        "returncode": proc.returncode,
        "completed": proc.returncode == 0 and "Workflow completed successfully" in proc.stdout,
        "stdout_tail": proc.stdout[-1200:],
        "stderr_tail": proc.stderr[-1200:],
    }


def guide_examples() -> dict[str, Any]:
    env = os.environ.copy()
    env["PYTHONPATH"] = str(ROOT) + os.pathsep + env.get("PYTHONPATH", "")
    env["PYTHONUTF8"] = "1"
    env["PYTHONIOENCODING"] = "utf-8"
    outputs = {}
    # The guide scripts are literally named "from afml.py" and "from my_project.py".
    for filename in ("from afml.py", "from my_project.py"):
        path = ROOT / "afml" / "cache" / "guides" / filename
        proc = subprocess.run(
            [sys.executable, str(path)],
            cwd=ROOT,
            env=env,
            capture_output=True,
            text=True,
            timeout=30,
            encoding="utf-8",
            errors="replace",
        )
        outputs[filename] = {
            "returncode": proc.returncode,
            "stdout": proc.stdout[-500:],
            "stderr": proc.stderr[-500:],
        }
    return outputs


def markdown_inventory() -> dict[str, Any]:
    guide_dir = ROOT / "afml" / "cache" / "guides"
    docs = {}
    for path in sorted(guide_dir.glob("*.md")):
        text = path.read_text(encoding="utf-8", errors="replace")
        docs[path.name] = {
            "headings": sum(1 for line in text.splitlines() if line.startswith("#")),
            "python_blocks": text.count("```python"),
            "mql5_blocks": text.count("```mql5") + text.count("```cpp"),
            "status": "reference/blueprint; executable steps covered by guide runner where repo code exists",
        }
    return docs


def main() -> int:
    parser = argparse.ArgumentParser(description="Implement and verify actionable steps from afml/cache/guides.")
    parser.add_argument("--include-mt5", action="store_true")
    parser.add_argument("--mt5-days", type=int, default=3)
    parser.add_argument("--include-user-guide", action="store_true")
    parser.add_argument("--timeout", type=int, default=90)
    parser.add_argument("--out", default="diagnostics/guides_steps_report.json")
    args = parser.parse_args()

    results: list[dict[str, Any]] = []
    run_step(results, "from afml.py / from my_project.py", "cache decorator examples", guide_examples)
    run_step(results, "mql5_integration_guide.md", "python cache setup", cache_setup)
    run_step(results, "mql5_integration_guide.md", "cache speedup test", cache_speedup)
    run_step(results, "mql5_integration_guide.md", "MQL5 bridge startup/socket test", mql5_bridge_smoke)
    run_step(results, "Part 7.md / Part 7.1.md", "ONNX export and validation", onnx_export_smoke)
    run_step(results, "all markdown guides", "inventory reference steps", markdown_inventory)

    if args.include_mt5:
        run_step(results, "Gold/BTC guide extension", "live MT5 XAUUSD/BTCUSD bars", lambda: gold_btc_mt5_smoke(args.mt5_days))
    if args.include_user_guide:
        run_step(results, "user_guide.py", "full 7-step cached ML workflow", lambda: user_guide_workflow(args.timeout))

    payload = {
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "results": results,
    }
    out = ROOT / args.out
    out.parent.mkdir(parents=True, exist_ok=True)
    out.write_text(json.dumps(payload, indent=2, default=str), encoding="utf-8")

    passed = sum(1 for r in results if r["status"] == "pass")
    failed = [r for r in results if r["status"] != "pass"]
    print(f"Guide steps passed: {passed}/{len(results)}")
    print(f"Report: {out}")
    if failed:
        print("Failures:")
        for item in failed:
            print(f"- {item['guide']} / {item['step']}: {item.get('error_type')} {item.get('error')}")
    return 1 if failed else 0


if __name__ == "__main__":
    raise SystemExit(main())
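
# A minimal usage sketch (assumes this file lives at scripts/run_guides_steps.py, matching the
# "source" metadata recorded by onnx_export_smoke; flags correspond to the argparse options in main()):
#
#   python scripts/run_guides_steps.py
#   python scripts/run_guides_steps.py --include-mt5 --mt5-days 5
#   python scripts/run_guides_steps.py --include-user-guide --timeout 120 --out diagnostics/guides_steps_report.json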