AFML / scripts /test_core_subsystems.py
akshayboora's picture
Upload 940 files
669d6a1 verified
from __future__ import annotations
import importlib
import json
import sys
from pathlib import Path
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
# Directory one level above the folder that contains this script.
ROOT = Path(__file__).resolve().parents[1]

# Make ROOT importable (presumably so the ``afml`` imports used below resolve
# when this file is run directly); skipped if it is already on the path.
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))

# Destination of the JSON report written by ``main``.
REPORT = ROOT.joinpath("diagnostics", "core_subsystems_report.json")
# Top-level subpackages whose import is smoke-tested by ``test_imports``.
PACKAGES = [
    "afml.cross_validation",
    "afml.data_structures",
    "afml.datasets",
    "afml.features",
    "afml.filters",
    "afml.labeling",
    "afml.sample_weights",
    "afml.strategies",
]
# Individual modules whose import is smoke-tested by ``test_imports``,
# grouped by subpackage and kept in alphabetical order.
MODULES = [
    # cross_validation
    "afml.cross_validation.anchored_walkforward",
    "afml.cross_validation.combinatorial",
    "afml.cross_validation.cpcv_usage",
    "afml.cross_validation.cross_validation",
    "afml.cross_validation.hyper_fit",
    "afml.cross_validation.hyper_fit_analysis",
    "afml.cross_validation.optuna_hyper_fit",
    "afml.cross_validation.pbo",
    "afml.cross_validation.scoring",
    "afml.cross_validation.trial_tracker",
    # data_structures / datasets
    "afml.data_structures.bars",
    "afml.datasets.load_datasets",
    # features
    "afml.features.fracdiff",
    "afml.features.fractals",
    "afml.features.meta_labeling_features",
    "afml.features.moving_averages",
    "afml.features.returns",
    "afml.features.stationary",
    "afml.features.trading_session",
    "afml.features.volatility_regime",
    # filters
    "afml.filters.filters",
    # labeling
    "afml.labeling.fixed_time_horizon",
    "afml.labeling.trend_scanning",
    "afml.labeling.triple_barrier",
    # sample_weights
    "afml.sample_weights.attribution",
    "afml.sample_weights.optimized_attribution",
    # strategies
    "afml.strategies.bollinger_features",
    "afml.strategies.genetic_optimizer",
    "afml.strategies.ma_crossover_feature_engine",
    "afml.strategies.ma_whipsaw_ratio",
    "afml.strategies.signal_processing",
    "afml.strategies.strategy_optimizer",
    "afml.strategies.trading_strategies",
    "afml.strategies.trend_scanning_optimizer",
    "afml.strategies.trend_scanning_optimizer_1",
]
def record(results: list[dict], name: str, fn):
    """Run a single check and append its outcome to ``results``.

    ``fn`` is invoked with no arguments. When it returns, a ``"pass"`` entry
    holding the return value under ``"detail"`` is appended. When it raises
    any ``Exception``, a ``"fail"`` entry holding the exception's class name
    and message is appended instead; the exception is not re-raised.

    Args:
        results: List that receives exactly one new dict per call.
        name: Label stored under the ``"name"`` key of the entry.
        fn: Zero-argument callable performing the check.
    """
    try:
        outcome = fn()
    except Exception as exc:
        # Broad on purpose: one failing check must not stop later checks.
        entry = {
            "name": name,
            "status": "fail",
            "error_type": type(exc).__name__,
            "error": str(exc),
        }
    else:
        entry = {"name": name, "status": "pass", "detail": outcome}
    results.append(entry)
def make_price_data(n: int = 240) -> pd.DataFrame:
    """Return a deterministic synthetic hourly price frame with ``n`` rows.

    The close is a linear ramp from 100 to 112 plus a sine wave of amplitude
    1.5. ``open`` is the previous close (the first row reuses its own close),
    ``high``/``low`` sit 0.8 above/below the close, ``spread`` is a constant
    0.02 and ``volume`` cycles through 100..119.

    Args:
        n: Number of hourly rows, starting at 2026-01-01 00:00 UTC.

    Returns:
        DataFrame indexed by UTC timestamps with columns ``open``, ``high``,
        ``low``, ``close``, ``spread`` and ``volume`` (in that order).
    """
    timestamps = pd.date_range("2026-01-01", periods=n, freq="h", tz="UTC")
    steps = np.arange(n)

    ramp = np.linspace(100, 112, n)
    wave = np.sin(steps / 6) * 1.5
    close = pd.Series(ramp + wave, index=timestamps)

    columns = {
        # shift(1) leaves the first row empty; bfill copies row 1's value
        # (i.e. the first close) into it.
        "open": close.shift(1).bfill(),
        "high": close + 0.8,
        "low": close - 0.8,
        "close": close,
        "spread": 0.02,
        "volume": 100 + (steps % 20),
    }
    return pd.DataFrame(columns, index=timestamps)
def make_tick_data(n: int = 500) -> pd.DataFrame:
    """Return a deterministic synthetic one-per-second tick frame of ``n`` rows.

    ``bid`` is 100 plus the running sum of a small sine increment, ``ask`` is
    the bid plus 0.02, and ``volume`` cycles through 1..5.

    Args:
        n: Number of ticks, starting at 2026-01-01 00:00:00 UTC.

    Returns:
        DataFrame indexed by UTC timestamps with columns ``bid``, ``ask`` and
        ``volume`` (in that order).
    """
    timestamps = pd.date_range("2026-01-01", periods=n, freq="s", tz="UTC")
    steps = np.arange(n)

    increments = np.sin(steps / 11) * 0.01
    bid = 100 + np.cumsum(increments)

    columns = {
        "bid": bid,
        "ask": bid + 0.02,
        "volume": 1 + (steps % 5),
    }
    return pd.DataFrame(columns, index=timestamps)
def test_imports(results: list[dict]):
    """Record an import attempt for every entry of PACKAGES, then MODULES.

    Each check's detail is ``str()`` of the imported module object; an import
    error is captured by ``record`` as a failed entry.
    """

    def _import_repr(dotted_name: str) -> str:
        return str(importlib.import_module(dotted_name))

    # The default argument pins the current name to each lambda.
    for pkg in PACKAGES:
        record(results, f"import package {pkg}", lambda target=pkg: _import_repr(target))
    for mod in MODULES:
        record(results, f"import module {mod}", lambda target=mod: _import_repr(target))
def test_cross_validation(results: list[dict], data: pd.DataFrame):
    """Smoke-test the splitters and scoring helpers of ``afml.cross_validation``.

    Builds a two-feature matrix (close-to-close return and volume), a binary
    next-bar-direction label and a ``t1`` series of label end times from
    ``data``, then records one check per splitter/scorer.

    Args:
        results: List that receives one entry per check (via ``record``).
        data: Price frame providing ``close`` and ``volume`` columns.
    """
    from afml.cross_validation import (
        CombinatorialPurgedCV,
        PurgedKFold,
        PurgedSplit,
        PurgedWalkForwardCV,
        ml_cross_val_score,
        probability_weighted_accuracy,
    )

    X = pd.DataFrame(
        {"ret": data["close"].pct_change().fillna(0), "vol": data["volume"]},
        index=data.index,
    )
    # Label: 1 when the next bar's return is positive, else 0.
    y = (X["ret"].shift(-1).fillna(0) > 0).astype(int)

    # Each label ends two bars after it starts. shift(-2) leaves the LAST two
    # rows empty, so they must be forward-filled (clamped to the final
    # timestamp); a backward fill has nothing to copy from there and would
    # leave NaT end times in t1.
    t1 = pd.Series(data.index.to_series().shift(-2).ffill().array, index=data.index)

    record(
        results,
        "cross_validation PurgedKFold split",
        lambda: [tuple(map(len, s)) for s in PurgedKFold(3, t1).split(X)],
    )
    record(
        results,
        "cross_validation PurgedSplit split",
        lambda: tuple(map(len, PurgedSplit(t1, 0.25).split(X))),
    )
    record(
        results,
        "cross_validation PurgedWalkForwardCV split",
        lambda: [tuple(map(len, s)) for s in PurgedWalkForwardCV(3, t1).split(X)],
    )
    # Only the first three combinatorial splits are kept to bound report size.
    record(
        results,
        "cross_validation CombinatorialPurgedCV split",
        lambda: [
            tuple([len(s[0]), sum(len(x) for x in s[1])])
            for s in CombinatorialPurgedCV(6, 2, t1=t1).split(X)
        ][:3],
    )
    # Feed "perfect" one-hot probabilities built from the labels themselves.
    record(
        results,
        "cross_validation probability_weighted_accuracy",
        lambda: float(
            probability_weighted_accuracy(
                y.iloc[:10],
                np.column_stack([1 - y.iloc[:10], y.iloc[:10]]),
                labels=[0, 1],
            )
        ),
    )
    record(
        results,
        "cross_validation ml_cross_val_score",
        lambda: [
            float(x)
            for x in ml_cross_val_score(
                RandomForestClassifier(n_estimators=10, random_state=7),
                X,
                y,
                cv_gen=PurgedKFold(3, t1),
                scoring="accuracy",
            )
        ],
    )
def test_data_and_datasets(results: list[dict]):
    """Smoke-test bar construction and the bundled dataset loaders.

    Bar checks run on the synthetic ticks from ``make_tick_data``; every
    check records the shape (or integer value) it produced.

    Args:
        results: List that receives one entry per check (via ``record``).
    """
    from afml.data_structures import calculate_ticks_per_period, make_bars
    from afml.datasets import load_dollar_bar_sample, load_stock_prices, load_tick_sample

    ticks = make_tick_data()

    # (label, zero-argument check) pairs, executed in this order.
    checks = (
        (
            "data_structures calculate_ticks_per_period",
            lambda: int(calculate_ticks_per_period(ticks, "M1", verbose=False)),
        ),
        ("data_structures tick bars", lambda: make_bars(ticks, "tick", 50).shape),
        ("data_structures volume bars", lambda: make_bars(ticks, "volume", 100).shape),
        (
            "data_structures dollar bars",
            lambda: make_bars(ticks, "dollar", 1000, price="bid").shape,
        ),
        ("datasets load_stock_prices", lambda: load_stock_prices().shape),
        ("datasets load_tick_sample", lambda: load_tick_sample().shape),
        ("datasets load_dollar_bar_sample", lambda: load_dollar_bar_sample().shape),
    )
    for label, check in checks:
        record(results, label, check)
def test_features_filters_labeling_weights(results: list[dict], data: pd.DataFrame):
    """Smoke-test the feature, filter, labeling and sample-weight helpers.

    Shared inputs (CUSUM events, a rolling volatility target, vertical
    barriers and triple-barrier events) are built once up front, outside
    ``record``; the individual checks then each record the shape, length or
    value they produced.

    Args:
        results: List that receives one entry per check (via ``record``).
        data: Price frame; its ``close`` column drives most checks.
    """
    from afml.features.fracdiff import frac_diff, frac_diff_ffd
    from afml.features.fractals import get_fractal_features
    from afml.features.meta_labeling_features import calculate_market_regime_features
    from afml.features.moving_averages import calculate_ma_differences, get_ma_crossovers
    from afml.features.returns import get_lagged_returns, get_period_autocorr, get_return_dist_features
    from afml.features.stationary import is_stationary
    from afml.features.trading_session import get_time_features
    from afml.filters import cusum_filter, z_score_filter
    from afml.labeling import add_vertical_barrier, fixed_time_horizon, get_bins, get_events, trend_scanning_labels
    from afml.sample_weights import get_weights_by_return, get_weights_by_return_optimized, get_weights_by_time_decay

    close = data["close"]

    # Shared fixtures. Only the first 20 events feed the barrier setup.
    events = cusum_filter(close, threshold=close.pct_change().std(), time_stamps=True)
    target = close.pct_change().abs().rolling(10).mean().bfill()
    vb = add_vertical_barrier(events[:20], close, num_bars=5)
    tb_events = get_events(
        close,
        events[:20],
        [1, 1],
        target,
        min_ret=0.00001,
        vertical_barrier_times=vb,
    ).dropna()

    # (label, zero-argument check) pairs, executed in this order.
    checks = (
        ("features frac_diff", lambda: frac_diff(close.to_frame("close"), 0.4).shape),
        ("features frac_diff_ffd", lambda: frac_diff_ffd(close.to_frame("close"), 0.4).shape),
        ("features moving_average_differences", lambda: calculate_ma_differences(close, [5, 10, 20]).shape),
        ("features moving_average_crossovers", lambda: get_ma_crossovers(close, [5, 10, 20]).shape),
        ("features lagged_returns", lambda: get_lagged_returns(close, [1, 2, 5], nperiods=2).shape),
        ("features period_autocorr", lambda: get_period_autocorr(close, hours=1, lookback=20).dropna().shape),
        ("features return_distribution", lambda: get_return_dist_features(close, window=20).dropna().shape),
        ("features time_features", lambda: get_time_features(data, "H1").shape),
        ("features stationary", lambda: is_stationary(close.to_frame("close"), verbose=False)),
        ("features fractals", lambda: get_fractal_features(data, target).shape),
        ("features market_regime", lambda: calculate_market_regime_features(data).shape),
        ("filters cusum_filter", lambda: len(events)),
        ("filters z_score_filter", lambda: len(z_score_filter(close, mean_window=20, std_window=20, z_score=1.5))),
        ("labeling fixed_time_horizon", lambda: fixed_time_horizon(close, threshold=0.001).value_counts(dropna=True).to_dict()),
        ("labeling trend_scanning", lambda: trend_scanning_labels(close, span=(5, 20)).shape),
        ("labeling triple_barrier_events", lambda: tb_events.shape),
        ("labeling get_bins", lambda: get_bins(tb_events, close).shape),
        ("sample_weights return", lambda: get_weights_by_return(tb_events, close, num_threads=1, verbose=False).shape),
        ("sample_weights return_optimized", lambda: get_weights_by_return_optimized(tb_events, close).shape),
        ("sample_weights time_decay", lambda: get_weights_by_time_decay(tb_events, close, num_threads=1, verbose=False).shape),
    )
    for label, check in checks:
        record(results, label, check)
def test_strategies(results: list[dict], data: pd.DataFrame):
    """Smoke-test the strategy classes and strategy feature helpers.

    Args:
        results: List that receives one entry per check (via ``record``).
        data: Price frame passed to the strategies and feature builders.
    """
    from afml.strategies.bollinger_features import create_bollinger_features
    from afml.strategies.ma_crossover_feature_engine import ForexFeatureEngine
    from afml.strategies.ma_whipsaw_ratio import calculate_ma_whipsaw_ratio
    from afml.strategies.signal_processing import get_entries
    from afml.strategies.trading_strategies import BollingerStrategy, MACrossoverStrategy

    close = data["close"]
    boll = BollingerStrategy(window=20, std=2)
    ma = MACrossoverStrategy(fast_window=10, slow_window=30)

    def _entries_summary():
        # Call get_entries once and summarise both parts of its result, so
        # the two halves of the detail always describe the same call.
        entries = get_entries(ma, data)
        return (entries[0].shape, len(entries[1]))

    record(
        results,
        "strategies BollingerStrategy generate_signals",
        lambda: boll.generate_signals(data).value_counts().to_dict(),
    )
    record(
        results,
        "strategies MACrossoverStrategy generate_signals",
        lambda: ma.generate_signals(data).value_counts().to_dict(),
    )
    record(results, "strategies signal_processing get_entries", _entries_summary)
    record(
        results,
        "strategies bollinger_features",
        lambda: create_bollinger_features(data).dropna().shape,
    )
    record(
        results,
        "strategies ma_whipsaw_ratio",
        lambda: calculate_ma_whipsaw_ratio(close, 10, 30)[0],
    )
    record(
        results,
        "strategies ForexFeatureEngine",
        lambda: ForexFeatureEngine("TEST").calculate_all_features(data, "H1").shape,
    )
def main() -> int:
    """Run every check group, write the JSON report and print a summary.

    Each group runs inside its own guard: the groups do some work outside
    ``record`` (function-level imports, shared fixture setup, constructing
    strategies), and an exception there would otherwise abort the script
    before the report is written. Such a crash is recorded as a single failed
    entry for that group and the remaining groups still run.

    Returns:
        ``0`` when every recorded entry passed, ``1`` otherwise.
    """
    results: list[dict] = []
    data = make_price_data()

    # (label, group function, extra positional args after ``results``)
    groups = (
        ("imports", test_imports, ()),
        ("cross_validation", test_cross_validation, (data,)),
        ("data_and_datasets", test_data_and_datasets, ()),
        ("features_filters_labeling_weights", test_features_filters_labeling_weights, (data,)),
        ("strategies", test_strategies, (data,)),
    )
    for label, group, extra_args in groups:
        try:
            group(results, *extra_args)
        except Exception as exc:
            # Same entry shape as a failed check produced by ``record``.
            results.append(
                {
                    "name": f"{label} (group aborted)",
                    "status": "fail",
                    "error_type": type(exc).__name__,
                    "error": str(exc),
                }
            )

    REPORT.parent.mkdir(parents=True, exist_ok=True)
    summary = pd.Series([r["status"] for r in results]).value_counts().to_dict()
    payload = {"summary": summary, "results": results}
    # default=str stringifies any check detail json cannot encode natively.
    REPORT.write_text(json.dumps(payload, indent=2, default=str), encoding="utf-8")

    print(json.dumps(summary, indent=2))
    print(f"Report: {REPORT}")

    failures = [r for r in results if r["status"] != "pass"]
    if failures:
        print("\nFailures:")
        for failure in failures:
            print(f"- {failure['name']}: {failure.get('error_type')} {failure.get('error')}")
        return 1
    return 0
# Script entry point: use main()'s return value as the process exit status.
if __name__ == "__main__":
    sys.exit(main())