"""Smoke-test diagnostics for the core afml subsystems.

Imports every listed package and module, exercises representative entry points
against synthetic price and tick data, and writes a JSON report to
diagnostics/core_subsystems_report.json.
"""

from __future__ import annotations

import importlib
import json
import sys
from pathlib import Path

import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier

ROOT = Path(__file__).resolve().parents[1]
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))

REPORT = ROOT / "diagnostics" / "core_subsystems_report.json"

PACKAGES = [
    "afml.cross_validation",
    "afml.data_structures",
    "afml.datasets",
    "afml.features",
    "afml.filters",
    "afml.labeling",
    "afml.sample_weights",
    "afml.strategies",
]

MODULES = [
    "afml.cross_validation.anchored_walkforward",
    "afml.cross_validation.combinatorial",
    "afml.cross_validation.cpcv_usage",
    "afml.cross_validation.cross_validation",
    "afml.cross_validation.hyper_fit",
    "afml.cross_validation.hyper_fit_analysis",
    "afml.cross_validation.optuna_hyper_fit",
    "afml.cross_validation.pbo",
    "afml.cross_validation.scoring",
    "afml.cross_validation.trial_tracker",
    "afml.data_structures.bars",
    "afml.datasets.load_datasets",
    "afml.features.fracdiff",
    "afml.features.fractals",
    "afml.features.meta_labeling_features",
    "afml.features.moving_averages",
    "afml.features.returns",
    "afml.features.stationary",
    "afml.features.trading_session",
    "afml.features.volatility_regime",
    "afml.filters.filters",
    "afml.labeling.fixed_time_horizon",
    "afml.labeling.trend_scanning",
    "afml.labeling.triple_barrier",
    "afml.sample_weights.attribution",
    "afml.sample_weights.optimized_attribution",
    "afml.strategies.bollinger_features",
    "afml.strategies.genetic_optimizer",
    "afml.strategies.ma_crossover_feature_engine",
    "afml.strategies.ma_whipsaw_ratio",
    "afml.strategies.signal_processing",
    "afml.strategies.strategy_optimizer",
    "afml.strategies.trading_strategies",
    "afml.strategies.trend_scanning_optimizer",
    "afml.strategies.trend_scanning_optimizer_1",
]


def record(results: list[dict], name: str, fn):
    """Run ``fn`` and append a pass entry with its result, or a fail entry with the error."""
    try:
        value = fn()
        results.append({"name": name, "status": "pass", "detail": value})
    except Exception as exc:
        results.append(
            {
                "name": name,
                "status": "fail",
                "error_type": type(exc).__name__,
                "error": str(exc),
            }
        )


def make_price_data(n: int = 240) -> pd.DataFrame:
    """Build a synthetic hourly OHLCV frame: a gentle uptrend plus a sine cycle."""
    idx = pd.date_range("2026-01-01", periods=n, freq="h", tz="UTC")
    trend = np.linspace(100, 112, n)
    cycle = np.sin(np.arange(n) / 6) * 1.5
    close = pd.Series(trend + cycle, index=idx)
    return pd.DataFrame(
        {
            "open": close.shift(1).bfill(),
            "high": close + 0.8,
            "low": close - 0.8,
            "close": close,
            "spread": 0.02,
            "volume": 100 + (np.arange(n) % 20),
        },
        index=idx,
    )


def make_tick_data(n: int = 500) -> pd.DataFrame:
    """Build a synthetic one-second bid/ask tick stream."""
    idx = pd.date_range("2026-01-01", periods=n, freq="s", tz="UTC")
    price = 100 + np.cumsum(np.sin(np.arange(n) / 11) * 0.01)
    return pd.DataFrame(
        {
            "bid": price,
            "ask": price + 0.02,
            "volume": 1 + (np.arange(n) % 5),
        },
        index=idx,
    )


def test_imports(results: list[dict]):
    # Bind the loop variable as a default argument so each lambda imports its own module.
    for package in PACKAGES:
        record(results, f"import package {package}", lambda p=package: str(importlib.import_module(p)))
    for module in MODULES:
        record(results, f"import module {module}", lambda m=module: str(importlib.import_module(m)))


def test_cross_validation(results: list[dict], data: pd.DataFrame):
    from afml.cross_validation import (
        CombinatorialPurgedCV,
        PurgedKFold,
        PurgedSplit,
        PurgedWalkForwardCV,
        ml_cross_val_score,
        probability_weighted_accuracy,
    )

    # Minimal feature matrix, next-bar direction labels, and label end times (t1) for purging.
    X = pd.DataFrame({"ret": data["close"].pct_change().fillna(0), "vol": data["volume"]}, index=data.index)
    y = (X["ret"].shift(-1).fillna(0) > 0).astype(int)
    t1 = pd.Series(data.index.to_series().shift(-2).bfill().array, index=data.index)

    record(results, "cross_validation PurgedKFold split",
           lambda: [tuple(map(len, s)) for s in PurgedKFold(3, t1).split(X)])
    record(results, "cross_validation PurgedSplit split",
           lambda: tuple(map(len, PurgedSplit(t1, 0.25).split(X))))
    record(results, "cross_validation PurgedWalkForwardCV split",
           lambda: [tuple(map(len, s)) for s in PurgedWalkForwardCV(3, t1).split(X)])
    record(results, "cross_validation CombinatorialPurgedCV split",
           lambda: [tuple([len(s[0]), sum(len(x) for x in s[1])]) for s in CombinatorialPurgedCV(6, 2, t1=t1).split(X)][:3])
    record(
        results,
        "cross_validation probability_weighted_accuracy",
        lambda: float(probability_weighted_accuracy(y.iloc[:10], np.column_stack([1 - y.iloc[:10], y.iloc[:10]]), labels=[0, 1])),
    )
    record(
        results,
        "cross_validation ml_cross_val_score",
        lambda: [float(x) for x in ml_cross_val_score(RandomForestClassifier(n_estimators=10, random_state=7), X, y, cv_gen=PurgedKFold(3, t1), scoring="accuracy")],
    )


def test_data_and_datasets(results: list[dict]):
    from afml.data_structures import calculate_ticks_per_period, make_bars
    from afml.datasets import load_dollar_bar_sample, load_stock_prices, load_tick_sample

    ticks = make_tick_data()
    record(results, "data_structures calculate_ticks_per_period", lambda: int(calculate_ticks_per_period(ticks, "M1", verbose=False)))
    record(results, "data_structures tick bars", lambda: make_bars(ticks, "tick", 50).shape)
    record(results, "data_structures volume bars", lambda: make_bars(ticks, "volume", 100).shape)
    record(results, "data_structures dollar bars", lambda: make_bars(ticks, "dollar", 1000, price="bid").shape)
    record(results, "datasets load_stock_prices", lambda: load_stock_prices().shape)
    record(results, "datasets load_tick_sample", lambda: load_tick_sample().shape)
    record(results, "datasets load_dollar_bar_sample", lambda: load_dollar_bar_sample().shape)


def test_features_filters_labeling_weights(results: list[dict], data: pd.DataFrame):
    from afml.features.fracdiff import frac_diff, frac_diff_ffd
    from afml.features.fractals import get_fractal_features
    from afml.features.meta_labeling_features import calculate_market_regime_features
    from afml.features.moving_averages import calculate_ma_differences, get_ma_crossovers
    from afml.features.returns import get_lagged_returns, get_period_autocorr, get_return_dist_features
    from afml.features.stationary import is_stationary
    from afml.features.trading_session import get_time_features
    from afml.filters import cusum_filter, z_score_filter
    from afml.labeling import add_vertical_barrier, fixed_time_horizon, get_bins, get_events, trend_scanning_labels
    from afml.sample_weights import get_weights_by_return, get_weights_by_return_optimized, get_weights_by_time_decay

    # CUSUM event sampling, a volatility target, and triple-barrier events shared by the labeling and weighting checks.
    close = data["close"]
    events = cusum_filter(close, threshold=close.pct_change().std(), time_stamps=True)
    target = close.pct_change().abs().rolling(10).mean().bfill()
    vb = add_vertical_barrier(events[:20], close, num_bars=5)
    tb_events = get_events(close, events[:20], [1, 1], target, min_ret=0.00001, vertical_barrier_times=vb).dropna()

    record(results, "features frac_diff", lambda: frac_diff(close.to_frame("close"), 0.4).shape)
    record(results, "features frac_diff_ffd", lambda: frac_diff_ffd(close.to_frame("close"), 0.4).shape)
    record(results, "features moving_average_differences", lambda: calculate_ma_differences(close, [5, 10, 20]).shape)
    record(results, "features moving_average_crossovers", lambda: get_ma_crossovers(close, [5, 10, 20]).shape)
    record(results, "features lagged_returns", lambda: get_lagged_returns(close, [1, 2, 5], nperiods=2).shape)
    record(results, "features period_autocorr", lambda: get_period_autocorr(close, hours=1, lookback=20).dropna().shape)
    record(results, "features return_distribution", lambda: get_return_dist_features(close, window=20).dropna().shape)
    record(results, "features time_features", lambda: get_time_features(data, "H1").shape)
    record(results, "features stationary", lambda: is_stationary(close.to_frame("close"), verbose=False))
    record(results, "features fractals", lambda: get_fractal_features(data, target).shape)
    record(results, "features market_regime", lambda: calculate_market_regime_features(data).shape)
    record(results, "filters cusum_filter", lambda: len(events))
    record(results, "filters z_score_filter", lambda: len(z_score_filter(close, mean_window=20, std_window=20, z_score=1.5)))
    record(results, "labeling fixed_time_horizon", lambda: fixed_time_horizon(close, threshold=0.001).value_counts(dropna=True).to_dict())
    record(results, "labeling trend_scanning", lambda: trend_scanning_labels(close, span=(5, 20)).shape)
    record(results, "labeling triple_barrier_events", lambda: tb_events.shape)
    record(results, "labeling get_bins", lambda: get_bins(tb_events, close).shape)
    record(results, "sample_weights return", lambda: get_weights_by_return(tb_events, close, num_threads=1, verbose=False).shape)
    record(results, "sample_weights return_optimized", lambda: get_weights_by_return_optimized(tb_events, close).shape)
    record(results, "sample_weights time_decay", lambda: get_weights_by_time_decay(tb_events, close, num_threads=1, verbose=False).shape)


def test_strategies(results: list[dict], data: pd.DataFrame):
    from afml.strategies.bollinger_features import create_bollinger_features
    from afml.strategies.ma_crossover_feature_engine import ForexFeatureEngine
    from afml.strategies.ma_whipsaw_ratio import calculate_ma_whipsaw_ratio
    from afml.strategies.signal_processing import get_entries
    from afml.strategies.trading_strategies import BollingerStrategy, MACrossoverStrategy

    close = data["close"]
    boll = BollingerStrategy(window=20, std=2)
    ma = MACrossoverStrategy(fast_window=10, slow_window=30)

    record(results, "strategies BollingerStrategy generate_signals", lambda: boll.generate_signals(data).value_counts().to_dict())
    record(results, "strategies MACrossoverStrategy generate_signals", lambda: ma.generate_signals(data).value_counts().to_dict())
    record(results, "strategies signal_processing get_entries", lambda: (get_entries(ma, data)[0].shape, len(get_entries(ma, data)[1])))
    record(results, "strategies bollinger_features", lambda: create_bollinger_features(data).dropna().shape)
    record(results, "strategies ma_whipsaw_ratio", lambda: calculate_ma_whipsaw_ratio(close, 10, 30)[0])
    record(results, "strategies ForexFeatureEngine", lambda: ForexFeatureEngine("TEST").calculate_all_features(data, "H1").shape)


def main() -> int:
    results: list[dict] = []
    data = make_price_data()

    # Run every subsystem check against the same synthetic hourly price data.
    test_imports(results)
    test_cross_validation(results, data)
    test_data_and_datasets(results)
    test_features_filters_labeling_weights(results, data)
    test_strategies(results, data)

    REPORT.parent.mkdir(parents=True, exist_ok=True)
    summary = pd.Series([r["status"] for r in results]).value_counts().to_dict()
    payload = {"summary": summary, "results": results}
    REPORT.write_text(json.dumps(payload, indent=2, default=str), encoding="utf-8")

    print(json.dumps(summary, indent=2))
    print(f"Report: {REPORT}")

    failures = [r for r in results if r["status"] != "pass"]
    if failures:
        print("\nFailures:")
        for failure in failures:
            print(f"- {failure['name']}: {failure.get('error_type')} {failure.get('error')}")
        return 1
    return 0


if __name__ == "__main__":
    raise SystemExit(main())