File size: 5,297 Bytes
f952974
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
"""
feature_builder.py — Converts raw rule-engine output dicts into a clean
feature vector for the ML model. Single responsibility: no model logic here.

Design decisions:
- All bool features cast to int (0/1) — LGBM handles them natively, but this
  keeps the matrix dtype homogeneous.
- Engineered interaction terms computed here, not in regime/volume modules,
  to keep those modules free of ML concerns.
- Returns a dict (for inference) or DataFrame row (for training).
- FEATURE_COLUMNS from ml_config defines the canonical order — any missing
  feature raises KeyError immediately rather than silently producing NaN.
"""

import math
from typing import Dict, Any

import numpy as np
import pandas as pd

from ml_config import FEATURE_COLUMNS


def build_feature_dict(
    regime_data: Dict[str, Any],
    volume_data: Dict[str, Any],
    scores: Dict[str, Any],
) -> Dict[str, float]:
    """
    Assemble the model's feature mapping from the three rule-engine dicts.

    Every input is read with ``.get`` and a default, then coerced with
    ``float``/``int``/``bool``, so the returned values are plain Python
    numbers. Derived columns (DI difference/ratio, absolute distance and the
    three interaction products) are computed here as well.

    Args:
        regime_data: Source of the ADX/DI, ATR, volatility, distance and
            regime-confidence inputs.
        volume_data: Source of the volume-pattern flags, counts and OBV slope.
        scores: Source of the ``*_score`` inputs.

    Returns:
        A dict containing exactly the FEATURE_COLUMNS keys, in that order.

    Raises:
        KeyError: If FEATURE_COLUMNS names a feature this function does not
            produce.
    """

    def _real(source: Dict[str, Any], key: str, fallback: float = 0.0) -> float:
        # Absent keys fall back to the default; present values must be float()-able.
        return float(source.get(key, fallback))

    def _flag(source: Dict[str, Any], key: str) -> int:
        # Truthiness collapsed to 0/1 so the feature matrix stays numeric.
        return int(bool(source.get(key, False)))

    def _whole(source: Dict[str, Any], key: str) -> int:
        return int(source.get(key, 0))

    features: Dict[str, Any] = {}

    # Trend strength and directional indicators.
    features["adx"] = _real(regime_data, "adx")
    features["di_plus"] = _real(regime_data, "di_plus")
    features["di_minus"] = _real(regime_data, "di_minus")
    features["di_diff"] = features["di_plus"] - features["di_minus"]
    # The small epsilon keeps the ratio defined when both DI values are zero.
    features["di_ratio"] = features["di_plus"] / (
        features["di_plus"] + features["di_minus"] + 1e-9
    )

    # Volatility inputs.
    features["atr_pct"] = _real(regime_data, "atr_pct")
    features["vol_ratio"] = _real(regime_data, "vol_ratio", 1.0)
    features["vol_compressed"] = _flag(regime_data, "vol_compressed")
    features["vol_expanding"] = _flag(regime_data, "vol_expanding")
    features["vol_expanding_from_base"] = _flag(regime_data, "vol_expanding_from_base")

    # Volume-pattern inputs.
    features["absorption"] = _flag(volume_data, "absorption")
    features["failed_breakout"] = _flag(volume_data, "failed_breakout")
    features["recent_failed_count"] = _whole(volume_data, "recent_failed_count")
    features["obv_slope_norm"] = _real(volume_data, "obv_slope_norm")
    features["delta_sign"] = _whole(volume_data, "delta_sign")
    features["spike"] = _flag(volume_data, "spike")
    features["climax"] = _flag(volume_data, "climax")

    # Distance in ATR units, signed and unsigned.
    features["dist_atr"] = _real(regime_data, "dist_atr")
    features["dist_atr_abs"] = abs(features["dist_atr"])

    # Rule-engine confidence and scores.
    features["regime_confidence"] = _real(regime_data, "regime_confidence")
    features["regime_score"] = _real(scores, "regime_score")
    features["volume_score"] = _real(scores, "volume_score")
    features["structure_score"] = _real(scores, "structure_score")
    features["confidence_score"] = _real(scores, "confidence_score")
    features["total_score"] = _real(scores, "total_score")

    # Interaction terms: explicit products so the model needs less depth to
    # pick up these combinations.
    features["adx_x_regime"] = features["adx"] * features["regime_score"]
    features["vol_x_obv"] = features["vol_ratio"] * features["obv_slope_norm"]
    features["score_x_conf"] = features["total_score"] * features["regime_confidence"]

    # Fail loudly if the configured column list expects something not built above.
    missing = set(FEATURE_COLUMNS) - set(features)
    if missing:
        raise KeyError(f"Missing features: {missing}")

    # Emit only the configured columns, in the configured order.
    ordered: Dict[str, float] = {}
    for column in FEATURE_COLUMNS:
        ordered[column] = features[column]
    return ordered


def feature_dict_to_row(feat: Dict[str, float]) -> pd.Series:
    """
    Return *feat* as a pandas Series laid out in FEATURE_COLUMNS order.

    Each FEATURE_COLUMNS name is looked up in *feat*, so a name that is
    absent raises KeyError; keys of *feat* outside FEATURE_COLUMNS are not
    copied into the result.
    """
    ordered = {}
    for column in FEATURE_COLUMNS:
        ordered[column] = feat[column]
    return pd.Series(ordered)


def feature_dict_to_matrix(feat: Dict[str, float]) -> np.ndarray:
    """
    Pack one feature dict into a single-row float64 array for inference.

    Values are taken in FEATURE_COLUMNS order, giving an array with one row
    and one column per configured feature. A FEATURE_COLUMNS name missing
    from *feat* raises KeyError.
    """
    row = [feat[column] for column in FEATURE_COLUMNS]
    # Wrapping the row in an outer list yields a 2-D array with one row.
    return np.array([row], dtype=np.float64)


def validate_features(feat: Dict[str, float]) -> bool:
    """
    Return True if every FEATURE_COLUMNS entry is present and finite.

    A feature is rejected when it is absent from *feat*, is ``None``, is
    NaN or +/-infinity, or cannot be interpreted as a real number at all.

    The finiteness check is applied to every value, not only to instances of
    the built-in ``float``. Otherwise NaN/inf carried by other numeric types
    (for example ``numpy.float32`` or ``decimal.Decimal``) and non-numeric
    values such as strings would be reported as valid.

    Args:
        feat: Mapping of feature name to value.

    Returns:
        True when all configured features pass the checks, False otherwise.
    """
    for name in FEATURE_COLUMNS:
        value = feat.get(name)
        if value is None:
            return False
        try:
            finite = math.isfinite(value)
        except (TypeError, ValueError, OverflowError):
            # TypeError: not a real number (str, complex, list, ...).
            # ValueError: not convertible to float (e.g. a signalling NaN Decimal).
            # OverflowError: an int too large to be represented as a float.
            return False
        if not finite:
            return False
    return True