# Source path: AFML/scripts/run_full_project_diagnostics.py
# Repository page residue kept for provenance: "akshayboora's picture",
# "Upload 940 files", "669d6a1 verified".
from __future__ import annotations
import argparse
import importlib
import inspect
import json
import pkgutil
import sys
import traceback
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Callable
import numpy as np
import pandas as pd
# Directory one level above the folder that contains this script.
ROOT = Path(__file__).resolve().parents[1]
# Give ROOT top priority on the import path; presumably this is what lets the
# `afml` imports further down resolve against this checkout -- TODO confirm.
sys.path.insert(0, str(ROOT))
def json_default(value: Any) -> str:
    """Fallback serializer for ``json.dumps``: render ``value`` via ``str``."""
    rendered = str(value)
    return rendered
def record(results: list[dict], name: str, status: str, **details: Any) -> None:
    """Append one result row to ``results`` in place.

    The row always starts with the ``"name"`` and ``"status"`` keys; any extra
    keyword arguments follow as additional columns, in the order given.
    """
    results.append({"name": name, "status": status, **details})
def discover_modules() -> list[str]:
    """Return the dotted names of the modules and packages found under ``afml``.

    ``pkgutil.walk_packages`` has to import every package it descends into.
    Without an ``onerror`` callback, any exception other than ``ImportError``
    raised by a package's ``__init__`` propagates and ends the walk, which
    would abort the whole diagnostics run before anything is recorded.  A
    no-op callback is supplied so the walk carries on; the offending package
    is still yielded, so a later import attempt can report the failure.

    Returns:
        Names prefixed with ``afml.``.  Names containing ``".guides."`` are
        dropped; note that this keeps the ``afml.guides`` package name itself
        and only removes what lies beneath it.
    """
    import afml

    def _keep_walking(package_name: str) -> None:
        """Ignore a package that failed to import so discovery continues."""

    walker = pkgutil.walk_packages(
        afml.__path__,
        afml.__name__ + ".",
        onerror=_keep_walking,
    )
    return [module.name for module in walker if ".guides." not in module.name]
def import_inventory(module_names: list[str]) -> tuple[list[dict], dict[str, Any]]:
    """Import each named module and catalogue what it defines.

    Args:
        module_names: Dotted module names to import one by one.

    Returns:
        A pair ``(rows, catalog)``.  ``rows`` holds one record per module:
        ``"pass"`` with function/class counts, or ``"fail"`` with the
        exception type, message and a traceback limited to five frames.
        ``catalog`` maps each successfully imported module name to the names
        of the functions and classes whose ``__module__`` is that module.
    """
    rows: list[dict] = []
    catalog: dict[str, Any] = {}
    for dotted in module_names:
        try:
            module = importlib.import_module(dotted)
            # Keep only members defined in this module, not re-exported ones.
            own_members = [
                (member_name, member)
                for member_name, member in inspect.getmembers(module)
                if getattr(member, "__module__", None) == module.__name__
            ]
            functions = [
                member_name
                for member_name, member in own_members
                if inspect.isfunction(member)
            ]
            classes = [
                member_name
                for member_name, member in own_members
                if inspect.isclass(member)
            ]
            catalog[dotted] = {"functions": functions, "classes": classes}
            record(
                rows,
                dotted,
                "pass",
                functions=len(functions),
                classes=len(classes),
            )
        except Exception as exc:
            # A broken module must not stop the sweep; log it and move on.
            failure = {
                "error_type": type(exc).__name__,
                "error": str(exc),
                "traceback": traceback.format_exc(limit=5),
            }
            record(rows, dotted, "fail", **failure)
    return rows, catalog
def run_smoke(name: str, func: Callable[[], Any], results: list[dict]) -> Any:
    """Execute one smoke check and log its outcome in ``results``.

    Args:
        name: Label stored in the result row.
        func: Zero-argument callable that performs the check.
        results: List that receives the result row.

    Returns:
        Whatever ``func`` returned, or ``None`` if it raised an ``Exception``
        (in which case the exception type, message and a five-frame traceback
        are recorded instead of an output).
    """
    try:
        outcome = func()
        record(results, name, "pass", output=outcome)
    except Exception as exc:
        failure = {
            "error_type": type(exc).__name__,
            "error": str(exc),
            "traceback": traceback.format_exc(limit=5),
        }
        record(results, name, "fail", **failure)
        return None
    return outcome
def smoke_tests(include_mt5: bool, mt5_account: str, mt5_days: int) -> list[dict]:
    """Run the built-in smoke checks and return one result row per check.

    Each check is a closure that imports the parts of ``afml`` it needs, runs
    them, and returns a small summary dict.  The closures are executed through
    ``run_smoke`` in a fixed order, and the rows it records are returned.

    Args:
        include_mt5: When true, the MetaTrader 5 check runs after the others.
        mt5_account: Value handed to ``login_mt5`` by the MetaTrader 5 check.
        mt5_days: Number of days of history the MetaTrader 5 check requests,
            counted back from the current UTC time.

    Returns:
        The result rows, in execution order.
    """

    def datasets_smoke() -> dict:
        """Load the three bundled sample datasets and report their shapes."""
        from afml.datasets import load_dollar_bar_sample, load_stock_prices, load_tick_sample

        tick_frame = load_tick_sample()
        stock_frame = load_stock_prices()
        dollar_frame = load_dollar_bar_sample()
        return dict(
            tick=tick_frame.shape,
            stock=stock_frame.shape,
            dollar=dollar_frame.shape,
        )

    def bars_smoke() -> dict:
        """Build tick bars and time bars from the tick sample."""
        from afml.data_structures.bars import make_bars
        from afml.datasets import load_tick_sample

        ticks = load_tick_sample()
        shared = {"price": "mid_price", "verbose": False}
        tick_bars = make_bars(ticks, bar_type="tick", bar_size=20, **shared)
        minute_bars = make_bars(ticks, bar_type="time", bar_size="M1", **shared)
        return {"tick_bars": tick_bars.shape, "time_bars": minute_bars.shape}

    def features_smoke() -> dict:
        """Exercise the return, time, fractional-diff and stationarity helpers."""
        from afml.datasets import load_stock_prices
        from afml.features.fracdiff import frac_diff_ffd
        from afml.features.returns import get_lagged_returns, get_return_dist_features
        from afml.features.stationary import is_stationary
        from afml.features.trading_session import get_time_features

        prices = load_stock_prices()["SPY"].dropna().tail(400)
        lagged = get_lagged_returns(prices, lags=[1, 5, 20], nperiods=2)
        dist = get_return_dist_features(prices, window=20)
        time_features = get_time_features(prices.to_frame("close"), timeframe="D1")
        frac = frac_diff_ffd(prices.to_frame("SPY"), d=0.5, thres=1e-3)
        returns_frame = prices.pct_change().dropna().to_frame("SPY_returns")
        stationary = is_stationary(returns_frame, verbose=False)
        return {
            "lagged": lagged.shape,
            "dist": dist.shape,
            "time_features": time_features.shape,
            "frac_diff": frac.shape,
            "is_stationary": stationary,
        }

    def filters_smoke() -> dict:
        """Count the events produced by the CUSUM and z-score filters."""
        from afml.datasets import load_stock_prices
        from afml.filters.filters import cusum_filter, z_score_filter

        prices = load_stock_prices()["SPY"].dropna().tail(400)
        cusum_hits = cusum_filter(prices, threshold=0.01, time_stamps=True)
        returns = prices.pct_change().dropna()
        z_hits = z_score_filter(returns, mean_window=20, std_window=20)
        return {"cusum_events": len(cusum_hits), "z_score_events": len(z_hits)}

    def labeling_smoke() -> dict:
        """Run triple-barrier, fixed-horizon and trend-scanning labelling."""
        from afml.datasets import load_stock_prices
        from afml.filters.filters import cusum_filter
        from afml.labeling.fixed_time_horizon import fixed_time_horizon
        from afml.labeling.trend_scanning import trend_scanning_labels
        from afml.labeling.triple_barrier import add_vertical_barrier, get_events
        from afml.util.volatility import get_daily_vol

        close = load_stock_prices()["SPY"].dropna().tail(500)
        target = get_daily_vol(close, lookback=50).dropna()
        candidates = cusum_filter(close.loc[target.index], threshold=0.01, time_stamps=True)
        # Keep at most the first 30 events that also have a volatility target.
        t_events = candidates.intersection(target.index)[:30]
        t1 = add_vertical_barrier(t_events, close, days=5)
        events = get_events(
            close,
            t_events,
            pt_sl=[1, 1],
            target=target,
            min_ret=0.001,
            vertical_barrier_times=t1,
        )
        fixed = fixed_time_horizon(close, threshold=0.001, lag=True)
        trend = trend_scanning_labels(close, span=(5, 20))
        return {"triple_events": events.shape, "fixed": fixed.shape, "trend": trend.shape}

    def sampling_smoke() -> dict:
        """Compute concurrency counts and return-based weights on synthetic data."""
        from afml.sample_weights.attribution import get_weights_by_return
        from afml.sampling.concurrent import num_concurrent_events

        hours = pd.date_range("2026-01-01", periods=20, freq="h", tz="UTC")
        close = pd.Series(np.linspace(100, 110, len(hours)), index=hours)
        t1 = pd.Series(hours[5:15], index=hours[:10])
        events = pd.DataFrame({"t1": t1})
        concurrent = num_concurrent_events(close.index, t1, t1.index)
        weights = get_weights_by_return(
            events,
            close,
            num_threads=1,
            num_conc_events=concurrent,
            verbose=False,
        )
        return {"concurrent": concurrent.shape, "weights": weights.shape}

    def cross_validation_smoke() -> dict:
        """Split synthetic data with PurgedKFold and compute a PBO value."""
        from afml.cross_validation.cross_validation import PurgedKFold, ml_get_train_times
        from afml.cross_validation.pbo import compute_pbo

        idx = pd.date_range("2026-01-01", periods=60, freq="h", tz="UTC")
        n_obs = len(idx)
        X = pd.DataFrame({"x": np.arange(n_obs)}, index=idx)
        y = pd.Series((np.arange(n_obs) % 2), index=idx)
        t1 = pd.Series(idx.shift(2, freq="h"), index=idx)
        cv = PurgedKFold(n_splits=3, t1=t1, pct_embargo=0.01)
        splits = list(cv.split(X, y))
        train_times = ml_get_train_times(t1, t1.iloc[:5])
        strategy_returns = {
            "strategy_a": np.sin(np.arange(n_obs) / 5) / 100,
            "strategy_b": np.cos(np.arange(n_obs) / 7) / 100,
        }
        returns = pd.DataFrame(strategy_returns, index=idx)
        pbo = compute_pbo(returns, t1, n_folds=4)
        return {"splits": len(splits), "train_times": train_times.shape, "pbo": pbo["pbo"]}

    def strategies_smoke() -> dict:
        """Generate Bollinger and MA-crossover signals on the dollar-bar sample."""
        from afml.datasets import load_dollar_bar_sample
        from afml.strategies.trading_strategies import BollingerStrategy, MACrossoverStrategy

        bars = load_dollar_bar_sample().tail(300)
        bollinger = BollingerStrategy(window=20, std=2).generate_signals(bars)
        crossover = MACrossoverStrategy(fast_window=10, slow_window=30).generate_signals(bars)
        return {
            "bollinger_signals": bollinger.value_counts().to_dict(),
            "ma_signals": crossover.value_counts().to_dict(),
        }

    def mt5_smoke() -> dict:
        """Log in to MetaTrader 5 and count M5 rates for XAUUSD and BTCUSD."""
        import MetaTrader5 as mt5
        from afml.mt5.load_data import login_mt5

        if not login_mt5(mt5_account, verbose=False):
            raise RuntimeError("MT5 login failed")
        try:
            window_end = datetime.now(timezone.utc)
            window_start = window_end - timedelta(days=mt5_days)
            counts: dict[str, Any] = {}
            for symbol in ("XAUUSD", "BTCUSD"):
                if mt5.symbol_select(symbol, True):
                    rates = mt5.copy_rates_range(symbol, mt5.TIMEFRAME_M5, window_start, window_end)
                    counts[symbol] = len(rates) if rates is not None else 0
                else:
                    counts[symbol] = "symbol_select failed"
            return counts
        finally:
            # Always release the terminal connection once login succeeded.
            mt5.shutdown()

    checks = [
        ("datasets.load_samples", datasets_smoke),
        ("data_structures.make_bars", bars_smoke),
        ("features.core", features_smoke),
        ("filters.core", filters_smoke),
        ("labeling.core", labeling_smoke),
        ("sampling.sample_weights", sampling_smoke),
        ("cross_validation.core", cross_validation_smoke),
        ("strategies.basic", strategies_smoke),
    ]
    if include_mt5:
        checks.append(("mt5.live_xau_btc_m5", mt5_smoke))

    results: list[dict] = []
    for check_name, check in checks:
        run_smoke(check_name, check, results)
    return results
def main() -> int:
    """Command-line entry point: run all diagnostics and write the reports.

    Options:
        --include-mt5: also run the MetaTrader 5 smoke check.
        --mt5-account: account value for that check (default ``"LIVE"``).
        --mt5-days: days of history for that check (default ``3``).
        --out: report directory, joined onto ``ROOT`` (default ``"diagnostics"``).

    Writes ``full_project_diagnostics.json``, ``module_imports.csv`` and
    ``smoke_tests.csv`` into the report directory and prints a summary.

    Returns:
        ``1`` if any smoke check failed, otherwise ``0``.  Module import
        failures are printed but do not influence the return value.
    """
    parser = argparse.ArgumentParser(description="Run broad AFML diagnostics.")
    parser.add_argument("--include-mt5", action="store_true")
    parser.add_argument("--mt5-account", default="LIVE")
    parser.add_argument("--mt5-days", type=int, default=3)
    parser.add_argument("--out", default="diagnostics")
    args = parser.parse_args()

    out_dir = ROOT / args.out
    out_dir.mkdir(parents=True, exist_ok=True)

    module_names = discover_modules()
    import_results, inventory = import_inventory(module_names)
    smoke_results = smoke_tests(args.include_mt5, args.mt5_account, args.mt5_days)

    report = {
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "module_count": len(module_names),
        "import_results": import_results,
        "inventory": inventory,
        "smoke_results": smoke_results,
    }
    report_text = json.dumps(report, indent=2, default=json_default)
    (out_dir / "full_project_diagnostics.json").write_text(report_text, encoding="utf-8")

    tables = (
        (import_results, "module_imports.csv"),
        (smoke_results, "smoke_tests.csv"),
    )
    for rows, csv_name in tables:
        pd.DataFrame(rows).to_csv(out_dir / csv_name, index=False)

    import_failures = [row for row in import_results if row["status"] != "pass"]
    smoke_failures = [row for row in smoke_results if row["status"] != "pass"]

    print(f"Modules discovered: {len(module_names)}")
    print(f"Module import failures: {len(import_failures)}")
    print(f"Smoke tests: {len(smoke_results)}")
    print(f"Smoke failures: {len(smoke_failures)}")
    print(f"Reports saved to: {out_dir}")

    sections = (
        ("\nImport failures:", import_failures),
        ("\nSmoke failures:", smoke_failures),
    )
    for heading, failures in sections:
        if not failures:
            continue
        print(heading)
        for failure in failures:
            print(f"- {failure['name']}: {failure.get('error_type')} - {failure.get('error')}")

    return int(bool(smoke_failures))
if __name__ == "__main__":
    # Hand main()'s integer result to the interpreter as the exit status.
    sys.exit(main())