from __future__ import annotations import argparse import importlib import inspect import json import pkgutil import sys import traceback from datetime import datetime, timedelta, timezone from pathlib import Path from typing import Any, Callable import numpy as np import pandas as pd ROOT = Path(__file__).resolve().parents[1] sys.path.insert(0, str(ROOT)) def json_default(value: Any) -> str: return str(value) def record(results: list[dict], name: str, status: str, **details: Any) -> None: row = {"name": name, "status": status} row.update(details) results.append(row) def discover_modules() -> list[str]: import afml return [ module.name for module in pkgutil.walk_packages(afml.__path__, afml.__name__ + ".") if ".guides." not in module.name ] def import_inventory(module_names: list[str]) -> tuple[list[dict], dict[str, Any]]: results: list[dict] = [] inventory: dict[str, Any] = {} for name in module_names: try: module = importlib.import_module(name) functions = [] classes = [] for obj_name, obj in inspect.getmembers(module): if getattr(obj, "__module__", None) != module.__name__: continue if inspect.isfunction(obj): functions.append(obj_name) elif inspect.isclass(obj): classes.append(obj_name) inventory[name] = {"functions": functions, "classes": classes} record( results, name, "pass", functions=len(functions), classes=len(classes), ) except Exception as exc: record( results, name, "fail", error_type=type(exc).__name__, error=str(exc), traceback=traceback.format_exc(limit=5), ) return results, inventory def run_smoke(name: str, func: Callable[[], Any], results: list[dict]) -> Any: try: output = func() record(results, name, "pass", output=output) return output except Exception as exc: record( results, name, "fail", error_type=type(exc).__name__, error=str(exc), traceback=traceback.format_exc(limit=5), ) return None def smoke_tests(include_mt5: bool, mt5_account: str, mt5_days: int) -> list[dict]: results: list[dict] = [] def datasets_smoke() -> dict: from afml.datasets import load_dollar_bar_sample, load_stock_prices, load_tick_sample tick = load_tick_sample() stock = load_stock_prices() dollar = load_dollar_bar_sample() return { "tick": tick.shape, "stock": stock.shape, "dollar": dollar.shape, } data = run_smoke("datasets.load_samples", datasets_smoke, results) def bars_smoke() -> dict: from afml.data_structures.bars import make_bars from afml.datasets import load_tick_sample tick = load_tick_sample() bars = make_bars(tick, bar_type="tick", bar_size=20, price="mid_price", verbose=False) time_bars = make_bars(tick, bar_type="time", bar_size="M1", price="mid_price", verbose=False) return {"tick_bars": bars.shape, "time_bars": time_bars.shape} run_smoke("data_structures.make_bars", bars_smoke, results) def features_smoke() -> dict: from afml.datasets import load_stock_prices from afml.features.fracdiff import frac_diff_ffd from afml.features.returns import get_lagged_returns, get_return_dist_features from afml.features.stationary import is_stationary from afml.features.trading_session import get_time_features close = load_stock_prices()["SPY"].dropna().tail(400) lagged = get_lagged_returns(close, lags=[1, 5, 20], nperiods=2) dist = get_return_dist_features(close, window=20) time_features = get_time_features(close.to_frame("close"), timeframe="D1") frac = frac_diff_ffd(close.to_frame("SPY"), d=0.5, thres=1e-3) stationary = is_stationary(close.pct_change().dropna().to_frame("SPY_returns"), verbose=False) return { "lagged": lagged.shape, "dist": dist.shape, "time_features": time_features.shape, "frac_diff": frac.shape, "is_stationary": stationary, } run_smoke("features.core", features_smoke, results) def filters_smoke() -> dict: from afml.datasets import load_stock_prices from afml.filters.filters import cusum_filter, z_score_filter close = load_stock_prices()["SPY"].dropna().tail(400) events = cusum_filter(close, threshold=0.01, time_stamps=True) z_events = z_score_filter(close.pct_change().dropna(), mean_window=20, std_window=20) return {"cusum_events": len(events), "z_score_events": len(z_events)} run_smoke("filters.core", filters_smoke, results) def labeling_smoke() -> dict: from afml.datasets import load_stock_prices from afml.filters.filters import cusum_filter from afml.labeling.fixed_time_horizon import fixed_time_horizon from afml.labeling.trend_scanning import trend_scanning_labels from afml.labeling.triple_barrier import add_vertical_barrier, get_events from afml.util.volatility import get_daily_vol close = load_stock_prices()["SPY"].dropna().tail(500) target = get_daily_vol(close, lookback=50).dropna() t_events = cusum_filter(close.loc[target.index], threshold=0.01, time_stamps=True) t_events = t_events.intersection(target.index)[:30] t1 = add_vertical_barrier(t_events, close, days=5) events = get_events(close, t_events, pt_sl=[1, 1], target=target, min_ret=0.001, vertical_barrier_times=t1) fixed = fixed_time_horizon(close, threshold=0.001, lag=True) trend = trend_scanning_labels(close, span=(5, 20)) return {"triple_events": events.shape, "fixed": fixed.shape, "trend": trend.shape} run_smoke("labeling.core", labeling_smoke, results) def sampling_smoke() -> dict: from afml.sample_weights.attribution import get_weights_by_return from afml.sampling.concurrent import num_concurrent_events idx = pd.date_range("2026-01-01", periods=20, freq="h", tz="UTC") close = pd.Series(np.linspace(100, 110, len(idx)), index=idx) t1 = pd.Series(idx[5:15], index=idx[:10]) events = pd.DataFrame({"t1": t1}) concurrent = num_concurrent_events(close.index, t1, t1.index) weights = get_weights_by_return(events, close, num_threads=1, num_conc_events=concurrent, verbose=False) return {"concurrent": concurrent.shape, "weights": weights.shape} run_smoke("sampling.sample_weights", sampling_smoke, results) def cross_validation_smoke() -> dict: from afml.cross_validation.cross_validation import PurgedKFold, ml_get_train_times from afml.cross_validation.pbo import compute_pbo idx = pd.date_range("2026-01-01", periods=60, freq="h", tz="UTC") X = pd.DataFrame({"x": np.arange(len(idx))}, index=idx) y = pd.Series((np.arange(len(idx)) % 2), index=idx) t1 = pd.Series(idx.shift(2, freq="h"), index=idx) cv = PurgedKFold(n_splits=3, t1=t1, pct_embargo=0.01) splits = list(cv.split(X, y)) train_times = ml_get_train_times(t1, t1.iloc[:5]) returns = pd.DataFrame( { "strategy_a": np.sin(np.arange(len(idx)) / 5) / 100, "strategy_b": np.cos(np.arange(len(idx)) / 7) / 100, }, index=idx, ) pbo = compute_pbo(returns, t1, n_folds=4) return {"splits": len(splits), "train_times": train_times.shape, "pbo": pbo["pbo"]} run_smoke("cross_validation.core", cross_validation_smoke, results) def strategies_smoke() -> dict: from afml.datasets import load_dollar_bar_sample from afml.strategies.trading_strategies import BollingerStrategy, MACrossoverStrategy bars = load_dollar_bar_sample().tail(300) boll = BollingerStrategy(window=20, std=2).generate_signals(bars) ma = MACrossoverStrategy(fast_window=10, slow_window=30).generate_signals(bars) return {"bollinger_signals": boll.value_counts().to_dict(), "ma_signals": ma.value_counts().to_dict()} run_smoke("strategies.basic", strategies_smoke, results) def mt5_smoke() -> dict: import MetaTrader5 as mt5 from afml.mt5.load_data import login_mt5 if not login_mt5(mt5_account, verbose=False): raise RuntimeError("MT5 login failed") try: end = datetime.now(timezone.utc) start = end - timedelta(days=mt5_days) out: dict[str, Any] = {} for symbol in ("XAUUSD", "BTCUSD"): if not mt5.symbol_select(symbol, True): out[symbol] = "symbol_select failed" continue rates = mt5.copy_rates_range(symbol, mt5.TIMEFRAME_M5, start, end) out[symbol] = 0 if rates is None else len(rates) return out finally: mt5.shutdown() if include_mt5: run_smoke("mt5.live_xau_btc_m5", mt5_smoke, results) return results def main() -> int: parser = argparse.ArgumentParser(description="Run broad AFML diagnostics.") parser.add_argument("--include-mt5", action="store_true") parser.add_argument("--mt5-account", default="LIVE") parser.add_argument("--mt5-days", type=int, default=3) parser.add_argument("--out", default="diagnostics") args = parser.parse_args() out_dir = ROOT / args.out out_dir.mkdir(parents=True, exist_ok=True) module_names = discover_modules() import_results, inventory = import_inventory(module_names) smoke_results = smoke_tests(args.include_mt5, args.mt5_account, args.mt5_days) payload = { "generated_at": datetime.now(timezone.utc).isoformat(), "module_count": len(module_names), "import_results": import_results, "inventory": inventory, "smoke_results": smoke_results, } (out_dir / "full_project_diagnostics.json").write_text( json.dumps(payload, indent=2, default=json_default), encoding="utf-8", ) pd.DataFrame(import_results).to_csv(out_dir / "module_imports.csv", index=False) pd.DataFrame(smoke_results).to_csv(out_dir / "smoke_tests.csv", index=False) import_failures = [r for r in import_results if r["status"] != "pass"] smoke_failures = [r for r in smoke_results if r["status"] != "pass"] print(f"Modules discovered: {len(module_names)}") print(f"Module import failures: {len(import_failures)}") print(f"Smoke tests: {len(smoke_results)}") print(f"Smoke failures: {len(smoke_failures)}") print(f"Reports saved to: {out_dir}") if import_failures: print("\nImport failures:") for failure in import_failures: print(f"- {failure['name']}: {failure.get('error_type')} - {failure.get('error')}") if smoke_failures: print("\nSmoke failures:") for failure in smoke_failures: print(f"- {failure['name']}: {failure.get('error_type')} - {failure.get('error')}") return 1 if smoke_failures else 0 if __name__ == "__main__": raise SystemExit(main())