| """ | |
| api.model_registry | |
| ================== | |
| Singleton model registry providing unified loading, versioning, and inference | |
| for all trained battery lifecycle models. | |
| Model versioning | |
| ---------------- | |
| * v1.x β Classical (tree-based / linear) models trained in NB03. | |
| * v2.x β Deep sequence models trained in NB04 β NB07. | |
| * v3.x β Ensemble / meta-models trained in NB08. | |
| Usage | |
| ----- | |
| from api.model_registry import registry | |
| registry.load_all() # FastAPI lifespan startup | |
| result = registry.predict( | |
| features={"cycle_number": 150, ...}, | |
| model_name="best_ensemble", | |
| ) | |
| """ | |
from __future__ import annotations

import json
from pathlib import Path
from typing import Any

import joblib
import numpy as np
import pandas as pd

from src.utils.logger import get_logger

log = get_logger(__name__)

# ── Architecture constants (must match NB04 – NB07 training) ─────────────────
_N_FEAT: int = 12       # len(FEATURE_COLS_SCALAR)
_SEQ_LEN: int = 32      # WINDOW_SIZE
_HIDDEN: int = 128      # LSTM_HIDDEN
_LSTM_LAYERS: int = 2   # LSTM_LAYERS
_ATTN_LAYERS: int = 3   # AttentionLSTM trained with n_layers=3
_D_MODEL: int = 64      # TRANSFORMER_D_MODEL
_N_HEADS: int = 4       # TRANSFORMER_NHEAD
_TF_LAYERS: int = 2     # TRANSFORMER_NLAYERS
_DROPOUT: float = 0.2   # DROPOUT

# ── Paths ─────────────────────────────────────────────────────────────────────
_HERE = Path(__file__).resolve().parent
_PROJECT = _HERE.parent
_MODELS_DIR = _PROJECT / "artifacts" / "models"
_ARTIFACTS = _PROJECT / "artifacts"


def _versioned_paths(version: str = "v1") -> dict[str, Path]:
    """Return artifact paths for a specific model version (v1 or v2)."""
    root = _PROJECT / "artifacts" / version
    return {
        "models_dir": root / "models",
        "artifacts": root,
        "scalers": root / "scalers",
        "results": root / "results",
    }
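
# For illustration, with a hypothetical project root of /app this resolves to:
#     _versioned_paths("v2") == {
#         "models_dir": Path("/app/artifacts/v2/models"),
#         "artifacts":  Path("/app/artifacts/v2"),
#         "scalers":    Path("/app/artifacts/v2/scalers"),
#         "results":    Path("/app/artifacts/v2/results"),
#     }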

FEATURE_COLS_SCALAR: list[str] = [
    "cycle_number", "ambient_temperature",
    "peak_voltage", "min_voltage", "voltage_range",
    "avg_current", "avg_temp", "temp_rise",
    "cycle_duration", "Re", "Rct", "delta_capacity",
]

# ── Model catalog (single source of truth for versions & metadata) ────────────
MODEL_CATALOG: dict[str, dict[str, Any]] = {
    "random_forest": {"version": "1.0.0", "display_name": "Random Forest", "family": "classical", "algorithm": "RandomForestRegressor", "target": "soh", "r2": 0.9567},
    "xgboost": {"version": "1.0.0", "display_name": "XGBoost", "family": "classical", "algorithm": "XGBRegressor", "target": "soh", "r2": 0.928},
    "lightgbm": {"version": "1.0.0", "display_name": "LightGBM", "family": "classical", "algorithm": "LGBMRegressor", "target": "soh", "r2": 0.928},
    "ridge": {"version": "1.0.0", "display_name": "Ridge Regression", "family": "classical", "algorithm": "Ridge", "target": "soh", "r2": 0.72},
    "svr": {"version": "1.0.0", "display_name": "SVR (RBF)", "family": "classical", "algorithm": "SVR", "target": "soh", "r2": 0.805},
    "lasso": {"version": "1.0.0", "display_name": "Lasso", "family": "classical", "algorithm": "Lasso", "target": "soh", "r2": 0.52},
    "elasticnet": {"version": "1.0.0", "display_name": "ElasticNet", "family": "classical", "algorithm": "ElasticNet", "target": "soh", "r2": 0.52},
    "knn_k5": {"version": "1.0.0", "display_name": "KNN (k=5)", "family": "classical", "algorithm": "KNeighborsRegressor", "target": "soh", "r2": 0.72},
    "knn_k10": {"version": "1.0.0", "display_name": "KNN (k=10)", "family": "classical", "algorithm": "KNeighborsRegressor", "target": "soh", "r2": 0.724},
    "knn_k20": {"version": "1.0.0", "display_name": "KNN (k=20)", "family": "classical", "algorithm": "KNeighborsRegressor", "target": "soh", "r2": 0.717},
    "extra_trees": {"version": "2.0.0", "display_name": "ExtraTrees", "family": "classical", "algorithm": "ExtraTreesRegressor", "target": "soh", "r2": 0.967},
    "gradient_boosting": {"version": "2.0.0", "display_name": "GradientBoosting", "family": "classical", "algorithm": "GradientBoostingRegressor", "target": "soh", "r2": 0.934},
    "vanilla_lstm": {"version": "2.0.0", "display_name": "Vanilla LSTM", "family": "deep_pytorch", "algorithm": "VanillaLSTM", "target": "soh", "r2": 0.507},
    "bidirectional_lstm": {"version": "2.0.0", "display_name": "Bidirectional LSTM", "family": "deep_pytorch", "algorithm": "BidirectionalLSTM", "target": "soh", "r2": 0.520},
    "gru": {"version": "2.0.0", "display_name": "GRU", "family": "deep_pytorch", "algorithm": "GRUModel", "target": "soh", "r2": 0.510},
    "attention_lstm": {"version": "2.0.0", "display_name": "Attention LSTM", "family": "deep_pytorch", "algorithm": "AttentionLSTM", "target": "soh", "r2": 0.540},
    "batterygpt": {"version": "2.1.0", "display_name": "BatteryGPT", "family": "deep_pytorch", "algorithm": "BatteryGPT", "target": "soh", "r2": 0.881},
    "tft": {"version": "2.2.0", "display_name": "Temporal Fusion Transformer", "family": "deep_pytorch", "algorithm": "TemporalFusionTransformer", "target": "soh", "r2": 0.881},
    "vae_lstm": {"version": "2.3.0", "display_name": "VAE-LSTM", "family": "deep_pytorch", "algorithm": "VAE_LSTM", "target": "soh", "r2": 0.730},
    "itransformer": {"version": "2.4.0", "display_name": "iTransformer", "family": "deep_keras", "algorithm": "iTransformer", "target": "soh", "r2": 0.595},
    "physics_itransformer": {"version": "2.4.1", "display_name": "Physics iTransformer", "family": "deep_keras", "algorithm": "PhysicsITransformer", "target": "soh", "r2": 0.600},
    "dynamic_graph_itransformer": {"version": "2.5.0", "display_name": "DG-iTransformer", "family": "deep_keras", "algorithm": "DynamicGraphITransformer", "target": "soh", "r2": 0.595},
    "best_ensemble": {"version": "3.0.0", "display_name": "Best Ensemble (RF+XGB+LGB)", "family": "ensemble", "algorithm": "WeightedAverage", "target": "soh", "r2": 0.957},
}

# R²-proportional weights for BestEnsemble
_ENSEMBLE_WEIGHTS: dict[str, float] = {
    "random_forest": 0.957,
    "xgboost": 0.928,
    "lightgbm": 0.928,
    "extra_trees": 0.967,
    "gradient_boosting": 0.934,
}
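
# The ensemble output is the weight-normalised average
#     soh = sum(w_i * soh_i) / sum(w_i)
# With the weights above, sum(w_i) = 4.714, so e.g. extra_trees contributes
# 0.967 / 4.714 ≈ 20.5 % of the final prediction.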

# ── Degradation state ─────────────────────────────────────────────────────────


def classify_degradation(soh: float) -> str:
    if soh >= 90:
        return "Healthy"
    elif soh >= 80:
        return "Moderate"
    elif soh >= 70:
        return "Degraded"
    else:
        return "End-of-Life"


def soh_to_color(soh: float) -> str:
    """Map SOH percentage to a hex colour (green → yellow → red)."""
    if soh >= 90:
        return "#22c55e"  # green
    elif soh >= 80:
        return "#eab308"  # yellow
    elif soh >= 70:
        return "#f97316"  # orange
    else:
        return "#ef4444"  # red

# ── Registry ──────────────────────────────────────────────────────────────────


class ModelRegistry:
    """Thread-safe singleton that owns all model objects and inference logic.

    Attributes
    ----------
    models:
        Mapping from name to loaded model object (sklearn/XGBoost/LightGBM
        or PyTorch ``nn.Module`` or Keras model).
    default_model:
        Name of the best available model (set by :meth:`_choose_default`).
    device:
        PyTorch device string – ``"cuda"`` when a GPU is available, else ``"cpu"``.
    """

    # Model families that need the linear StandardScaler at inference
    _LINEAR_FAMILIES = {"ridge", "lasso", "elasticnet", "svr",
                        "knn_k5", "knn_k10", "knn_k20"}
    # Tree families that are scale-invariant (no scaler needed)
    _TREE_FAMILIES = {"random_forest", "xgboost", "lightgbm", "best_ensemble",
                      "extra_trees", "gradient_boosting"}

    def __init__(self, version: str = "v1"):
        self.models: dict[str, Any] = {}
        self.model_meta: dict[str, dict] = {}
        self.default_model: str | None = None
        self.scaler = None           # kept for backward compat
        self.linear_scaler = None    # StandardScaler for Ridge/Lasso/SVR/KNN
        self.sequence_scaler = None  # StandardScaler for sequence deep models
        self.device = "cpu"
        self.version = version
        # Set version-aware paths
        vp = _versioned_paths(version)
        self._models_dir = vp["models_dir"]
        self._artifacts = vp["artifacts"]
        self._scalers_dir = vp["scalers"]

    # ── Loading ───────────────────────────────────────────────────────────

    def load_all(self) -> None:
        """Scan the versioned models directory and load all available model artefacts.

        Safe to call multiple times – subsequent calls are no-ops when the
        registry is already populated.
        """
        if self.models:
            log.debug("Registry already populated – skipping load_all()")
            return
        self._detect_device()
        self._load_scaler()
        self._load_classical()
        self._load_deep_pytorch()
        self._load_deep_keras()
        self._register_ensemble()
        self._choose_default()
        log.info(
            "Registry ready – %d models active, default='%s', device=%s",
            len(self.models), self.default_model, self.device,
        )
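
    # A minimal FastAPI lifespan wiring sketch (hypothetical app module; the
    # real application wiring may differ):
    #
    #     from contextlib import asynccontextmanager
    #     from fastapi import FastAPI
    #     from api.model_registry import registry
    #
    #     @asynccontextmanager
    #     async def lifespan(app: FastAPI):
    #         registry.load_all()   # one-time, idempotent
    #         yield
    #
    #     app = FastAPI(lifespan=lifespan)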

    def _detect_device(self) -> None:
        """Detect the PyTorch compute device (CUDA > CPU)."""
        try:
            import torch
            self.device = "cuda" if torch.cuda.is_available() else "cpu"
            log.info("PyTorch device: %s", self.device)
        except ImportError:
            log.info("torch not installed – deep PyTorch models unavailable")

    def _load_classical(self) -> None:
        """Eagerly load all sklearn/XGBoost/LightGBM joblib artefacts."""
        cdir = self._models_dir / "classical"
        if not cdir.exists():
            log.warning("Classical models dir not found: %s", cdir)
            return
        for p in sorted(cdir.glob("*.joblib")):
            name = p.stem
            # Skip non-model dumps (param search results, classifiers)
            if "best_params" in name or "classifier" in name:
                continue
            try:
                self.models[name] = joblib.load(p)
                catalog = MODEL_CATALOG.get(name, {})
                self.model_meta[name] = {
                    **catalog,
                    "family": "classical",
                    "loaded": True,
                    "path": str(p),
                }
                log.info("Loaded classical: %-22s v%s", name, catalog.get("version", "?"))
            except Exception as exc:
                log.warning("Failed to load %s: %s", p.name, exc)

    def _build_pytorch_model(self, name: str) -> Any | None:
        """Instantiate a PyTorch module with the architecture used during training."""
        try:
            if name == "vanilla_lstm":
                from src.models.deep.lstm import VanillaLSTM
                return VanillaLSTM(_N_FEAT, _HIDDEN, _LSTM_LAYERS, _DROPOUT)
            if name == "bidirectional_lstm":
                from src.models.deep.lstm import BidirectionalLSTM
                return BidirectionalLSTM(_N_FEAT, _HIDDEN, _LSTM_LAYERS, _DROPOUT)
            if name == "gru":
                from src.models.deep.lstm import GRUModel
                return GRUModel(_N_FEAT, _HIDDEN, _LSTM_LAYERS, _DROPOUT)
            if name == "attention_lstm":
                from src.models.deep.lstm import AttentionLSTM
                return AttentionLSTM(_N_FEAT, _HIDDEN, _ATTN_LAYERS, _DROPOUT)
            if name == "batterygpt":
                from src.models.deep.transformer import BatteryGPT
                return BatteryGPT(
                    input_dim=_N_FEAT, d_model=_D_MODEL, n_heads=_N_HEADS,
                    n_layers=_TF_LAYERS, dropout=_DROPOUT, max_len=64,
                )
            if name == "tft":
                from src.models.deep.transformer import TemporalFusionTransformer
                return TemporalFusionTransformer(
                    n_features=_N_FEAT, d_model=_D_MODEL, n_heads=_N_HEADS,
                    n_layers=_TF_LAYERS, dropout=_DROPOUT,
                )
            if name == "vae_lstm":
                from src.models.deep.vae_lstm import VAE_LSTM
                return VAE_LSTM(
                    input_dim=_N_FEAT, seq_len=_SEQ_LEN,
                    hidden_dim=_HIDDEN, latent_dim=16,
                    n_layers=_LSTM_LAYERS, dropout=_DROPOUT,
                )
        except Exception as exc:
            log.warning("Cannot build PyTorch model '%s': %s", name, exc)
        return None

    def _load_deep_pytorch(self) -> None:
        """Load PyTorch .pt state-dict files into reconstructed model instances."""
        ddir = self._models_dir / "deep"
        if not ddir.exists():
            return
        try:
            import torch
        except ImportError:
            log.info("torch not installed – skipping deep PyTorch model loading")
            return
        for p in sorted(ddir.glob("*.pt")):
            name = p.stem
            model = self._build_pytorch_model(name)
            if model is None:
                self.model_meta[name] = {
                    **MODEL_CATALOG.get(name, {}),
                    "family": "deep_pytorch", "loaded": False,
                    "path": str(p), "load_error": "architecture unavailable",
                }
                continue
            try:
                state = torch.load(p, map_location=self.device, weights_only=True)
                model.load_state_dict(state)
                model.to(self.device)
                model.eval()
                self.models[name] = model
                catalog = MODEL_CATALOG.get(name, {})
                self.model_meta[name] = {
                    **catalog, "family": "deep_pytorch",
                    "loaded": True, "path": str(p),
                }
                log.info("Loaded PyTorch: %-22s v%s", name, catalog.get("version", "?"))
            except Exception as exc:
                log.warning("Could not load PyTorch '%s': %s", name, exc)
                self.model_meta[name] = {
                    **MODEL_CATALOG.get(name, {}),
                    "family": "deep_pytorch", "loaded": False,
                    "path": str(p), "load_error": str(exc),
                }

    def _load_deep_keras(self) -> None:
        """Load TensorFlow/Keras .keras model files."""
        ddir = self._models_dir / "deep"
        if not ddir.exists():
            return
        try:
            import tensorflow as tf
        except ImportError:
            log.info("TensorFlow not installed – skipping Keras model loading")
            return
        # Import the custom Keras classes so they are registered before load
        try:
            from src.models.deep.itransformer import (
                FeatureWiseMHA,
                TokenWiseMHA,
                Conv1DFeedForward,
                DynamicGraphConv,
                PhysicsInformedLoss,
                AbsCumCurrentLayer,
            )
            _custom_objects: dict = {
                "FeatureWiseMHA": FeatureWiseMHA,
                "TokenWiseMHA": TokenWiseMHA,
                "Conv1DFeedForward": Conv1DFeedForward,
                "DynamicGraphConv": DynamicGraphConv,
                "PhysicsInformedLoss": PhysicsInformedLoss,
                "AbsCumCurrentLayer": AbsCumCurrentLayer,
            }
        except Exception as imp_err:
            log.warning("Could not import iTransformer custom classes: %s", imp_err)
            _custom_objects = {}
        for p in sorted(ddir.glob("*.keras")):
            name = p.stem
            try:
                model = tf.keras.models.load_model(
                    str(p), custom_objects=_custom_objects, safe_mode=False
                )
                self.models[name] = model
                catalog = MODEL_CATALOG.get(name, {})
                self.model_meta[name] = {
                    **catalog, "family": "deep_keras",
                    "loaded": True, "path": str(p),
                }
                log.info("Loaded Keras: %-22s v%s", name, catalog.get("version", "?"))
            except Exception as exc:
                log.warning("Could not load Keras '%s': %s", name, exc)
                self.model_meta[name] = {
                    **MODEL_CATALOG.get(name, {}),
                    "family": "deep_keras", "loaded": False,
                    "path": str(p), "load_error": str(exc),
                }

    def _register_ensemble(self) -> None:
        """Register the BestEnsemble virtual model when components are loaded."""
        available = [m for m in _ENSEMBLE_WEIGHTS if m in self.models]
        if not available:
            log.warning("BestEnsemble: no component models loaded")
            return
        self.models["best_ensemble"] = "virtual_ensemble"
        self.model_meta["best_ensemble"] = {
            **MODEL_CATALOG["best_ensemble"],
            "components": available, "loaded": True,
        }
        log.info("BestEnsemble registered – components: %s", ", ".join(available))

    def _load_scaler(self) -> None:
        # Scaler mapping (from notebooks/03_classical_ml.ipynb):
        #   standard_scaler.joblib → StandardScaler fitted on X_train
        #                            Used for: SVR, Ridge, Lasso, ElasticNet, KNN
        #   sequence_scaler.joblib → StandardScaler for deep-model sequences
        #   Tree models (RF, ET, GB, XGB, LGB) were fitted on raw numpy X_train
        #                            → NO scaler applied, passed as-is
        #
        # Both standard_scaler.joblib and linear_scaler.joblib are identical
        # (same mean_ / scale_). Prefer standard_scaler.joblib (the canonical
        # name from the training notebook), fall back to linear_scaler.joblib.
        scalers_dir = self._scalers_dir
        for fname in ("standard_scaler.joblib", "linear_scaler.joblib"):
            sp = scalers_dir / fname
            if sp.exists():
                try:
                    self.linear_scaler = joblib.load(sp)
                    log.info("Linear scaler loaded from %s", sp)
                    break
                except Exception as exc:
                    log.warning("Could not load %s: %s", fname, exc)
        else:
            log.warning("No linear scaler found – Ridge/Lasso/SVR/KNN will use raw features")
        sp_seq = scalers_dir / "sequence_scaler.joblib"
        if sp_seq.exists():
            try:
                self.sequence_scaler = joblib.load(sp_seq)
                log.info("Sequence scaler loaded from %s", sp_seq)
            except Exception as exc:
                log.warning("Could not load sequence_scaler.joblib: %s", exc)
        else:
            log.warning("sequence_scaler.joblib not found – deep models will use raw features")

    def _choose_default(self) -> None:
        """Select the highest-quality loaded model as the registry default."""
        priority = [
            "best_ensemble",
            "extra_trees",
            "random_forest",
            "xgboost",
            "lightgbm",
            "gradient_boosting",
            "tft",
            "batterygpt",
            "attention_lstm",
            "ridge",
        ]
        for name in priority:
            if name in self.models:
                self.default_model = name
                log.info("Default model: %s", name)
                return
        if self.models:
            self.default_model = next(iter(self.models))
            log.info("Default model (fallback): %s", self.default_model)

    # ── Metrics retrieval ──────────────────────────────────────────────────

    def get_metrics(self) -> dict[str, dict[str, float]]:
        """Return unified evaluation metrics from results CSV/JSON artefacts.

        CSV model-name headers are normalised to lower-case underscore keys.
        Entries missing from result files fall back to the ``r2`` field in
        :data:`MODEL_CATALOG`.
        """
        _normalise = {
            "RandomForest": "random_forest", "LightGBM": "lightgbm",
            "XGBoost": "xgboost", "SVR": "svr", "Ridge": "ridge",
            "Lasso": "lasso", "ElasticNet": "elasticnet",
            "KNN-5": "knn_k5", "KNN-10": "knn_k10", "KNN-20": "knn_k20",
        }
        results: dict[str, dict[str, float]] = {}
        for csv_name in (
            "classical_soh_results.csv", "lstm_soh_results.csv",
            "transformer_soh_results.csv", "ensemble_results.csv",
            "unified_results.csv",
        ):
            path = self._artifacts / csv_name
            if not path.exists():
                # Fall back to root-level results (backward compat)
                path = _ARTIFACTS / csv_name
            if not path.exists():
                continue
            try:
                df = pd.read_csv(path, index_col=0)
                for raw in df.index:
                    key = _normalise.get(str(raw), str(raw).lower().replace(" ", "_"))
                    results[key] = df.loc[raw].dropna().to_dict()
            except Exception as exc:
                log.warning("Could not read %s: %s", csv_name, exc)
        for json_name in ("dg_itransformer_results.json", "vae_lstm_results.json"):
            path = self._artifacts / json_name
            if not path.exists():
                path = _ARTIFACTS / json_name
            if not path.exists():
                continue
            try:
                with open(path) as fh:
                    data = json.load(fh)
                key = json_name.replace("_results.json", "")
                results[key] = {k: float(v) for k, v in data.items()
                                if isinstance(v, (int, float))}
            except Exception as exc:
                log.warning("Could not read %s: %s", json_name, exc)
        # Fill from catalog for anything not in result files
        for name, info in MODEL_CATALOG.items():
            if name not in results and "r2" in info:
                results[name] = {"R2": info["r2"]}
        return results
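
    # Illustrative return shape (the keys depend on which artefacts exist; the
    # metric names and values below are hypothetical):
    #
    #     {
    #         "random_forest": {"R2": 0.9567, "RMSE": 1.2, "MAE": 0.9},
    #         "vae_lstm":      {"r2": 0.73, "mse": 4.1},
    #         "ridge":         {"R2": 0.72},   # catalog fallback
    #     }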

    # ── Prediction helpers ─────────────────────────────────────────────────

    def _build_x(self, features: dict[str, float]) -> np.ndarray:
        """Build a raw (1, F) feature numpy array – NO scaling applied here.

        Scaling is applied per model family in :meth:`predict` because
        tree models need no scaling while linear/deep models need different
        scalers.
        """
        return np.array([[features.get(c, 0.0) for c in FEATURE_COLS_SCALAR]])

    def _x_for_model(self, model: Any, x: np.ndarray) -> Any:
        """Return x in the format the model was fitted with.

        * If the model has ``feature_names_in_`` → pass a DataFrame whose
          columns match those exact names (handles LGB trained with Column_0…).
        * Otherwise → pass the raw numpy array (RF, ET trained without names).
        """
        names = getattr(model, "feature_names_in_", None)
        if names is None:
            return x  # numpy – model was fitted without feature names
        # Build a DataFrame with the same column names the model was trained with
        return pd.DataFrame(x, columns=list(names))
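
    # Sketch of the two input formats (the model objects are hypothetical):
    #
    #     x = registry._build_x({"cycle_number": 150})  # → ndarray, shape (1, 12)
    #     registry._x_for_model(rf_model, x)   # RF: raw ndarray passed through
    #     registry._x_for_model(lgb_model, x)  # LGB: DataFrame with columns
    #                                          #      Column_0 … Column_11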

    def _scale_for_linear(self, x: np.ndarray) -> np.ndarray:
        """Apply the StandardScaler for linear / SVR / KNN models."""
        if self.linear_scaler is not None:
            try:
                return self.linear_scaler.transform(x)
            except Exception as exc:
                log.warning("Linear scaler transform failed: %s", exc)
        return x

    def _build_sequence_array(
        self, x: np.ndarray, seq_len: int = _SEQ_LEN
    ) -> np.ndarray:
        """Convert a single-cycle feature row into a scaled (1, seq_len, F) array.

        Tiles the current feature vector across *seq_len* timesteps and applies
        the sequence scaler so values match the training distribution.
        """
        if self.sequence_scaler is not None:
            try:
                x_sc = self.sequence_scaler.transform(x)  # (1, F)
            except Exception:
                x_sc = x
        else:
            x_sc = x
        # Tile to (1, seq_len, F)
        return np.tile(x_sc[:, np.newaxis, :], (1, seq_len, 1)).astype(np.float32)

    def _build_sequence_tensor(
        self, x: np.ndarray, seq_len: int = _SEQ_LEN
    ) -> Any:
        """Same as :meth:`_build_sequence_array` but returns a PyTorch tensor."""
        import torch
        return torch.tensor(self._build_sequence_array(x, seq_len), dtype=torch.float32)
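
    # Shape walk-through (a sketch, assuming the default _SEQ_LEN of 32):
    #
    #     x                                    # (1, 12)  raw scalar features
    #     x_sc = scaler.transform(x)           # (1, 12)  standardised
    #     x_sc[:, np.newaxis, :]               # (1, 1, 12)
    #     np.tile(..., (1, 32, 1))             # (1, 32, 12)  pseudo-sequence
    #
    # The same feature vector is repeated at every timestep, so the sequence
    # models see a flat history for single-cycle requests.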

    def _predict_ensemble(self, x: np.ndarray) -> tuple[float, str]:
        """Weighted-average SOH prediction from the BestEnsemble component models.

        Each component model receives input in the format it was trained with:

        * RF, ET, GB, XGB – raw numpy (trained on X_train.values, no feature names)
        * LGB – DataFrame with Column_0…Column_11 (auto-assigned by LightGBM
          during training)

        Both cases are handled by :meth:`_x_for_model`.
        """
        components = self.model_meta.get("best_ensemble", {}).get(
            "components", list(_ENSEMBLE_WEIGHTS.keys())
        )
        total_w, weighted_sum = 0.0, 0.0
        used: list[str] = []
        for cname in components:
            if cname not in self.models:
                continue
            w = _ENSEMBLE_WEIGHTS.get(cname, 1.0)
            xi = self._x_for_model(self.models[cname], x)
            soh = float(self.models[cname].predict(xi)[0])
            weighted_sum += w * soh
            total_w += w
            used.append(cname)
        if total_w == 0:
            raise ValueError("No BestEnsemble components available")
        return weighted_sum / total_w, f"best_ensemble({', '.join(used)})"

    # ── Prediction ─────────────────────────────────────────────────────────

    def predict(
        self,
        features: dict[str, float],
        model_name: str | None = None,
    ) -> dict[str, Any]:
        """Predict SOH for a single battery cycle.

        Parameters
        ----------
        features:
            Dict of cycle features; keys from :data:`FEATURE_COLS_SCALAR`.
            Missing keys are filled with 0.0.
        model_name:
            Registry model key (e.g. ``"best_ensemble"``, ``"random_forest"``,
            ``"tft"``). Defaults to :attr:`default_model`.

        Returns
        -------
        dict
            ``soh_pct``, ``degradation_state``, ``rul_cycles``,
            ``confidence_lower``, ``confidence_upper``,
            ``model_used``, ``model_version``.
        """
        name = model_name or self.default_model
        if name is None:
            raise ValueError("No models loaded in registry")
        x = self._build_x(features)

        # ── Dispatch by model type ──────────────────────────────────────
        if name == "best_ensemble":
            soh, label = self._predict_ensemble(x)
        elif name in self.models:
            model = self.models[name]
            family = self.model_meta.get(name, {}).get("family", "classical")
            if family == "deep_pytorch":
                try:
                    import torch
                    with torch.no_grad():
                        # Build a scaled (1, seq_len, F) sequence tensor
                        t = self._build_sequence_tensor(x).to(self.device)
                        out = model(t)
                        # VAE-LSTM returns a dict; all others return a tensor
                        if isinstance(out, dict):
                            out = out["health_pred"]
                        soh = float(out.cpu().numpy().ravel()[0])
                except Exception as exc:
                    log.error("PyTorch inference error for '%s': %s", name, exc)
                    raise
            elif family == "deep_keras":
                try:
                    # Build a scaled (1, seq_len, F) numpy array for Keras
                    seq_np = self._build_sequence_array(x)  # (1, 32, F)
                    out = model.predict(seq_np, verbose=0)
                    # The physics-informed model returns a dict with multiple heads
                    if isinstance(out, dict):
                        out = out.get("soh_ml", next(iter(out.values())))
                    soh = float(np.asarray(out).ravel()[0])
                except Exception as exc:
                    log.error("Keras inference error for '%s': %s", name, exc)
                    raise
            elif name in self._LINEAR_FAMILIES:
                # Ridge/Lasso/ElasticNet/SVR/KNN need the StandardScaler
                x_lin = self._scale_for_linear(x)
                soh = float(model.predict(x_lin)[0])
            else:
                # RF/XGB/LGB – scale-invariant; use the per-model input format
                xi = self._x_for_model(model, x)
                soh = float(model.predict(xi)[0])
            label = name
        else:
            fallback = self.default_model
            if fallback and fallback != name and fallback in self.models:
                log.warning("Model '%s' not loaded – falling back to '%s'", name, fallback)
                return self.predict(features, fallback)
            raise ValueError(
                f"Model '{name}' is not available. "
                f"Loaded: {list(self.models.keys())}"
            )
        soh = float(np.clip(soh, 0.0, 100.0))

        # ── RUL estimate ────────────────────────────────────────────────
        # Data-driven estimate: linear degradation from the current SOH down
        # to 70 % (EOL threshold), calibrated to the NASA dataset's
        # ~0.2-0.4 %/cycle rate.
        EOL_THRESHOLD = 70.0
        if soh > EOL_THRESHOLD:
            # Degradation rate: use delta_capacity as a proxy (Ah/cycle).
            # NASA nominal capacity is ~2.0 Ah, so %/cycle = delta_cap / 2.0 * 100.
            cap_loss_per_cycle_pct = abs(features.get("delta_capacity", -0.005)) / 2.0 * 100
            # Clamp to a realistic range: 0.05 – 2.0 %/cycle
            rate = max(0.05, min(cap_loss_per_cycle_pct, 2.0))
            rul = (soh - EOL_THRESHOLD) / rate
        else:
            rul = 0.0
        version = self.model_meta.get(name, MODEL_CATALOG.get(name, {})).get("version", "?")
        return {
            "soh_pct": round(soh, 2),
            "degradation_state": classify_degradation(soh),
            "rul_cycles": round(rul, 1),
            "confidence_lower": round(soh - 2.0, 2),
            "confidence_upper": round(soh + 2.0, 2),
            "model_used": label,
            "model_version": version,
        }
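
    # Worked RUL example (hypothetical inputs): with soh = 85.0 and
    # delta_capacity = -0.01 Ah/cycle,
    #     rate = abs(-0.01) / 2.0 * 100 = 0.5 %/cycle  (within the 0.05–2.0 clamp)
    #     rul  = (85.0 - 70.0) / 0.5 = 30.0 cycles
    # so the returned dict would contain "rul_cycles": 30.0 and
    # "degradation_state": "Moderate".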

    def predict_batch(
        self,
        battery_id: str,
        cycles: list[dict[str, float]],
        model_name: str | None = None,
    ) -> list[dict[str, Any]]:
        """Predict SOH for multiple cycles of the same battery."""
        return [
            {**self.predict(c, model_name),
             "battery_id": battery_id,
             "cycle_number": c.get("cycle_number", i + 1)}
            for i, c in enumerate(cycles)
        ]

    def predict_array(
        self,
        X: np.ndarray,
        model_name: str | None = None,
    ) -> tuple[np.ndarray, str]:
        """Vectorised batch SOH prediction on an (N, F) feature matrix.

        Performs a **single** ``model.predict()`` call for the whole array,
        giving O(1) Python overhead regardless of the number of rows N.
        Used by the simulation endpoint to avoid per-step loop overhead.

        Parameters
        ----------
        X:
            Shape ``(N, len(FEATURE_COLS_SCALAR))`` – columns are ordered by
            ``FEATURE_COLS_SCALAR``, no scaling applied yet.
        model_name:
            Model key. Defaults to :attr:`default_model`.

        Returns
        -------
        tuple[np.ndarray, str]
            ``(soh_array, model_label)`` – ``soh_array`` has shape ``(N,)``,
            values clipped to ``[0, 100]``.

        Notes
        -----
        Deep sequence models (PyTorch / Keras) are not batchable here because
        they require multi-timestep tensors. Callers that request a deep model
        get a ``ValueError``; the simulate endpoint falls back to physics.
        """
        name = model_name or self.default_model
        if name is None:
            raise ValueError("No models loaded in registry")
        if name == "best_ensemble":
            components = self.model_meta.get("best_ensemble", {}).get(
                "components", list(_ENSEMBLE_WEIGHTS.keys())
            )
            total_w: float = 0.0
            weighted_sum: np.ndarray | None = None
            used: list[str] = []
            for cname in components:
                if cname not in self.models:
                    continue
                w = _ENSEMBLE_WEIGHTS.get(cname, 1.0)
                xi = self._x_for_model(self.models[cname], X)
                preds = np.asarray(self.models[cname].predict(xi), dtype=float)
                weighted_sum = preds * w if weighted_sum is None else weighted_sum + preds * w
                total_w += w
                used.append(cname)
            if total_w == 0 or weighted_sum is None:
                raise ValueError("No BestEnsemble components available")
            return np.clip(weighted_sum / total_w, 0.0, 100.0), f"best_ensemble({', '.join(used)})"
        elif name in self.models:
            model = self.models[name]
            family = self.model_meta.get(name, {}).get("family", "classical")
            if family in ("deep_pytorch", "deep_keras"):
                raise ValueError(
                    f"Model '{name}' is a deep sequence model and cannot be "
                    "batch-predicted. Use predict() per sample instead."
                )
            elif name in self._LINEAR_FAMILIES:
                xi = self._scale_for_linear(X)
            else:
                xi = self._x_for_model(model, X)
            return np.clip(np.asarray(model.predict(xi), dtype=float), 0.0, 100.0), name
        else:
            fallback = self.default_model
            if fallback and fallback != name and fallback in self.models:
                log.warning("predict_array: '%s' not loaded – falling back to '%s'", name, fallback)
                return self.predict_array(X, fallback)
            raise ValueError(f"Model '{name}' is not available. Loaded: {list(self.models.keys())}")

    # ── Info helpers ───────────────────────────────────────────────────────

    def model_count(self) -> int:
        """Total number of registered model entries."""
        return len(set(list(self.models.keys()) + list(self.model_meta.keys())))

    def list_models(self) -> list[dict[str, Any]]:
        """Return the full model listing with versioning, metrics, and load status."""
        all_metrics = self.get_metrics()
        out: list[dict[str, Any]] = []
        for name in MODEL_CATALOG:
            catalog = MODEL_CATALOG[name]
            meta = self.model_meta.get(name, {})
            out.append({
                "name": name,
                "version": catalog.get("version", "?"),
                "display_name": catalog.get("display_name", name),
                "family": catalog.get("family", "unknown"),
                "algorithm": catalog.get("algorithm", ""),
                "target": catalog.get("target", "soh"),
                "r2": catalog.get("r2"),
                "metrics": all_metrics.get(name, {}),
                "is_default": name == self.default_model,
                "loaded": name in self.models,
                "load_error": meta.get("load_error"),
            })
        return out

# ── Singletons ─────────────────────────────────────────────────────────────────
registry_v1 = ModelRegistry(version="v1")
registry_v2 = ModelRegistry(version="v2")

# Default registry → v2 (latest models, bug fixes)
registry = registry_v2
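

if __name__ == "__main__":
    # Minimal smoke-test sketch: loads whatever artefacts are present and runs
    # one prediction. The feature values below are hypothetical; missing keys
    # are filled with 0.0 by _build_x().
    registry.load_all()
    if registry.default_model:
        result = registry.predict({"cycle_number": 150, "delta_capacity": -0.01})
        print(result["model_used"], result["soh_pct"], result["rul_cycles"])
    else:
        print("No models loaded – check the artifacts directory")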