Spaces:
No application file
No application file
| from __future__ import annotations | |
| import argparse | |
| import importlib | |
| import inspect | |
| import json | |
| import pkgutil | |
| import sys | |
| import traceback | |
| from datetime import datetime, timedelta, timezone | |
| from pathlib import Path | |
| from typing import Any, Callable | |
| import numpy as np | |
| import pandas as pd | |
| ROOT = Path(__file__).resolve().parents[1] | |
| sys.path.insert(0, str(ROOT)) | |
| def json_default(value: Any) -> str: | |
| return str(value) | |
| def record(results: list[dict], name: str, status: str, **details: Any) -> None: | |
| row = {"name": name, "status": status} | |
| row.update(details) | |
| results.append(row) | |
| def discover_modules() -> list[str]: | |
| import afml | |
| return [ | |
| module.name | |
| for module in pkgutil.walk_packages(afml.__path__, afml.__name__ + ".") | |
| if ".guides." not in module.name | |
| ] | |
| def import_inventory(module_names: list[str]) -> tuple[list[dict], dict[str, Any]]: | |
| results: list[dict] = [] | |
| inventory: dict[str, Any] = {} | |
| for name in module_names: | |
| try: | |
| module = importlib.import_module(name) | |
| functions = [] | |
| classes = [] | |
| for obj_name, obj in inspect.getmembers(module): | |
| if getattr(obj, "__module__", None) != module.__name__: | |
| continue | |
| if inspect.isfunction(obj): | |
| functions.append(obj_name) | |
| elif inspect.isclass(obj): | |
| classes.append(obj_name) | |
| inventory[name] = {"functions": functions, "classes": classes} | |
| record( | |
| results, | |
| name, | |
| "pass", | |
| functions=len(functions), | |
| classes=len(classes), | |
| ) | |
| except Exception as exc: | |
| record( | |
| results, | |
| name, | |
| "fail", | |
| error_type=type(exc).__name__, | |
| error=str(exc), | |
| traceback=traceback.format_exc(limit=5), | |
| ) | |
| return results, inventory | |
| def run_smoke(name: str, func: Callable[[], Any], results: list[dict]) -> Any: | |
| try: | |
| output = func() | |
| record(results, name, "pass", output=output) | |
| return output | |
| except Exception as exc: | |
| record( | |
| results, | |
| name, | |
| "fail", | |
| error_type=type(exc).__name__, | |
| error=str(exc), | |
| traceback=traceback.format_exc(limit=5), | |
| ) | |
| return None | |
| def smoke_tests(include_mt5: bool, mt5_account: str, mt5_days: int) -> list[dict]: | |
| results: list[dict] = [] | |
| def datasets_smoke() -> dict: | |
| from afml.datasets import load_dollar_bar_sample, load_stock_prices, load_tick_sample | |
| tick = load_tick_sample() | |
| stock = load_stock_prices() | |
| dollar = load_dollar_bar_sample() | |
| return { | |
| "tick": tick.shape, | |
| "stock": stock.shape, | |
| "dollar": dollar.shape, | |
| } | |
| data = run_smoke("datasets.load_samples", datasets_smoke, results) | |
| def bars_smoke() -> dict: | |
| from afml.data_structures.bars import make_bars | |
| from afml.datasets import load_tick_sample | |
| tick = load_tick_sample() | |
| bars = make_bars(tick, bar_type="tick", bar_size=20, price="mid_price", verbose=False) | |
| time_bars = make_bars(tick, bar_type="time", bar_size="M1", price="mid_price", verbose=False) | |
| return {"tick_bars": bars.shape, "time_bars": time_bars.shape} | |
| run_smoke("data_structures.make_bars", bars_smoke, results) | |
| def features_smoke() -> dict: | |
| from afml.datasets import load_stock_prices | |
| from afml.features.fracdiff import frac_diff_ffd | |
| from afml.features.returns import get_lagged_returns, get_return_dist_features | |
| from afml.features.stationary import is_stationary | |
| from afml.features.trading_session import get_time_features | |
| close = load_stock_prices()["SPY"].dropna().tail(400) | |
| lagged = get_lagged_returns(close, lags=[1, 5, 20], nperiods=2) | |
| dist = get_return_dist_features(close, window=20) | |
| time_features = get_time_features(close.to_frame("close"), timeframe="D1") | |
| frac = frac_diff_ffd(close.to_frame("SPY"), d=0.5, thres=1e-3) | |
| stationary = is_stationary(close.pct_change().dropna().to_frame("SPY_returns"), verbose=False) | |
| return { | |
| "lagged": lagged.shape, | |
| "dist": dist.shape, | |
| "time_features": time_features.shape, | |
| "frac_diff": frac.shape, | |
| "is_stationary": stationary, | |
| } | |
| run_smoke("features.core", features_smoke, results) | |
| def filters_smoke() -> dict: | |
| from afml.datasets import load_stock_prices | |
| from afml.filters.filters import cusum_filter, z_score_filter | |
| close = load_stock_prices()["SPY"].dropna().tail(400) | |
| events = cusum_filter(close, threshold=0.01, time_stamps=True) | |
| z_events = z_score_filter(close.pct_change().dropna(), mean_window=20, std_window=20) | |
| return {"cusum_events": len(events), "z_score_events": len(z_events)} | |
| run_smoke("filters.core", filters_smoke, results) | |
| def labeling_smoke() -> dict: | |
| from afml.datasets import load_stock_prices | |
| from afml.filters.filters import cusum_filter | |
| from afml.labeling.fixed_time_horizon import fixed_time_horizon | |
| from afml.labeling.trend_scanning import trend_scanning_labels | |
| from afml.labeling.triple_barrier import add_vertical_barrier, get_events | |
| from afml.util.volatility import get_daily_vol | |
| close = load_stock_prices()["SPY"].dropna().tail(500) | |
| target = get_daily_vol(close, lookback=50).dropna() | |
| t_events = cusum_filter(close.loc[target.index], threshold=0.01, time_stamps=True) | |
| t_events = t_events.intersection(target.index)[:30] | |
| t1 = add_vertical_barrier(t_events, close, days=5) | |
| events = get_events(close, t_events, pt_sl=[1, 1], target=target, min_ret=0.001, vertical_barrier_times=t1) | |
| fixed = fixed_time_horizon(close, threshold=0.001, lag=True) | |
| trend = trend_scanning_labels(close, span=(5, 20)) | |
| return {"triple_events": events.shape, "fixed": fixed.shape, "trend": trend.shape} | |
| run_smoke("labeling.core", labeling_smoke, results) | |
| def sampling_smoke() -> dict: | |
| from afml.sample_weights.attribution import get_weights_by_return | |
| from afml.sampling.concurrent import num_concurrent_events | |
| idx = pd.date_range("2026-01-01", periods=20, freq="h", tz="UTC") | |
| close = pd.Series(np.linspace(100, 110, len(idx)), index=idx) | |
| t1 = pd.Series(idx[5:15], index=idx[:10]) | |
| events = pd.DataFrame({"t1": t1}) | |
| concurrent = num_concurrent_events(close.index, t1, t1.index) | |
| weights = get_weights_by_return(events, close, num_threads=1, num_conc_events=concurrent, verbose=False) | |
| return {"concurrent": concurrent.shape, "weights": weights.shape} | |
| run_smoke("sampling.sample_weights", sampling_smoke, results) | |
| def cross_validation_smoke() -> dict: | |
| from afml.cross_validation.cross_validation import PurgedKFold, ml_get_train_times | |
| from afml.cross_validation.pbo import compute_pbo | |
| idx = pd.date_range("2026-01-01", periods=60, freq="h", tz="UTC") | |
| X = pd.DataFrame({"x": np.arange(len(idx))}, index=idx) | |
| y = pd.Series((np.arange(len(idx)) % 2), index=idx) | |
| t1 = pd.Series(idx.shift(2, freq="h"), index=idx) | |
| cv = PurgedKFold(n_splits=3, t1=t1, pct_embargo=0.01) | |
| splits = list(cv.split(X, y)) | |
| train_times = ml_get_train_times(t1, t1.iloc[:5]) | |
| returns = pd.DataFrame( | |
| { | |
| "strategy_a": np.sin(np.arange(len(idx)) / 5) / 100, | |
| "strategy_b": np.cos(np.arange(len(idx)) / 7) / 100, | |
| }, | |
| index=idx, | |
| ) | |
| pbo = compute_pbo(returns, t1, n_folds=4) | |
| return {"splits": len(splits), "train_times": train_times.shape, "pbo": pbo["pbo"]} | |
| run_smoke("cross_validation.core", cross_validation_smoke, results) | |
| def strategies_smoke() -> dict: | |
| from afml.datasets import load_dollar_bar_sample | |
| from afml.strategies.trading_strategies import BollingerStrategy, MACrossoverStrategy | |
| bars = load_dollar_bar_sample().tail(300) | |
| boll = BollingerStrategy(window=20, std=2).generate_signals(bars) | |
| ma = MACrossoverStrategy(fast_window=10, slow_window=30).generate_signals(bars) | |
| return {"bollinger_signals": boll.value_counts().to_dict(), "ma_signals": ma.value_counts().to_dict()} | |
| run_smoke("strategies.basic", strategies_smoke, results) | |
| def mt5_smoke() -> dict: | |
| import MetaTrader5 as mt5 | |
| from afml.mt5.load_data import login_mt5 | |
| if not login_mt5(mt5_account, verbose=False): | |
| raise RuntimeError("MT5 login failed") | |
| try: | |
| end = datetime.now(timezone.utc) | |
| start = end - timedelta(days=mt5_days) | |
| out: dict[str, Any] = {} | |
| for symbol in ("XAUUSD", "BTCUSD"): | |
| if not mt5.symbol_select(symbol, True): | |
| out[symbol] = "symbol_select failed" | |
| continue | |
| rates = mt5.copy_rates_range(symbol, mt5.TIMEFRAME_M5, start, end) | |
| out[symbol] = 0 if rates is None else len(rates) | |
| return out | |
| finally: | |
| mt5.shutdown() | |
| if include_mt5: | |
| run_smoke("mt5.live_xau_btc_m5", mt5_smoke, results) | |
| return results | |
| def main() -> int: | |
| parser = argparse.ArgumentParser(description="Run broad AFML diagnostics.") | |
| parser.add_argument("--include-mt5", action="store_true") | |
| parser.add_argument("--mt5-account", default="LIVE") | |
| parser.add_argument("--mt5-days", type=int, default=3) | |
| parser.add_argument("--out", default="diagnostics") | |
| args = parser.parse_args() | |
| out_dir = ROOT / args.out | |
| out_dir.mkdir(parents=True, exist_ok=True) | |
| module_names = discover_modules() | |
| import_results, inventory = import_inventory(module_names) | |
| smoke_results = smoke_tests(args.include_mt5, args.mt5_account, args.mt5_days) | |
| payload = { | |
| "generated_at": datetime.now(timezone.utc).isoformat(), | |
| "module_count": len(module_names), | |
| "import_results": import_results, | |
| "inventory": inventory, | |
| "smoke_results": smoke_results, | |
| } | |
| (out_dir / "full_project_diagnostics.json").write_text( | |
| json.dumps(payload, indent=2, default=json_default), | |
| encoding="utf-8", | |
| ) | |
| pd.DataFrame(import_results).to_csv(out_dir / "module_imports.csv", index=False) | |
| pd.DataFrame(smoke_results).to_csv(out_dir / "smoke_tests.csv", index=False) | |
| import_failures = [r for r in import_results if r["status"] != "pass"] | |
| smoke_failures = [r for r in smoke_results if r["status"] != "pass"] | |
| print(f"Modules discovered: {len(module_names)}") | |
| print(f"Module import failures: {len(import_failures)}") | |
| print(f"Smoke tests: {len(smoke_results)}") | |
| print(f"Smoke failures: {len(smoke_failures)}") | |
| print(f"Reports saved to: {out_dir}") | |
| if import_failures: | |
| print("\nImport failures:") | |
| for failure in import_failures: | |
| print(f"- {failure['name']}: {failure.get('error_type')} - {failure.get('error')}") | |
| if smoke_failures: | |
| print("\nSmoke failures:") | |
| for failure in smoke_failures: | |
| print(f"- {failure['name']}: {failure.get('error_type')} - {failure.get('error')}") | |
| return 1 if smoke_failures else 0 | |
| if __name__ == "__main__": | |
| raise SystemExit(main()) | |