"""
api.routers.simulate
====================
Bulk battery lifecycle simulation endpoint - vectorized ML-driven.

Performance design (no per-step Python loop; one pass of numpy calls per battery):
  1. SEI impedance growth  - numpy cumsum
  2. Feature matrix build  - preallocated (N_steps, n_features) array, filled by column
  3. ML prediction         - single batch call via registry.predict_array()
  4. RUL / EOL             - numpy concatenate / cumsum / argmax
  5. Classify / colorize   - numpy searchsorted on pre-built label arrays

Scaler dispatch mirrors NB03 training EXACTLY:
  Tree models (RF / ET / XGB / LGB / GB)  -> raw numpy (no scaler)
  Linear / SVR / KNN                      -> standard_scaler.joblib.transform(X)
  best_ensemble                           -> per-component dispatch (same rules)
  Deep sequence models (PyTorch / Keras)  -> not batchable, falls back to physics
"""

from __future__ import annotations

import logging
import math
from typing import List, Optional

import numpy as np
from fastapi import APIRouter
from pydantic import BaseModel, Field

from api.model_registry import (
    FEATURE_COLS_SCALAR,
    classify_degradation,
    soh_to_color,
    registry_v3 as registry_v2,
)

log = logging.getLogger(__name__)

router = APIRouter(prefix="/api/v3", tags=["simulation"])

# -- Physics constants --------------------------------------------------------
_EA_OVER_R = 6200.0  # Ea/R in Kelvin
_Q_NOM = 2.0         # NASA PCoE nominal capacity (Ah)
_T_REF = 24.0        # Reference ambient temperature (deg C)
_I_REF = 1.82        # Reference discharge current (A)
_V_REF = 4.19        # Reference peak voltage (V)

_KELVIN_OFFSET = 273.15
_T_REF_K = _KELVIN_OFFSET + _T_REF  # Reference temperature in Kelvin

# Clip range applied to the Arrhenius temperature acceleration factor.
_TEMP_F_MIN = 0.15
_TEMP_F_MAX = 25.0

# Seconds per time unit; None means "report in cycles, no time conversion".
_TIME_UNIT_SECONDS: dict[str, float | None] = {
    "cycle": None,
    "second": 1.0,
    "minute": 60.0,
    "hour": 3_600.0,
    "day": 86_400.0,
    "week": 604_800.0,
    "month": 2_592_000.0,
    "year": 31_536_000.0,
}

_TIME_UNIT_LABELS: dict[str, str] = {
    "cycle": "Cycles",
    "second": "Seconds",
    "minute": "Minutes",
    "hour": "Hours",
    "day": "Days",
    "week": "Weeks",
    "month": "Months",
    "year": "Years",
}

# Column index map - must stay in sync with FEATURE_COLS_SCALAR
_F = {col: idx for idx, col in enumerate(FEATURE_COLS_SCALAR)}

# Pre-built label/color arrays for numpy-vectorized classification.
# With side="left", a value v maps to the first index i where v <= _SOH_BINS[i]:
#   soh <= 70        -> index 0 (End-of-Life)
#   70 < soh <= 80   -> index 1 (Degraded)
#   80 < soh <= 90   -> index 2 (Moderate)
#   soh > 90         -> index 3 (Healthy)
# NOTE(review): these bands are fixed and independent of the request's
# eol_threshold; presumably they mirror classify_degradation / soh_to_color in
# api.model_registry - confirm the boundary handling matches.
_SOH_BINS = np.array([70.0, 80.0, 90.0])  # searchsorted thresholds
_DEG_LABELS = np.array(["End-of-Life", "Degraded", "Moderate", "Healthy"], dtype=object)
_COLOR_HEX = np.array(["#ef4444", "#f97316", "#eab308", "#22c55e"], dtype=object)


def _vec_classify(soh: np.ndarray) -> list[str]:
    """Map each SOH value to its degradation label with one searchsorted call."""
    return _DEG_LABELS[np.searchsorted(_SOH_BINS, soh, side="left")].tolist()


def _vec_color(soh: np.ndarray) -> list[str]:
    """Map each SOH value to its display hex color with one searchsorted call."""
    return _COLOR_HEX[np.searchsorted(_SOH_BINS, soh, side="left")].tolist()


# -- Schemas ------------------------------------------------------------------
class BatterySimConfig(BaseModel):
    """Per-battery simulation inputs; defaults match the module reference values."""

    battery_id: str
    label: Optional[str] = None
    initial_soh: float = Field(default=100.0, ge=0.0, le=100.0)
    start_cycle: int = Field(default=1, ge=1)
    ambient_temperature: float = Field(default=24.0)
    peak_voltage: float = Field(default=4.19)
    min_voltage: float = Field(default=2.61)
    avg_current: float = Field(default=1.82)
    avg_temp: float = Field(default=32.6)
    temp_rise: float = Field(default=14.7)
    cycle_duration: float = Field(default=3690.0)
    Re: float = Field(default=0.045)
    Rct: float = Field(default=0.069)
    delta_capacity: float = Field(default=-0.005)


class SimulateRequest(BaseModel):
    """Request body for the bulk simulation endpoint."""

    batteries: List[BatterySimConfig]
    steps: int = Field(default=200, ge=1, le=10_000)
    time_unit: str = Field(default="day")
    eol_threshold: float = Field(default=70.0, ge=0.0, le=100.0)
    model_name: Optional[str] = Field(default=None)
    use_ml: bool = Field(default=True)


class BatterySimResult(BaseModel):
    """Simulated trajectory and summary values for one battery."""

    battery_id: str
    label: Optional[str]
    soh_history: List[float]
    rul_history: List[float]
    rul_time_history: List[float]
    re_history: List[float]
    rct_history: List[float]
    cycle_history: List[int]
    time_history: List[float]
    degradation_history: List[str]
    color_history: List[str]
    eol_cycle: Optional[int]
    eol_time: Optional[float]
    final_soh: float
    final_rul: float
    deg_rate_avg: float
    model_used: str = "physics"


class SimulateResponse(BaseModel):
    """Response body: one result per requested battery plus the units used."""

    results: List[BatterySimResult]
    time_unit: str
    time_unit_label: str
    steps: int
    model_used: str = "physics"


# -- Helpers ------------------------------------------------------------------
def _arrhenius_temp_factor(ambient_c: float) -> float:
    """Return the Arrhenius acceleration factor for an ambient temperature.

    Computes exp(Ea/R * (1/T_ref - 1/T)) with temperatures in Kelvin and clips
    the result to [_TEMP_F_MIN, _TEMP_F_MAX].

    A non-positive Kelvin temperature is not physical and would make the raw
    expression raise ZeroDivisionError (T == 0 K) or OverflowError (T < 0 K).
    Since the factor tends to zero as T approaches 0 K from above, and is then
    clipped to the floor, such inputs return the floor directly.
    """
    t_k = _KELVIN_OFFSET + ambient_c
    if t_k <= 0.0:
        return _TEMP_F_MIN
    raw = math.exp(_EA_OVER_R * (1.0 / _T_REF_K - 1.0 / t_k))
    return float(np.clip(raw, _TEMP_F_MIN, _TEMP_F_MAX))


def _sei_growth(
    re0: float, rct0: float, steps: int, temp_f: float
) -> tuple[np.ndarray, np.ndarray]:
    """Vectorized SEI impedance growth over `steps` cycles.

    Returns (re_arr, rct_arr), each shaped (steps,), built with cumsum.
    The per-step increments grow linearly with the step index and scale with
    `temp_f`; Re is capped at 2.0 and Rct at 3.0.
    Matches the incremental SEI model used during feature engineering (NB02).
    """
    s = np.arange(steps, dtype=np.float64)
    delta_re = 0.00012 * temp_f * (1.0 + s * 5e-5)
    delta_rct = 0.00018 * temp_f * (1.0 + s * 8e-5)
    re_arr = np.minimum(re0 + np.cumsum(delta_re), 2.0)
    rct_arr = np.minimum(rct0 + np.cumsum(delta_rct), 3.0)
    return re_arr, rct_arr


def _build_feature_matrix(
    b: BatterySimConfig,
    steps: int,
    re_arr: np.ndarray,
    rct_arr: np.ndarray,
) -> np.ndarray:
    """Build the (steps, len(FEATURE_COLS_SCALAR)) feature matrix.

    Columns are placed through the _F index map, so they follow
    FEATURE_COLS_SCALAR order whatever that order is. No scaling is done here;
    the endpoint delegates that to predict_array().

    The matrix is allocated with np.empty and only the twelve columns named
    below are written, so FEATURE_COLS_SCALAR must contain exactly these names:
    a missing name raises KeyError and an extra column would be left
    uninitialized.
    """
    N = steps
    cycles = np.arange(b.start_cycle, b.start_cycle + N, dtype=np.float64)

    X = np.empty((N, len(FEATURE_COLS_SCALAR)), dtype=np.float64)
    X[:, _F["cycle_number"]] = cycles
    X[:, _F["ambient_temperature"]] = b.ambient_temperature
    X[:, _F["peak_voltage"]] = b.peak_voltage
    X[:, _F["min_voltage"]] = b.min_voltage
    X[:, _F["voltage_range"]] = b.peak_voltage - b.min_voltage
    X[:, _F["avg_current"]] = b.avg_current
    X[:, _F["avg_temp"]] = b.avg_temp
    X[:, _F["temp_rise"]] = b.temp_rise
    X[:, _F["cycle_duration"]] = b.cycle_duration
    X[:, _F["Re"]] = re_arr
    X[:, _F["Rct"]] = rct_arr
    X[:, _F["delta_capacity"]] = b.delta_capacity
    return X


def _physics_soh(b: BatterySimConfig, steps: int, temp_f: float) -> np.ndarray:
    """Pure Arrhenius physics fallback - fully vectorized, returns (steps,) SOH.

    A constant per-step degradation rate is derived from the battery's
    delta_capacity and scaled by temperature, current, voltage and age factors;
    SOH then falls linearly from initial_soh and is clipped to [0, 100].
    """
    rate_base = float(np.clip(abs(b.delta_capacity) / _Q_NOM * 100.0, 0.005, 1.5))
    # Current / voltage only penalize values above their reference.
    curr_f = 1.0 + max(0.0, (b.avg_current - _I_REF) * 0.18)
    volt_f = 1.0 + max(0.0, (b.peak_voltage - _V_REF) * 0.55)
    # Age penalties stack: +0.08 below 85 % SOH and a further +0.12 below 75 %.
    age_f = (
        1.0
        + (0.08 if b.initial_soh < 85.0 else 0.0)
        + (0.12 if b.initial_soh < 75.0 else 0.0)
    )
    deg_rate = float(np.clip(rate_base * temp_f * curr_f * volt_f * age_f, 0.0, 2.0))
    soh_arr = b.initial_soh - deg_rate * np.arange(1, steps + 1, dtype=np.float64)
    return np.clip(soh_arr, 0.0, 100.0)


def _compute_rul_and_eol(
    soh_arr: np.ndarray,
    initial_soh: float,
    eol_thr: float,
    cycle_start: int,
    cycle_dur: float,
    tu_sec: float | None,
) -> tuple[np.ndarray, np.ndarray, Optional[int], Optional[float]]:
    """Vectorized RUL and EOL from an SOH trajectory.

    Returns (rul_cycles, rul_time, eol_cycle, eol_time).

    RUL at each step is the remaining SOH margin above `eol_thr` divided by the
    running-average degradation per step observed so far (0 once SOH is at or
    below the threshold). `tu_sec` is the number of seconds per output time
    unit, or None to report times in cycles. `eol_cycle` / `eol_time` are None
    when the trajectory never reaches the threshold.
    """
    N = len(soh_arr)
    steps = np.arange(N, dtype=np.float64)
    cycles = (cycle_start + steps).astype(np.int64)

    # Running-average degradation rate; SOH increases count as zero degradation
    # and the 1e-6 floor avoids division by zero on flat trajectories.
    soh_prev = np.concatenate([[initial_soh], soh_arr[:-1]])
    step_deg = np.maximum(0.0, soh_prev - soh_arr)
    cum_deg = np.cumsum(step_deg)
    avg_rate = np.maximum(cum_deg / (steps + 1), 1e-6)

    rul_cycles = np.where(soh_arr > eol_thr, (soh_arr - eol_thr) / avg_rate, 0.0)
    rul_time = (
        (rul_cycles * cycle_dur / tu_sec) if tu_sec is not None else rul_cycles.copy()
    )

    # EOL: first step where SOH <= threshold
    below = soh_arr <= eol_thr
    eol_cycle: Optional[int] = None
    eol_time: Optional[float] = None
    if below.any():
        idx = int(np.argmax(below))
        eol_cycle = int(cycles[idx])
        # Absolute cycle number times duration, same convention as the
        # endpoint's time axis.
        elapsed_s = eol_cycle * cycle_dur
        eol_time = round(
            (elapsed_s / tu_sec) if tu_sec is not None else float(eol_cycle), 3
        )

    return rul_cycles, rul_time, eol_cycle, eol_time


# -- Endpoint -----------------------------------------------------------------
@router.post(
    "/simulate",
    response_model=SimulateResponse,
    summary="Bulk battery lifecycle simulation (vectorized, ML-driven)",
)
async def simulate_batteries(req: SimulateRequest):
    """
    Vectorized simulation: builds all N feature rows at once per battery,
    dispatches to the ML model as a single batch predict_array() call, then
    post-processes with numpy (no per-step Python loop; the only Python loop
    is over batteries).

    Scaler usage mirrors NB03 training exactly:
    - Tree models (RF/ET/XGB/LGB/GB): raw numpy X, no scaler
    - Linear/SVR/KNN: standard_scaler.joblib.transform(X)
    - best_ensemble: per-component family dispatch

    An unrecognized time_unit silently falls back to "day". The response-level
    model_used reflects the last battery processed ("physics" when the battery
    list is empty); per-battery values are in each result.

    NOTE(review): this handler is CPU-bound and declared async, so it runs on
    the event loop - consider whether a plain `def` (threadpool) is preferable.
    """
    time_unit = req.time_unit.lower()
    if time_unit not in _TIME_UNIT_SECONDS:
        time_unit = "day"

    tu_sec = _TIME_UNIT_SECONDS[time_unit]
    tu_label = _TIME_UNIT_LABELS[time_unit]
    eol_thr = req.eol_threshold
    N = req.steps
    model_name = req.model_name or registry_v2.default_model or "best_ensemble"

    # Deep sequence models need per-sample tensors - cannot batch vectorise.
    # Tree / linear / ensemble models support predict_array() batch calls.
    # We do NOT gate on model_count here: predict_array() has a try/except
    # fallback to physics, so a partial load still works.
    family = registry_v2.model_meta.get(model_name, {}).get("family", "classical")
    is_deep = family in ("deep_pytorch", "deep_keras")
    ml_batchable = (
        req.use_ml
        and not is_deep
        and (model_name == "best_ensemble" or model_name in registry_v2.models)
    )

    # Determine scaler note for logging (mirrors training decision exactly)
    if model_name in registry_v2._LINEAR_FAMILIES:
        scaler_note = "standard_scaler"
    elif model_name == "best_ensemble":
        scaler_note = "per-component (tree=none / linear=standard_scaler)"
    else:
        scaler_note = "none (tree)"

    effective_model = "physics"
    log.info(
        "simulate: %d batteries x %d steps | model=%s | batchable=%s | scaler=%s | unit=%s",
        len(req.batteries), N, model_name, ml_batchable, scaler_note, time_unit,
    )

    results: list[BatterySimResult] = []

    for b in req.batteries:
        # 1. SEI impedance growth - vectorized cumsum (no Python loop).
        #    The temperature factor is guarded against non-physical inputs so a
        #    bad ambient_temperature cannot crash the request.
        temp_f = _arrhenius_temp_factor(b.ambient_temperature)
        re_arr, rct_arr = _sei_growth(b.Re, b.Rct, N, temp_f)

        # 2. SOH prediction - single batch call regardless of N
        #    predict_array() applies the correct scaler per model family,
        #    exactly matching the preprocessing done during NB03 training:
        #      * standard_scaler.transform(X) for Ridge / SVR / KNN / Lasso / ElasticNet
        #      * raw numpy for RF / ET / XGB / LGB / GB
        #      * per-component dispatch for best_ensemble
        if ml_batchable:
            X = _build_feature_matrix(b, N, re_arr, rct_arr)
            try:
                soh_arr, effective_model = registry_v2.predict_array(X, model_name)
            except Exception as exc:
                # Deliberate best-effort: any model failure degrades to physics.
                log.warning(
                    "predict_array failed for %s (%s) - falling back to physics",
                    b.battery_id, exc,
                )
                soh_arr = _physics_soh(b, N, temp_f)
                effective_model = "physics"
        else:
            soh_arr = _physics_soh(b, N, temp_f)
            effective_model = "physics"

        soh_arr = np.clip(soh_arr, 0.0, 100.0)

        # 3. RUL + EOL - vectorized
        rul_cycles, rul_time, eol_cycle, eol_time = _compute_rul_and_eol(
            soh_arr, b.initial_soh, eol_thr, b.start_cycle, b.cycle_duration, tu_sec,
        )

        # 4. Time axis - vectorized
        cycle_arr = np.arange(b.start_cycle, b.start_cycle + N, dtype=np.int64)
        time_arr = (
            (cycle_arr * b.cycle_duration / tu_sec).astype(np.float64)
            if tu_sec is not None
            else cycle_arr.astype(np.float64)
        )

        # 5. Labels + colors - vectorized via numpy searchsorted
        deg_h = _vec_classify(soh_arr)
        color_h = _vec_color(soh_arr)
        # Mean per-step SOH drop; steps where SOH rises count as zero.
        avg_dr = float(
            np.mean(np.maximum(0.0, -np.diff(soh_arr, prepend=b.initial_soh)))
        )

        # 6. Build result - numpy round + .tolist() (no per-element Python conversion)
        results.append(BatterySimResult(
            battery_id=b.battery_id,
            label=b.label or b.battery_id,
            soh_history=np.round(soh_arr, 3).tolist(),
            rul_history=np.round(rul_cycles, 1).tolist(),
            rul_time_history=np.round(rul_time, 2).tolist(),
            re_history=np.round(re_arr, 6).tolist(),
            rct_history=np.round(rct_arr, 6).tolist(),
            cycle_history=cycle_arr.tolist(),
            time_history=np.round(time_arr, 3).tolist(),
            degradation_history=deg_h,
            color_history=color_h,
            eol_cycle=eol_cycle,
            eol_time=eol_time,
            final_soh=round(float(soh_arr[-1]), 3),
            final_rul=round(float(rul_cycles[-1]), 1),
            deg_rate_avg=round(avg_dr, 6),
            model_used=effective_model,
        ))

    return SimulateResponse(
        results=results,
        time_unit=time_unit,
        time_unit_label=tu_label,
        steps=N,
        model_used=effective_model,
    )