#!/usr/bin/env python
"""Diagnostic smoke tests for the afml core subsystems.

Imports every core package/module and exercises representative entry points,
then writes a pass/fail JSON report to ``diagnostics/core_subsystems_report.json``.
"""
| from __future__ import annotations | |
| import importlib | |
| import json | |
| import sys | |
| from pathlib import Path | |
| import numpy as np | |
| import pandas as pd | |
| from sklearn.ensemble import RandomForestClassifier | |
# Repository root: one directory above the folder containing this script.
ROOT = Path(__file__).resolve().parents[1]
# Make project-local packages importable when the script is run directly.
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))

# Destination file for the JSON diagnostics report.
REPORT = ROOT / "diagnostics" / "core_subsystems_report.json"
# Top-level afml packages whose importability is smoke-tested.
PACKAGES = [
    "afml.cross_validation",
    "afml.data_structures",
    "afml.datasets",
    "afml.features",
    "afml.filters",
    "afml.labeling",
    "afml.sample_weights",
    "afml.strategies",
]

# Individual afml modules whose importability is smoke-tested.
MODULES = [
    "afml.cross_validation.anchored_walkforward",
    "afml.cross_validation.combinatorial",
    "afml.cross_validation.cpcv_usage",
    "afml.cross_validation.cross_validation",
    "afml.cross_validation.hyper_fit",
    "afml.cross_validation.hyper_fit_analysis",
    "afml.cross_validation.optuna_hyper_fit",
    "afml.cross_validation.pbo",
    "afml.cross_validation.scoring",
    "afml.cross_validation.trial_tracker",
    "afml.data_structures.bars",
    "afml.datasets.load_datasets",
    "afml.features.fracdiff",
    "afml.features.fractals",
    "afml.features.meta_labeling_features",
    "afml.features.moving_averages",
    "afml.features.returns",
    "afml.features.stationary",
    "afml.features.trading_session",
    "afml.features.volatility_regime",
    "afml.filters.filters",
    "afml.labeling.fixed_time_horizon",
    "afml.labeling.trend_scanning",
    "afml.labeling.triple_barrier",
    "afml.sample_weights.attribution",
    "afml.sample_weights.optimized_attribution",
    "afml.strategies.bollinger_features",
    "afml.strategies.genetic_optimizer",
    "afml.strategies.ma_crossover_feature_engine",
    "afml.strategies.ma_whipsaw_ratio",
    "afml.strategies.signal_processing",
    "afml.strategies.strategy_optimizer",
    "afml.strategies.trading_strategies",
    "afml.strategies.trend_scanning_optimizer",
    "afml.strategies.trend_scanning_optimizer_1",
]
def record(results: list[dict], name: str, fn):
    """Run *fn* and append a pass/fail entry for it to *results*.

    On success the return value is stored under ``detail``; on any
    exception the type name and message are captured instead, so a
    single failing check never aborts the whole diagnostic run.
    """
    try:
        outcome = fn()
    except Exception as exc:  # broad on purpose: diagnostics must keep going
        results.append(
            {
                "name": name,
                "status": "fail",
                "error_type": type(exc).__name__,
                "error": str(exc),
            }
        )
    else:
        results.append({"name": name, "status": "pass", "detail": outcome})
def make_price_data(n: int = 240) -> pd.DataFrame:
    """Build a deterministic hourly OHLC frame: linear trend plus sine cycle.

    Columns: open/high/low/close, a constant spread, and a cycling volume.
    The index is hourly, timezone-aware (UTC).
    """
    index = pd.date_range("2026-01-01", periods=n, freq="h", tz="UTC")
    close = pd.Series(
        np.linspace(100, 112, n) + np.sin(np.arange(n) / 6) * 1.5,
        index=index,
    )
    columns = {
        "open": close.shift(1).bfill(),  # previous close; first bar backfilled
        "high": close + 0.8,
        "low": close - 0.8,
        "close": close,
        "spread": 0.02,
        "volume": 100 + (np.arange(n) % 20),
    }
    return pd.DataFrame(columns, index=index)
def make_tick_data(n: int = 500) -> pd.DataFrame:
    """Build a deterministic one-second bid/ask/volume tick frame (UTC index)."""
    index = pd.date_range("2026-01-01", periods=n, freq="s", tz="UTC")
    bid = 100 + np.cumsum(np.sin(np.arange(n) / 11) * 0.01)
    columns = {
        "bid": bid,
        "ask": bid + 0.02,  # fixed two-pip spread over the bid
        "volume": 1 + (np.arange(n) % 5),
    }
    return pd.DataFrame(columns, index=index)
def test_imports(results: list[dict]):
    """Attempt to import every package and module, recording each outcome.

    The loop variable is bound as a lambda default argument so each lambda
    captures its own name rather than the loop's final value (late binding).
    """
    for pkg_name in PACKAGES:
        record(results, f"import package {pkg_name}", lambda p=pkg_name: str(importlib.import_module(p)))
    for mod_name in MODULES:
        record(results, f"import module {mod_name}", lambda m=mod_name: str(importlib.import_module(m)))
def test_cross_validation(results: list[dict], data: pd.DataFrame):
    """Exercise the purged cross-validation splitters and scoring helpers."""
    from afml.cross_validation import (
        CombinatorialPurgedCV,
        PurgedKFold,
        PurgedSplit,
        PurgedWalkForwardCV,
        ml_cross_val_score,
        probability_weighted_accuracy,
    )

    features = pd.DataFrame(
        {"ret": data["close"].pct_change().fillna(0), "vol": data["volume"]},
        index=data.index,
    )
    labels = (features["ret"].shift(-1).fillna(0) > 0).astype(int)
    # Event end times: each observation "ends" two bars after it starts.
    t1 = pd.Series(data.index.to_series().shift(-2).bfill().array, index=data.index)

    record(
        results,
        "cross_validation PurgedKFold split",
        lambda: [tuple(map(len, split)) for split in PurgedKFold(3, t1).split(features)],
    )
    record(
        results,
        "cross_validation PurgedSplit split",
        lambda: tuple(map(len, PurgedSplit(t1, 0.25).split(features))),
    )
    record(
        results,
        "cross_validation PurgedWalkForwardCV split",
        lambda: [tuple(map(len, split)) for split in PurgedWalkForwardCV(3, t1).split(features)],
    )
    record(
        results,
        "cross_validation CombinatorialPurgedCV split",
        lambda: [
            tuple([len(split[0]), sum(len(part) for part in split[1])])
            for split in CombinatorialPurgedCV(6, 2, t1=t1).split(features)
        ][:3],
    )
    record(
        results,
        "cross_validation probability_weighted_accuracy",
        lambda: float(
            probability_weighted_accuracy(
                labels.iloc[:10],
                np.column_stack([1 - labels.iloc[:10], labels.iloc[:10]]),
                labels=[0, 1],
            )
        ),
    )
    record(
        results,
        "cross_validation ml_cross_val_score",
        lambda: [
            float(score)
            for score in ml_cross_val_score(
                RandomForestClassifier(n_estimators=10, random_state=7),
                features,
                labels,
                cv_gen=PurgedKFold(3, t1),
                scoring="accuracy",
            )
        ],
    )
def test_data_and_datasets(results: list[dict]):
    """Exercise bar construction and the bundled sample datasets."""
    from afml.data_structures import calculate_ticks_per_period, make_bars
    from afml.datasets import load_dollar_bar_sample, load_stock_prices, load_tick_sample

    ticks = make_tick_data()
    record(
        results,
        "data_structures calculate_ticks_per_period",
        lambda: int(calculate_ticks_per_period(ticks, "M1", verbose=False)),
    )
    record(results, "data_structures tick bars", lambda: make_bars(ticks, "tick", 50).shape)
    record(results, "data_structures volume bars", lambda: make_bars(ticks, "volume", 100).shape)
    record(results, "data_structures dollar bars", lambda: make_bars(ticks, "dollar", 1000, price="bid").shape)
    record(results, "datasets load_stock_prices", lambda: load_stock_prices().shape)
    record(results, "datasets load_tick_sample", lambda: load_tick_sample().shape)
    record(results, "datasets load_dollar_bar_sample", lambda: load_dollar_bar_sample().shape)
def test_features_filters_labeling_weights(results: list[dict], data: pd.DataFrame):
    """Exercise feature builders, event filters, labelers and sample weights."""
    from afml.features.fracdiff import frac_diff, frac_diff_ffd
    from afml.features.fractals import get_fractal_features
    from afml.features.meta_labeling_features import calculate_market_regime_features
    from afml.features.moving_averages import calculate_ma_differences, get_ma_crossovers
    from afml.features.returns import get_lagged_returns, get_period_autocorr, get_return_dist_features
    from afml.features.stationary import is_stationary
    from afml.features.trading_session import get_time_features
    from afml.filters import cusum_filter, z_score_filter
    from afml.labeling import add_vertical_barrier, fixed_time_horizon, get_bins, get_events, trend_scanning_labels
    from afml.sample_weights import get_weights_by_return, get_weights_by_return_optimized, get_weights_by_time_decay

    close = data["close"]
    # Sample events with the CUSUM filter, then build triple-barrier events
    # from the first 20 of them using a 5-bar vertical barrier.
    events = cusum_filter(close, threshold=close.pct_change().std(), time_stamps=True)
    target = close.pct_change().abs().rolling(10).mean().bfill()
    vertical = add_vertical_barrier(events[:20], close, num_bars=5)
    tb_events = get_events(
        close, events[:20], [1, 1], target, min_ret=0.00001, vertical_barrier_times=vertical
    ).dropna()

    record(results, "features frac_diff", lambda: frac_diff(close.to_frame("close"), 0.4).shape)
    record(results, "features frac_diff_ffd", lambda: frac_diff_ffd(close.to_frame("close"), 0.4).shape)
    record(results, "features moving_average_differences", lambda: calculate_ma_differences(close, [5, 10, 20]).shape)
    record(results, "features moving_average_crossovers", lambda: get_ma_crossovers(close, [5, 10, 20]).shape)
    record(results, "features lagged_returns", lambda: get_lagged_returns(close, [1, 2, 5], nperiods=2).shape)
    record(results, "features period_autocorr", lambda: get_period_autocorr(close, hours=1, lookback=20).dropna().shape)
    record(results, "features return_distribution", lambda: get_return_dist_features(close, window=20).dropna().shape)
    record(results, "features time_features", lambda: get_time_features(data, "H1").shape)
    record(results, "features stationary", lambda: is_stationary(close.to_frame("close"), verbose=False))
    record(results, "features fractals", lambda: get_fractal_features(data, target).shape)
    record(results, "features market_regime", lambda: calculate_market_regime_features(data).shape)
    record(results, "filters cusum_filter", lambda: len(events))
    record(results, "filters z_score_filter", lambda: len(z_score_filter(close, mean_window=20, std_window=20, z_score=1.5)))
    record(results, "labeling fixed_time_horizon", lambda: fixed_time_horizon(close, threshold=0.001).value_counts(dropna=True).to_dict())
    record(results, "labeling trend_scanning", lambda: trend_scanning_labels(close, span=(5, 20)).shape)
    record(results, "labeling triple_barrier_events", lambda: tb_events.shape)
    record(results, "labeling get_bins", lambda: get_bins(tb_events, close).shape)
    record(results, "sample_weights return", lambda: get_weights_by_return(tb_events, close, num_threads=1, verbose=False).shape)
    record(results, "sample_weights return_optimized", lambda: get_weights_by_return_optimized(tb_events, close).shape)
    record(results, "sample_weights time_decay", lambda: get_weights_by_time_decay(tb_events, close, num_threads=1, verbose=False).shape)
def test_strategies(results: list[dict], data: pd.DataFrame):
    """Exercise the strategy classes, signal processing, and feature engines.

    Fixes over the previous version: the unused ``short_ma``/``long_ma``
    locals are removed, and ``get_entries`` is called once per evaluation
    instead of twice inside the same lambda.
    """
    from afml.strategies.bollinger_features import create_bollinger_features
    from afml.strategies.ma_crossover_feature_engine import ForexFeatureEngine
    from afml.strategies.ma_whipsaw_ratio import calculate_ma_whipsaw_ratio
    from afml.strategies.signal_processing import get_entries
    from afml.strategies.trading_strategies import BollingerStrategy, MACrossoverStrategy

    close = data["close"]
    boll = BollingerStrategy(window=20, std=2)
    ma = MACrossoverStrategy(fast_window=10, slow_window=30)

    def entries_summary():
        # Compute entries once; the original lambda ran get_entries twice.
        entries = get_entries(ma, data)
        return (entries[0].shape, len(entries[1]))

    record(results, "strategies BollingerStrategy generate_signals", lambda: boll.generate_signals(data).value_counts().to_dict())
    record(results, "strategies MACrossoverStrategy generate_signals", lambda: ma.generate_signals(data).value_counts().to_dict())
    record(results, "strategies signal_processing get_entries", entries_summary)
    record(results, "strategies bollinger_features", lambda: create_bollinger_features(data).dropna().shape)
    record(results, "strategies ma_whipsaw_ratio", lambda: calculate_ma_whipsaw_ratio(close, 10, 30)[0])
    record(results, "strategies ForexFeatureEngine", lambda: ForexFeatureEngine("TEST").calculate_all_features(data, "H1").shape)
def main() -> int:
    """Run every diagnostic suite, write the JSON report, and return an exit code.

    Returns 0 when all checks pass, 1 otherwise (failures are also printed).
    """
    results: list[dict] = []
    data = make_price_data()

    test_imports(results)
    test_cross_validation(results, data)
    test_data_and_datasets(results)
    test_features_filters_labeling_weights(results, data)
    test_strategies(results, data)

    REPORT.parent.mkdir(parents=True, exist_ok=True)
    summary = pd.Series([r["status"] for r in results]).value_counts().to_dict()
    # default=str stringifies non-JSON values (shapes, timestamps) in details.
    REPORT.write_text(
        json.dumps({"summary": summary, "results": results}, indent=2, default=str),
        encoding="utf-8",
    )
    print(json.dumps(summary, indent=2))
    print(f"Report: {REPORT}")

    failures = [r for r in results if r["status"] != "pass"]
    if not failures:
        return 0
    print("\nFailures:")
    for failure in failures:
        print(f"- {failure['name']}: {failure.get('error_type')} {failure.get('error')}")
    return 1
if __name__ == "__main__":
    # Propagate the diagnostic exit code to the shell.
    raise SystemExit(main())