# Source: stockpro-ml/app/services/concept_drift.py
# Commit 9334ec6 (will702): "StockPro ML backend with pytorch-forecasting TFT"
"""
Concept drift measurement for DDG-DA.
Computes distribution snapshots (statistical moments per feature) over
rolling windows and scores how much the current distribution has drifted
from the historical reference.
No scipy dependency - moments are computed with pure NumPy.
"""
from __future__ import annotations
import numpy as np
from dataclasses import dataclass, field
from typing import Optional
from app.services.feature_engineer import N_FEATURES, FEATURE_COLS
SNAPSHOT_WINDOW = 20 # trading days per snapshot window (~1 month)
N_STAT_MOMENTS = 4 # moments stored per feature: mean, std, skewness, kurtosis
SNAPSHOT_DIM = N_FEATURES * N_STAT_MOMENTS # length of one snapshot vector; 11 x 4 = 44 assuming N_FEATURES == 11 (defined in feature_engineer - confirm there)
DRIFT_THRESHOLD_SOFT = 1.8 # mean |z| above this marks drift in compute_drift_score; per the original note this triggers DDG-DA adaptation
DRIFT_THRESHOLD_HARD = 3.0 # strong-drift level (logged/flagged only, per the original note); not referenced in the code visible here
K_HISTORY = 8 # number of past snapshots used by the MLP predictor (predictor not visible here)
# ── Statistical moment helpers (pure NumPy) ──────────────────────────────────
def _skewness(x: np.ndarray) -> float:
"""Fisher's skewness: E[(x-ΞΌ)Β³] / σ³"""
n = len(x)
if n < 3:
return 0.0
mu = x.mean()
sigma = x.std()
if sigma == 0:
return 0.0
return float(((x - mu) ** 3).mean() / sigma ** 3)
def _kurtosis(x: np.ndarray) -> float:
"""Excess kurtosis: E[(x-ΞΌ)⁴] / σ⁴ βˆ’ 3"""
n = len(x)
if n < 4:
return 0.0
mu = x.mean()
sigma = x.std()
if sigma == 0:
return 0.0
return float(((x - mu) ** 4).mean() / sigma ** 4) - 3.0
# ── Core snapshot functions ───────────────────────────────────────────────────
def compute_snapshot(features: np.ndarray, window: int = SNAPSHOT_WINDOW) -> np.ndarray:
    """
    Summarise the trailing `window` rows of `features` as one moment vector.

    Args:
        features: 2-D feature matrix indexed as [row, feature]; columns
            0 .. N_FEATURES-1 are read. If it has fewer than `window` rows,
            every row is used.
        window: number of trailing rows that feed the snapshot.
    Returns:
        1-D float32 vector holding four values per feature, in feature order:
        [mean_f0, std_f0, skew_f0, kurt_f0, mean_f1, ...]. The std is NumPy's
        default population std. Length is N_FEATURES * 4.
    """
    # Use the whole matrix when it is shorter than the requested window.
    recent = features if len(features) < window else features[-window:]

    def _column_moments(index: int) -> list:
        # Moments are computed in float64 and only narrowed at the very end.
        column = recent[:, index].astype(np.float64)
        return [
            float(column.mean()),
            float(column.std()),
            _skewness(column),
            _kurtosis(column),
        ]

    flattened = [
        value
        for index in range(N_FEATURES)
        for value in _column_moments(index)
    ]
    return np.array(flattened, dtype=np.float32)
def extract_snapshots_from_series(
    features: np.ndarray,
    window: int = SNAPSHOT_WINDOW,
) -> np.ndarray:
    """
    Cut `features` into consecutive, non-overlapping windows and snapshot each.

    Windows are taken from the start of the series; trailing rows that do not
    fill a complete window are ignored.

    Args:
        features: feature matrix with T rows.
        window: number of rows per snapshot window.
    Returns:
        Array of shape (K, SNAPSHOT_DIM) with K = T // window, one row per
        window in chronological order. When T < window the result is an
        empty float32 array of shape (0, SNAPSHOT_DIM).
    """
    n_windows = len(features) // window
    if n_windows == 0:
        return np.empty((0, SNAPSHOT_DIM), dtype=np.float32)

    per_window = [
        compute_snapshot(
            features[idx * window : (idx + 1) * window],
            window=window,
        )
        for idx in range(n_windows)
    ]
    return np.stack(per_window, axis=0)
def compute_drift_score(
    current_snapshot: np.ndarray,
    reference_snapshots: np.ndarray,
) -> tuple[float, bool]:
    """
    Measure how far `current_snapshot` sits from the reference history.

    Every snapshot dimension is z-scored against the per-dimension mean and
    population std of `reference_snapshots`; the drift score is the mean of
    the absolute z-scores over all dimensions.

    Args:
        current_snapshot: 1-D snapshot of the current window.
        reference_snapshots: 2-D array of historical snapshots, one per row.
    Returns:
        (drift_score, drift_detected), where drift_detected is True when the
        score exceeds DRIFT_THRESHOLD_SOFT. With fewer than 2 reference
        snapshots the result is (0.0, False).
    """
    if len(reference_snapshots) >= 2:
        centre = reference_snapshots.mean(axis=0)
        # The small epsilon keeps dimensions with zero spread from dividing by 0.
        spread = reference_snapshots.std(axis=0) + 1e-8
        abs_z = np.abs((current_snapshot - centre) / spread)
        score = float(abs_z.mean())
        return score, score > DRIFT_THRESHOLD_SOFT

    # Not enough history to estimate a spread.
    return 0.0, False
# ── DriftState ────────────────────────────────────────────────────────────────
@dataclass
class DriftState:
    """Outcome of one concept-drift assessment (one per inference call)."""

    # Snapshot vector of the current window.
    snapshot: np.ndarray
    # Drift score computed for that snapshot.
    drift_score: float
    # Whether the score was judged to indicate drift.
    drift_detected: bool
    # Optional predicted snapshot for the next window; None when not supplied.
    predicted_next_snapshot: Optional[np.ndarray] = None