Spaces:
Sleeping
Sleeping
| """ | |
Concept drift measurement for DDG-DA.
Computes distribution snapshots (statistical moments per feature) over
rolling windows and scores how far the current distribution has drifted
from the historical reference.
No SciPy dependency — all moments are computed with pure NumPy.
| """ | |
| from __future__ import annotations | |
| import numpy as np | |
| from dataclasses import dataclass, field | |
| from typing import Optional | |
| from app.services.feature_engineer import N_FEATURES, FEATURE_COLS | |
SNAPSHOT_WINDOW = 20  # trading days per snapshot window (~1 month)
N_STAT_MOMENTS = 4  # mean, std, skewness, kurtosis
SNAPSHOT_DIM = N_FEATURES * N_STAT_MOMENTS  # 11 × 4 = 44 with the current feature set
DRIFT_THRESHOLD_SOFT = 1.8  # sigma — drift score above this triggers DDG-DA adaptation
DRIFT_THRESHOLD_HARD = 3.0  # sigma — strong drift (logged/flagged only)
K_HISTORY = 8  # number of past snapshots used by the MLP predictor
# ── Statistical moment helpers (pure NumPy) ──────────────────────────────────
def _skewness(x: np.ndarray) -> float:
    """Return the Fisher-Pearson skewness of a 1-D sample.

    skew = E[(x - μ)³] / σ³, using population (ddof=0) moments.

    Args:
        x: 1-D array of observations.

    Returns:
        The skewness as a Python float, or 0.0 when there are fewer than
        three observations or the sample has no spread.
    """
    n = len(x)
    if n < 3:
        return 0.0
    # A constant sample must report zero skew. Checking sigma == 0 alone is not
    # enough: rounding in x.mean() can leave a tiny residual (e.g. ~1e-17),
    # which would then be divided by its own cube and yield a spurious ±1.
    if x.max() == x.min():
        return 0.0
    mu = x.mean()
    sigma = x.std()
    if sigma == 0:
        # Guards underflow for samples whose spread is too small to represent.
        return 0.0
    return float(((x - mu) ** 3).mean() / sigma ** 3)
def _kurtosis(x: np.ndarray) -> float:
    """Return the excess (Fisher) kurtosis of a 1-D sample.

    kurt = E[(x - μ)⁴] / σ⁴ − 3, using population (ddof=0) moments.

    Args:
        x: 1-D array of observations.

    Returns:
        The excess kurtosis as a Python float, or 0.0 when there are fewer
        than four observations or the sample has no spread.
    """
    n = len(x)
    if n < 4:
        return 0.0
    # A constant sample must report 0.0. Checking sigma == 0 alone is not
    # enough: rounding in x.mean() can leave a tiny residual whose 4th power
    # divided by itself gives 1, i.e. a spurious excess kurtosis of -2.
    if x.max() == x.min():
        return 0.0
    mu = x.mean()
    sigma = x.std()
    if sigma == 0:
        # Guards underflow for samples whose spread is too small to represent.
        return 0.0
    return float(((x - mu) ** 4).mean() / sigma ** 4) - 3.0
# ── Core snapshot functions ──────────────────────────────────────────────────
def compute_snapshot(features: np.ndarray, window: int = SNAPSHOT_WINDOW) -> np.ndarray:
    """
    Summarise the trailing `window` rows of `features` as a flat moment vector.

    For each of the first N_FEATURES columns, four moments are computed in
    float64 (mean, standard deviation, skewness, excess kurtosis). The results
    are concatenated feature by feature.

    Args:
        features: (T, N_FEATURES) normalized feature matrix. T >= window is
            recommended; when T < window, every available row is used.
        window: number of trailing rows to summarise.

    Returns:
        (SNAPSHOT_DIM,) float32 vector laid out as
        [mean_f0, std_f0, skew_f0, kurt_f0, mean_f1, ..., kurt_f{N_FEATURES-1}].
    """
    recent = features if len(features) < window else features[-window:]

    def moments_of(column: np.ndarray) -> list[float]:
        # Upcast so the moment arithmetic is not done in the input's precision.
        values = column.astype(np.float64)
        return [
            float(values.mean()),
            float(values.std()),
            _skewness(values),
            _kurtosis(values),
        ]

    flat = [
        moment
        for feature_idx in range(N_FEATURES)
        for moment in moments_of(recent[:, feature_idx])
    ]
    return np.asarray(flat, dtype=np.float32)
def extract_snapshots_from_series(
    features: np.ndarray,
    window: int = SNAPSHOT_WINDOW,
) -> np.ndarray:
    """
    Cut `features` into non-overlapping windows and snapshot each one.

    Segments are aligned to the start of the series: segment i covers rows
    [i * window, (i + 1) * window). The last T % window rows do not fill a
    whole segment and are dropped. If rows are in chronological order, those
    are the most recent rows.

    Args:
        features: (T, N_FEATURES) feature matrix.
        window: snapshot window size; must be >= 1.

    Returns:
        (K, SNAPSHOT_DIM) float32 array where K = T // window, or an empty
        (0, SNAPSHOT_DIM) array if T < window.

    Raises:
        ValueError: if window < 1. Previously window == 0 raised
            ZeroDivisionError and a negative window failed inside np.stack.
    """
    if window < 1:
        raise ValueError(f"window must be >= 1, got {window}")
    k = len(features) // window
    if k == 0:
        return np.empty((0, SNAPSHOT_DIM), dtype=np.float32)
    snapshots = [
        compute_snapshot(features[i * window : (i + 1) * window], window=window)
        for i in range(k)
    ]
    return np.stack(snapshots, axis=0)  # (K, SNAPSHOT_DIM)
def compute_drift_score(
    current_snapshot: np.ndarray,
    reference_snapshots: np.ndarray,
    *,
    threshold: Optional[float] = None,
) -> tuple[float, bool]:
    """
    Score how much `current_snapshot` deviates from the reference distribution.

    Each snapshot dimension is z-scored against the mean/std of the reference
    history, and the drift score is the mean |z| across all dimensions.

    Args:
        current_snapshot: (D,) snapshot of the current window, with
            D = SNAPSHOT_DIM. A (1, D) array is accepted and flattened.
        reference_snapshots: (K, D) historical snapshots. A single (D,)
            snapshot is treated as K = 1. K >= 2 is required for a
            meaningful standard deviation.
        threshold: score above which drift is reported. Defaults to
            DRIFT_THRESHOLD_SOFT.

    Returns:
        (drift_score, drift_detected). Returns (0.0, False) when K < 2.

    Raises:
        ValueError: if the current snapshot's length differs from the
            reference width D.
    """
    current = np.asarray(current_snapshot, dtype=np.float64).reshape(-1)
    # atleast_2d: a lone (D,) reference must count as K = 1. Otherwise
    # len() would report D and the axis-0 statistics would collapse to scalars.
    reference = np.atleast_2d(np.asarray(reference_snapshots, dtype=np.float64))
    if reference.shape[0] < 2:
        return 0.0, False
    if current.shape[0] != reference.shape[1]:
        raise ValueError(
            f"current_snapshot has {current.shape[0]} values but "
            f"reference_snapshots rows have {reference.shape[1]}"
        )

    ref_mean = reference.mean(axis=0)  # (D,)
    # The epsilon avoids division by zero for dimensions that are constant across the
    # reference. NOTE(review): such a dimension yields |z| ~ diff / 1e-8 if the
    # current value differs at all, which can dominate the mean.
    ref_std = reference.std(axis=0) + 1e-8  # (D,)
    z_scores = np.abs((current - ref_mean) / ref_std)  # (D,)
    drift_score = float(z_scores.mean())

    if threshold is None:
        threshold = DRIFT_THRESHOLD_SOFT
    return drift_score, bool(drift_score > threshold)
# ── DriftState ───────────────────────────────────────────────────────────────
# @dataclass is required. Without it the class has no field-accepting __init__,
# and `predicted_next_snapshot` would be a bare dataclasses.Field object, not None.
@dataclass
class DriftState:
    """Result of concept drift assessment for one inference call.

    Attributes:
        snapshot: snapshot of the current window ((44,) per the field comment).
        drift_score: drift score for the current window; presumably the value
            returned by compute_drift_score (verify against caller).
        drift_detected: whether drift was flagged for this call.
        predicted_next_snapshot: optional predicted snapshot for the next
            window; None when not provided.
    """

    snapshot: np.ndarray  # (44,) current snapshot
    drift_score: float
    drift_detected: bool
    predicted_next_snapshot: Optional[np.ndarray] = field(default=None)  # (44,)