"""Drift simulation engine. Supports four drift types: gradual - features shift linearly over N steps sudden - abrupt distribution change at a single point seasonal - sinusoidal oscillation mixed - combination of gradual and seasonal """ from __future__ import annotations import numpy as np import pandas as pd from typing import Literal, Optional, Sequence from src.utils.logging_config import get_logger log = get_logger(__name__) DriftType = Literal["gradual", "sudden", "seasonal", "mixed"] class DriftSimulator: """Inject configurable drift into feature DataFrames.""" DRIFTABLE_CONTINUOUS = [ "trip_distance", "passenger_count", "pickup_hour", ] DRIFTABLE_CATEGORICAL = [ "rate_code_id", "payment_type", "pu_location_zone", "do_location_zone", "vendor_id", ] def __init__(self, random_seed: int = 42) -> None: self.rng = np.random.default_rng(random_seed) log.info("DriftSimulator initialised (seed=%d)", random_seed) def apply( self, df: pd.DataFrame, drift_type: DriftType = "gradual", affected_features: Optional[Sequence[str]] = None, severity: float = 1.0, step: int = 0, total_steps: int = 500, ) -> pd.DataFrame: """Apply drift to `df` and return a modified copy.""" df = df.copy() if affected_features is None: n_features = self.rng.integers(2, 4) affected_features = list( self.rng.choice( self.DRIFTABLE_CONTINUOUS, size=min(n_features, len(self.DRIFTABLE_CONTINUOUS)), replace=False, ) ) log.debug( "Applying %s drift (step=%d/%d, features=%s, severity=%.2f)", drift_type, step, total_steps, affected_features, severity, ) if drift_type == "gradual": df = self._gradual(df, affected_features, severity, step, total_steps) elif drift_type == "sudden": df = self._sudden(df, affected_features, severity) elif drift_type == "seasonal": df = self._seasonal(df, affected_features, severity, step) elif drift_type == "mixed": df = self._gradual(df, affected_features[:1], severity * 0.5, step, total_steps) df = self._seasonal(df, affected_features[1:2], severity * 0.7, step) else: raise ValueError(f"Unknown drift_type: {drift_type!r}") return df def generate_drift_scenario( self, base_df: pd.DataFrame, drift_type: DriftType = "gradual", n_steps: int = 1000, severity: float = 1.0, ) -> tuple[list[pd.DataFrame], dict]: """Generate a full drift scenario as a sequence of DataFrames.""" batch_size = max(1, len(base_df) // n_steps) drifted_batches: list[pd.DataFrame] = [] metadata: dict = { "drift_type": drift_type, "n_steps": n_steps, "severity": severity, "affected_features": [], } n_features = self.rng.integers(2, 4) affected = list( self.rng.choice( self.DRIFTABLE_CONTINUOUS, size=min(n_features, len(self.DRIFTABLE_CONTINUOUS)), replace=False, ) ) metadata["affected_features"] = affected for step in range(n_steps): batch = base_df.sample(n=batch_size, replace=True, random_state=int(self.rng.integers(0, 100000))) drifted = self.apply( batch, drift_type=drift_type, affected_features=affected, severity=severity, step=step, total_steps=n_steps, ) drifted_batches.append(drifted) log.info( "Generated drift scenario: type=%s, steps=%d, features=%s", drift_type, n_steps, affected, ) return drifted_batches, metadata def _gradual( self, df: pd.DataFrame, features: Sequence[str], severity: float, step: int, total_steps: int, ) -> pd.DataFrame: progress = step / max(total_steps, 1) for feat in features: if feat not in df.columns: continue col = df[feat].to_numpy(dtype=float) shift = severity * progress * col.std() * 2.0 df[feat] = col + shift + self.rng.normal(0, 0.1 * shift + 0.01, size=len(col)) return df def _sudden( self, df: pd.DataFrame, features: Sequence[str], severity: float, ) -> pd.DataFrame: for feat in features: if feat not in df.columns: continue col = df[feat].to_numpy(dtype=float) df[feat] = col + severity * col.std() * 3.0 return df def _seasonal( self, df: pd.DataFrame, features: Sequence[str], severity: float, step: int, period: int = 100, ) -> pd.DataFrame: angle = 2 * np.pi * (step % period) / period for feat in features: if feat not in df.columns: continue col = df[feat].to_numpy(dtype=float) amplitude = severity * col.std() * 1.5 df[feat] = col + amplitude * np.sin(angle) return df