File size: 11,855 Bytes
669d6a1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
from __future__ import annotations

import importlib
import json
import sys
from pathlib import Path

import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier

# Parent of the directory that contains this file (parents[0] is the
# containing directory, parents[1] is one level above it).
ROOT = Path(__file__).resolve().parents[1]
# Put ROOT at the front of sys.path exactly once.
# NOTE(review): presumably this lets the ``afml`` imports below resolve from
# this checkout rather than an installed copy — confirm.
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))
# Destination of the JSON report that main() writes.
REPORT = ROOT / "diagnostics" / "core_subsystems_report.json"


# Dotted package names that test_imports() imports one by one; each import
# attempt becomes its own pass/fail entry in the report.
PACKAGES = [
    "afml.cross_validation",
    "afml.data_structures",
    "afml.datasets",
    "afml.features",
    "afml.filters",
    "afml.labeling",
    "afml.sample_weights",
    "afml.strategies",
]

# Dotted module names that test_imports() imports one by one, after the
# packages in PACKAGES; each import attempt becomes its own report entry.
MODULES = [
    "afml.cross_validation.anchored_walkforward",
    "afml.cross_validation.combinatorial",
    "afml.cross_validation.cpcv_usage",
    "afml.cross_validation.cross_validation",
    "afml.cross_validation.hyper_fit",
    "afml.cross_validation.hyper_fit_analysis",
    "afml.cross_validation.optuna_hyper_fit",
    "afml.cross_validation.pbo",
    "afml.cross_validation.scoring",
    "afml.cross_validation.trial_tracker",
    "afml.data_structures.bars",
    "afml.datasets.load_datasets",
    "afml.features.fracdiff",
    "afml.features.fractals",
    "afml.features.meta_labeling_features",
    "afml.features.moving_averages",
    "afml.features.returns",
    "afml.features.stationary",
    "afml.features.trading_session",
    "afml.features.volatility_regime",
    "afml.filters.filters",
    "afml.labeling.fixed_time_horizon",
    "afml.labeling.trend_scanning",
    "afml.labeling.triple_barrier",
    "afml.sample_weights.attribution",
    "afml.sample_weights.optimized_attribution",
    "afml.strategies.bollinger_features",
    "afml.strategies.genetic_optimizer",
    "afml.strategies.ma_crossover_feature_engine",
    "afml.strategies.ma_whipsaw_ratio",
    "afml.strategies.signal_processing",
    "afml.strategies.strategy_optimizer",
    "afml.strategies.trading_strategies",
    "afml.strategies.trend_scanning_optimizer",
    "afml.strategies.trend_scanning_optimizer_1",
]


def record(results: list[dict], name: str, fn):
    """Call ``fn`` and append one outcome entry for it to ``results``.

    On success the entry is ``{"name", "status": "pass", "detail": <return
    value of fn>}``. If ``fn`` raises any ``Exception`` the error is not
    propagated; instead the entry is ``{"name", "status": "fail",
    "error_type": <exception class name>, "error": <str(exception)>}``.

    Args:
        results: List that receives the new entry (mutated in place).
        name: Label stored under the ``"name"`` key.
        fn: Zero-argument callable to execute.
    """
    try:
        outcome = fn()
    except Exception as exc:
        # Deliberately broad: a failing check must be reported, not abort the run.
        entry = {
            "name": name,
            "status": "fail",
            "error_type": type(exc).__name__,
            "error": str(exc),
        }
    else:
        entry = {"name": name, "status": "pass", "detail": outcome}
    results.append(entry)


def make_price_data(n: int = 240) -> pd.DataFrame:
    """Build a deterministic synthetic hourly OHLC-style frame with ``n`` rows.

    The close is a linear ramp from 100 to 112 plus a sine wave of amplitude
    1.5. ``open`` is the previous close (the first row is back-filled from the
    second), ``high``/``low`` are the close +/- 0.8, ``spread`` is a constant
    0.02 and ``volume`` cycles through 100..119.

    Args:
        n: Number of rows to generate.

    Returns:
        DataFrame indexed by hourly UTC timestamps starting 2026-01-01 with
        columns ``open, high, low, close, spread, volume``.
    """
    steps = np.arange(n)
    timestamps = pd.date_range("2026-01-01", periods=n, freq="h", tz="UTC")
    ramp = np.linspace(100, 112, n)
    wave = np.sin(steps / 6) * 1.5
    close = pd.Series(ramp + wave, index=timestamps)
    columns = {
        "open": close.shift(1).bfill(),
        "high": close + 0.8,
        "low": close - 0.8,
        "close": close,
        "spread": 0.02,
        "volume": 100 + (steps % 20),
    }
    return pd.DataFrame(columns, index=timestamps)


def make_tick_data(n: int = 500) -> pd.DataFrame:
    """Build a deterministic synthetic one-per-second quote frame with ``n`` rows.

    ``bid`` is 100 plus the running sum of a small sine increment, ``ask`` is
    the bid plus 0.02, and ``volume`` cycles through 1..5.

    Args:
        n: Number of rows to generate.

    Returns:
        DataFrame indexed by per-second UTC timestamps starting 2026-01-01
        with columns ``bid, ask, volume``.
    """
    steps = np.arange(n)
    timestamps = pd.date_range("2026-01-01", periods=n, freq="s", tz="UTC")
    increments = np.sin(steps / 11) * 0.01
    bid = 100 + np.cumsum(increments)
    frame = pd.DataFrame(
        {"bid": bid, "ask": bid + 0.02, "volume": 1 + (steps % 5)},
        index=timestamps,
    )
    return frame


def test_imports(results: list[dict]):
    """Record an import attempt for every name in PACKAGES, then in MODULES.

    Each attempt is routed through ``record``, so an import error becomes a
    ``"fail"`` entry rather than an exception. On success the entry's detail
    is ``str()`` of the imported module object.

    Args:
        results: List that receives one entry per package/module.
    """

    def importer(dotted_name):
        # Bind the name per call so each thunk imports its own target.
        return lambda: str(importlib.import_module(dotted_name))

    for pkg_name in PACKAGES:
        record(results, f"import package {pkg_name}", importer(pkg_name))
    for mod_name in MODULES:
        record(results, f"import module {mod_name}", importer(mod_name))


def test_cross_validation(results: list[dict], data: pd.DataFrame):
    """Record smoke checks for the ``afml.cross_validation`` splitters and scorers.

    Builds a two-feature matrix ``X`` (close returns and volume), a binary
    target ``y`` (1 when the next bar's return is positive) and a label
    end-time series ``t1`` (the timestamp two bars ahead), then runs each
    splitter / scoring helper through ``record``.

    Args:
        results: List that receives one entry per check.
        data: Price frame with at least ``close`` and ``volume`` columns.

    Raises:
        ImportError: If the names imported from ``afml.cross_validation`` are
            unavailable (the import happens outside ``record``).
    """
    from afml.cross_validation import (
        CombinatorialPurgedCV,
        PurgedKFold,
        PurgedSplit,
        PurgedWalkForwardCV,
        ml_cross_val_score,
        probability_weighted_accuracy,
    )

    X = pd.DataFrame({"ret": data["close"].pct_change().fillna(0), "vol": data["volume"]}, index=data.index)
    y = (X["ret"].shift(-1).fillna(0) > 0).astype(int)
    # shift(-2) leaves the last two rows NaT. Back-filling cannot repair a
    # trailing gap (there is no later value to copy), so forward-fill instead:
    # the final labels then end at the last available timestamp.
    t1 = pd.Series(data.index.to_series().shift(-2).ffill().array, index=data.index)

    record(results, "cross_validation PurgedKFold split", lambda: [tuple(map(len, s)) for s in PurgedKFold(3, t1).split(X)])
    record(results, "cross_validation PurgedSplit split", lambda: tuple(map(len, PurgedSplit(t1, 0.25).split(X))))
    record(results, "cross_validation PurgedWalkForwardCV split", lambda: [tuple(map(len, s)) for s in PurgedWalkForwardCV(3, t1).split(X)])
    # Only the first three combinatorial splits are reported to keep the detail short.
    record(
        results,
        "cross_validation CombinatorialPurgedCV split",
        lambda: [(len(s[0]), sum(len(x) for x in s[1])) for s in CombinatorialPurgedCV(6, 2, t1=t1).split(X)][:3],
    )
    record(
        results,
        "cross_validation probability_weighted_accuracy",
        # Probabilities are the one-hot encoding of the true labels.
        lambda: float(probability_weighted_accuracy(y.iloc[:10], np.column_stack([1 - y.iloc[:10], y.iloc[:10]]), labels=[0, 1])),
    )
    record(
        results,
        "cross_validation ml_cross_val_score",
        lambda: [float(x) for x in ml_cross_val_score(RandomForestClassifier(n_estimators=10, random_state=7), X, y, cv_gen=PurgedKFold(3, t1), scoring="accuracy")],
    )


def test_data_and_datasets(results: list[dict]):
    """Record smoke checks for ``afml.data_structures`` and ``afml.datasets``.

    Bar-construction checks run on the synthetic ticks from
    ``make_tick_data``; dataset checks call the bundled loaders. Apart from
    the ticks-per-period check (reported as an ``int``), every check reports
    the ``.shape`` of what the call returns.

    Args:
        results: List that receives one entry per check.

    Raises:
        ImportError: If the imported ``afml`` names are unavailable (the
            imports happen outside ``record``).
    """
    from afml.data_structures import calculate_ticks_per_period, make_bars
    from afml.datasets import load_dollar_bar_sample, load_stock_prices, load_tick_sample

    tick_frame = make_tick_data()
    checks = (
        (
            "data_structures calculate_ticks_per_period",
            lambda: int(calculate_ticks_per_period(tick_frame, "M1", verbose=False)),
        ),
        ("data_structures tick bars", lambda: make_bars(tick_frame, "tick", 50).shape),
        ("data_structures volume bars", lambda: make_bars(tick_frame, "volume", 100).shape),
        ("data_structures dollar bars", lambda: make_bars(tick_frame, "dollar", 1000, price="bid").shape),
        ("datasets load_stock_prices", lambda: load_stock_prices().shape),
        ("datasets load_tick_sample", lambda: load_tick_sample().shape),
        ("datasets load_dollar_bar_sample", lambda: load_dollar_bar_sample().shape),
    )
    # Checks run in declaration order, one report entry each.
    for label, check in checks:
        record(results, label, check)


def test_features_filters_labeling_weights(results: list[dict], data: pd.DataFrame):
    """Record smoke checks for afml features, filters, labeling and sample weights.

    Several checks share intermediate objects (CUSUM events, vertical
    barriers, triple-barrier events). Those are computed once, up front, but
    any exception they raise is captured and re-raised inside every check
    that depends on them. A broken setup step therefore shows up as ``"fail"``
    entries in the report instead of aborting the run.

    Args:
        results: List that receives one entry per check.
        data: Price frame; the ``close`` column is used directly and the whole
            frame is handed to several feature helpers.

    Raises:
        ImportError: If the imported ``afml`` names are unavailable (the
            imports happen outside ``record``).
    """
    from afml.features.fracdiff import frac_diff, frac_diff_ffd
    from afml.features.fractals import get_fractal_features
    from afml.features.meta_labeling_features import calculate_market_regime_features
    from afml.features.moving_averages import calculate_ma_differences, get_ma_crossovers
    from afml.features.returns import get_lagged_returns, get_period_autocorr, get_return_dist_features
    from afml.features.stationary import is_stationary
    from afml.features.trading_session import get_time_features
    from afml.filters import cusum_filter, z_score_filter
    from afml.labeling import add_vertical_barrier, fixed_time_horizon, get_bins, get_events, trend_scanning_labels
    from afml.sample_weights import get_weights_by_return, get_weights_by_return_optimized, get_weights_by_time_decay

    def shared(compute):
        """Run ``compute`` now; return a getter that yields its value or re-raises its error."""
        try:
            value = compute()
        except Exception as exc:
            # ``exc`` is unbound once the except block ends, so pin it as a default.
            def reraise(error=exc):
                raise error

            return reraise
        return lambda: value

    close = data["close"]
    events = shared(lambda: cusum_filter(close, threshold=close.pct_change().std(), time_stamps=True))
    target = close.pct_change().abs().rolling(10).mean().bfill()
    # Only the first 20 events feed the labeling checks.
    vb = shared(lambda: add_vertical_barrier(events()[:20], close, num_bars=5))
    tb_events = shared(
        lambda: get_events(close, events()[:20], [1, 1], target, min_ret=0.00001, vertical_barrier_times=vb()).dropna()
    )

    record(results, "features frac_diff", lambda: frac_diff(close.to_frame("close"), 0.4).shape)
    record(results, "features frac_diff_ffd", lambda: frac_diff_ffd(close.to_frame("close"), 0.4).shape)
    record(results, "features moving_average_differences", lambda: calculate_ma_differences(close, [5, 10, 20]).shape)
    record(results, "features moving_average_crossovers", lambda: get_ma_crossovers(close, [5, 10, 20]).shape)
    record(results, "features lagged_returns", lambda: get_lagged_returns(close, [1, 2, 5], nperiods=2).shape)
    record(results, "features period_autocorr", lambda: get_period_autocorr(close, hours=1, lookback=20).dropna().shape)
    record(results, "features return_distribution", lambda: get_return_dist_features(close, window=20).dropna().shape)
    record(results, "features time_features", lambda: get_time_features(data, "H1").shape)
    record(results, "features stationary", lambda: is_stationary(close.to_frame("close"), verbose=False))
    record(results, "features fractals", lambda: get_fractal_features(data, target).shape)
    record(results, "features market_regime", lambda: calculate_market_regime_features(data).shape)
    record(results, "filters cusum_filter", lambda: len(events()))
    record(results, "filters z_score_filter", lambda: len(z_score_filter(close, mean_window=20, std_window=20, z_score=1.5)))
    record(results, "labeling fixed_time_horizon", lambda: fixed_time_horizon(close, threshold=0.001).value_counts(dropna=True).to_dict())
    record(results, "labeling trend_scanning", lambda: trend_scanning_labels(close, span=(5, 20)).shape)
    record(results, "labeling triple_barrier_events", lambda: tb_events().shape)
    record(results, "labeling get_bins", lambda: get_bins(tb_events(), close).shape)
    record(results, "sample_weights return", lambda: get_weights_by_return(tb_events(), close, num_threads=1, verbose=False).shape)
    record(results, "sample_weights return_optimized", lambda: get_weights_by_return_optimized(tb_events(), close).shape)
    record(results, "sample_weights time_decay", lambda: get_weights_by_time_decay(tb_events(), close, num_threads=1, verbose=False).shape)


def test_strategies(results: list[dict], data: pd.DataFrame):
    """Record smoke checks for the ``afml.strategies`` helpers.

    Instantiates a Bollinger strategy (window 20, std 2) and a moving-average
    crossover strategy (fast 10, slow 30), then runs signal generation, entry
    extraction and the feature builders through ``record``.

    Args:
        results: List that receives one entry per check.
        data: Price frame; the ``close`` column is used directly and the whole
            frame is handed to the strategies and feature builders.

    Raises:
        ImportError: If the imported ``afml`` names are unavailable (the
            imports happen outside ``record``).
    """
    from afml.strategies.bollinger_features import create_bollinger_features
    from afml.strategies.ma_crossover_feature_engine import ForexFeatureEngine
    from afml.strategies.ma_whipsaw_ratio import calculate_ma_whipsaw_ratio
    from afml.strategies.signal_processing import get_entries
    from afml.strategies.trading_strategies import BollingerStrategy, MACrossoverStrategy

    close = data["close"]
    boll = BollingerStrategy(window=20, std=2)
    ma = MACrossoverStrategy(fast_window=10, slow_window=30)

    def entries_summary():
        # One get_entries call serves both reported values.
        entries = get_entries(ma, data)
        return entries[0].shape, len(entries[1])

    record(results, "strategies BollingerStrategy generate_signals", lambda: boll.generate_signals(data).value_counts().to_dict())
    record(results, "strategies MACrossoverStrategy generate_signals", lambda: ma.generate_signals(data).value_counts().to_dict())
    record(results, "strategies signal_processing get_entries", entries_summary)
    record(results, "strategies bollinger_features", lambda: create_bollinger_features(data).dropna().shape)
    record(results, "strategies ma_whipsaw_ratio", lambda: calculate_ma_whipsaw_ratio(close, 10, 30)[0])
    record(results, "strategies ForexFeatureEngine", lambda: ForexFeatureEngine("TEST").calculate_all_features(data, "H1").shape)


def main() -> int:
    """Run every diagnostic group, write the JSON report, and return an exit code.

    Each group is guarded: if a group raises outside of ``record`` (for
    example because its own ``from afml... import`` fails), the exception is
    recorded as a ``"fail"`` entry for that group and the remaining groups
    still run. The report is therefore always written.

    Returns:
        ``0`` when every recorded entry passed, ``1`` otherwise.
    """
    results: list[dict] = []
    data = make_price_data()
    groups = [
        ("imports", lambda: test_imports(results)),
        ("cross_validation", lambda: test_cross_validation(results, data)),
        ("data_and_datasets", lambda: test_data_and_datasets(results)),
        ("features_filters_labeling_weights", lambda: test_features_filters_labeling_weights(results, data)),
        ("strategies", lambda: test_strategies(results, data)),
    ]
    for label, run_group in groups:
        try:
            run_group()
        except Exception as exc:
            # Same entry shape as record()'s failure entries so the report stays uniform.
            results.append(
                {
                    "name": f"{label} group setup",
                    "status": "fail",
                    "error_type": type(exc).__name__,
                    "error": str(exc),
                }
            )

    REPORT.parent.mkdir(parents=True, exist_ok=True)
    summary = pd.Series([r["status"] for r in results]).value_counts().to_dict()
    payload = {"summary": summary, "results": results}
    # default=str: check details may hold values json cannot encode natively.
    REPORT.write_text(json.dumps(payload, indent=2, default=str), encoding="utf-8")

    print(json.dumps(summary, indent=2))
    print(f"Report: {REPORT}")
    failures = [r for r in results if r["status"] != "pass"]
    if failures:
        print("\nFailures:")
        for failure in failures:
            print(f"- {failure['name']}: {failure.get('error_type')} {failure.get('error')}")
        return 1
    return 0


if __name__ == "__main__":
    # Propagate main()'s return value as the process exit status.
    sys.exit(main())