aiBatteryLifeCycle / api /model_registry.py
NeerajCodz's picture
feat: full project β€” ML simulation, dashboard UI, models on HF Hub
f381be8
"""
api.model_registry
==================
Singleton model registry providing unified loading, versioning, and inference
for all trained battery lifecycle models.
Model versioning
----------------
* v1.x β€” Classical (tree-based / linear) models trained in NB03.
* v2.x β€” Deep sequence models trained in NB04 – NB07.
* v3.x β€” Ensemble / meta-models trained in NB08.
Usage
-----
from api.model_registry import registry
registry.load_all() # FastAPI lifespan startup
result = registry.predict(
features={"cycle_number": 150, ...},
model_name="best_ensemble",
)
"""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any
import joblib
import numpy as np
import pandas as pd
from src.utils.logger import get_logger
log = get_logger(__name__)
# ── Architecture constants (must match NB04 – NB07 training) ─────────────────
_N_FEAT: int = 12 # len(FEATURE_COLS_SCALAR)
_SEQ_LEN: int = 32 # WINDOW_SIZE
_HIDDEN: int = 128 # LSTM_HIDDEN
_LSTM_LAYERS: int = 2 # LSTM_LAYERS
_ATTN_LAYERS: int = 3 # AttentionLSTM trained with n_layers=3
_D_MODEL: int = 64 # TRANSFORMER_D_MODEL
_N_HEADS: int = 4 # TRANSFORMER_NHEAD
_TF_LAYERS: int = 2 # TRANSFORMER_NLAYERS
_DROPOUT: float = 0.2 # DROPOUT
# ── Paths ─────────────────────────────────────────────────────────────────────
_HERE = Path(__file__).resolve().parent
_PROJECT = _HERE.parent
_MODELS_DIR = _PROJECT / "artifacts" / "models"
_ARTIFACTS = _PROJECT / "artifacts"
def _versioned_paths(version: str = "v1") -> dict[str, Path]:
"""Return artifact paths for a specific model version (v1 or v2)."""
root = _PROJECT / "artifacts" / version
return {
"models_dir": root / "models",
"artifacts": root,
"scalers": root / "scalers",
"results": root / "results",
}
FEATURE_COLS_SCALAR: list[str] = [
"cycle_number", "ambient_temperature",
"peak_voltage", "min_voltage", "voltage_range",
"avg_current", "avg_temp", "temp_rise",
"cycle_duration", "Re", "Rct", "delta_capacity",
]
# ── Model catalog (single source of truth for versions & metadata) ────────────
MODEL_CATALOG: dict[str, dict[str, Any]] = {
"random_forest": {"version": "1.0.0", "display_name": "Random Forest", "family": "classical", "algorithm": "RandomForestRegressor", "target": "soh", "r2": 0.9567},
"xgboost": {"version": "1.0.0", "display_name": "XGBoost", "family": "classical", "algorithm": "XGBRegressor", "target": "soh", "r2": 0.928},
"lightgbm": {"version": "1.0.0", "display_name": "LightGBM", "family": "classical", "algorithm": "LGBMRegressor", "target": "soh", "r2": 0.928},
"ridge": {"version": "1.0.0", "display_name": "Ridge Regression", "family": "classical", "algorithm": "Ridge", "target": "soh", "r2": 0.72},
"svr": {"version": "1.0.0", "display_name": "SVR (RBF)", "family": "classical", "algorithm": "SVR", "target": "soh", "r2": 0.805},
"lasso": {"version": "1.0.0", "display_name": "Lasso", "family": "classical", "algorithm": "Lasso", "target": "soh", "r2": 0.52},
"elasticnet": {"version": "1.0.0", "display_name": "ElasticNet", "family": "classical", "algorithm": "ElasticNet", "target": "soh", "r2": 0.52},
"knn_k5": {"version": "1.0.0", "display_name": "KNN (k=5)", "family": "classical", "algorithm": "KNeighborsRegressor", "target": "soh", "r2": 0.72},
"knn_k10": {"version": "1.0.0", "display_name": "KNN (k=10)", "family": "classical", "algorithm": "KNeighborsRegressor", "target": "soh", "r2": 0.724},
"knn_k20": {"version": "1.0.0", "display_name": "KNN (k=20)", "family": "classical", "algorithm": "KNeighborsRegressor", "target": "soh", "r2": 0.717},
"extra_trees": {"version": "2.0.0", "display_name": "ExtraTrees", "family": "classical", "algorithm": "ExtraTreesRegressor", "target": "soh", "r2": 0.967},
"gradient_boosting": {"version": "2.0.0", "display_name": "GradientBoosting", "family": "classical", "algorithm": "GradientBoostingRegressor", "target": "soh", "r2": 0.934},
"vanilla_lstm": {"version": "2.0.0", "display_name": "Vanilla LSTM", "family": "deep_pytorch", "algorithm": "VanillaLSTM", "target": "soh", "r2": 0.507},
"bidirectional_lstm": {"version": "2.0.0", "display_name": "Bidirectional LSTM", "family": "deep_pytorch", "algorithm": "BidirectionalLSTM", "target": "soh", "r2": 0.520},
"gru": {"version": "2.0.0", "display_name": "GRU", "family": "deep_pytorch", "algorithm": "GRUModel", "target": "soh", "r2": 0.510},
"attention_lstm": {"version": "2.0.0", "display_name": "Attention LSTM", "family": "deep_pytorch", "algorithm": "AttentionLSTM", "target": "soh", "r2": 0.540},
"batterygpt": {"version": "2.1.0", "display_name": "BatteryGPT", "family": "deep_pytorch", "algorithm": "BatteryGPT", "target": "soh", "r2": 0.881},
"tft": {"version": "2.2.0", "display_name": "Temporal Fusion Transformer", "family": "deep_pytorch", "algorithm": "TemporalFusionTransformer", "target": "soh", "r2": 0.881},
"vae_lstm": {"version": "2.3.0", "display_name": "VAE-LSTM", "family": "deep_pytorch", "algorithm": "VAE_LSTM", "target": "soh", "r2": 0.730},
"itransformer": {"version": "2.4.0", "display_name": "iTransformer", "family": "deep_keras", "algorithm": "iTransformer", "target": "soh", "r2": 0.595},
"physics_itransformer": {"version": "2.4.1", "display_name": "Physics iTransformer", "family": "deep_keras", "algorithm": "PhysicsITransformer", "target": "soh", "r2": 0.600},
"dynamic_graph_itransformer": {"version": "2.5.0", "display_name": "DG-iTransformer", "family": "deep_keras", "algorithm": "DynamicGraphITransformer", "target": "soh", "r2": 0.595},
"best_ensemble": {"version": "3.0.0", "display_name": "Best Ensemble (RF+XGB+LGB)", "family": "ensemble", "algorithm": "WeightedAverage", "target": "soh", "r2": 0.957},
}
# RΒ²-proportional weights for BestEnsemble
_ENSEMBLE_WEIGHTS: dict[str, float] = {
"random_forest": 0.957,
"xgboost": 0.928,
"lightgbm": 0.928,
"extra_trees": 0.967,
"gradient_boosting": 0.934,
}
# ── Degradation state ───────────────────────────────────────────────────────
def classify_degradation(soh: float) -> str:
if soh >= 90:
return "Healthy"
elif soh >= 80:
return "Moderate"
elif soh >= 70:
return "Degraded"
else:
return "End-of-Life"
def soh_to_color(soh: float) -> str:
"""Map SOH percentage to a hex colour (green→yellow→red)."""
if soh >= 90:
return "#22c55e" # green
elif soh >= 80:
return "#eab308" # yellow
elif soh >= 70:
return "#f97316" # orange
else:
return "#ef4444" # red
# ── Registry ─────────────────────────────────────────────────────────────────
class ModelRegistry:
"""Thread-safe singleton that owns all model objects and inference logic.
Attributes
----------
models:
Mapping from name to loaded model object (sklearn/XGBoost/LightGBM
or PyTorch ``nn.Module`` or Keras model).
default_model:
Name of the best available model (set by :meth:`_choose_default`).
device:
PyTorch device string β€” ``"cuda"`` when a GPU is available, else ``"cpu"``.
"""
# Model families that need the linear StandardScaler at inference
_LINEAR_FAMILIES = {"ridge", "lasso", "elasticnet", "svr",
"knn_k5", "knn_k10", "knn_k20"}
# Tree families that are scale-invariant (no scaler needed)
_TREE_FAMILIES = {"random_forest", "xgboost", "lightgbm", "best_ensemble",
"extra_trees", "gradient_boosting"}
def __init__(self, version: str = "v1"):
self.models: dict[str, Any] = {}
self.model_meta: dict[str, dict] = {}
self.default_model: str | None = None
self.scaler = None # kept for backward compat
self.linear_scaler = None # StandardScaler for Ridge/Lasso/SVR/KNN
self.sequence_scaler = None # StandardScaler for sequence deep models
self.device = "cpu"
self.version = version
# Set version-aware paths
vp = _versioned_paths(version)
self._models_dir = vp["models_dir"]
self._artifacts = vp["artifacts"]
self._scalers_dir = vp["scalers"]
# ── Loading ──────────────────────────────────────────────────────────
def load_all(self) -> None:
"""Scan artifacts/models and load all available model artefacts.
Safe to call multiple times β€” subsequent calls are no-ops when the
registry is already populated.
"""
if self.models:
log.debug("Registry already populated β€” skipping load_all()")
return
self._detect_device()
self._load_scaler()
self._load_classical()
self._load_deep_pytorch()
self._load_deep_keras()
self._register_ensemble()
self._choose_default()
log.info(
"Registry ready β€” %d models active, default='%s', device=%s",
len(self.models), self.default_model, self.device,
)
def _detect_device(self) -> None:
"""Detect PyTorch compute device (CUDA > CPU)."""
try:
import torch
self.device = "cuda" if torch.cuda.is_available() else "cpu"
log.info("PyTorch device: %s", self.device)
except ImportError:
log.info("torch not installed β€” deep PyTorch models unavailable")
def _load_classical(self) -> None:
"""Eagerly load all sklearn/XGBoost/LightGBM joblib artefacts."""
cdir = self._models_dir / "classical"
if not cdir.exists():
log.warning("Classical models dir not found: %s", cdir)
return
for p in sorted(cdir.glob("*.joblib")):
name = p.stem
# Skip non-model dumps (param search results, classifiers)
if "best_params" in name or "classifier" in name:
continue
try:
self.models[name] = joblib.load(p)
catalog = MODEL_CATALOG.get(name, {})
self.model_meta[name] = {
**catalog,
"family": "classical",
"loaded": True,
"path": str(p),
}
log.info("Loaded classical: %-22s v%s", name, catalog.get("version", "?"))
except Exception as exc:
log.warning("Failed to load %s: %s", p.name, exc)
def _build_pytorch_model(self, name: str) -> Any | None:
"""Instantiate a PyTorch module with the architecture used during training."""
try:
if name == "vanilla_lstm":
from src.models.deep.lstm import VanillaLSTM
return VanillaLSTM(_N_FEAT, _HIDDEN, _LSTM_LAYERS, _DROPOUT)
if name == "bidirectional_lstm":
from src.models.deep.lstm import BidirectionalLSTM
return BidirectionalLSTM(_N_FEAT, _HIDDEN, _LSTM_LAYERS, _DROPOUT)
if name == "gru":
from src.models.deep.lstm import GRUModel
return GRUModel(_N_FEAT, _HIDDEN, _LSTM_LAYERS, _DROPOUT)
if name == "attention_lstm":
from src.models.deep.lstm import AttentionLSTM
return AttentionLSTM(_N_FEAT, _HIDDEN, _ATTN_LAYERS, _DROPOUT)
if name == "batterygpt":
from src.models.deep.transformer import BatteryGPT
return BatteryGPT(
input_dim=_N_FEAT, d_model=_D_MODEL, n_heads=_N_HEADS,
n_layers=_TF_LAYERS, dropout=_DROPOUT, max_len=64,
)
if name == "tft":
from src.models.deep.transformer import TemporalFusionTransformer
return TemporalFusionTransformer(
n_features=_N_FEAT, d_model=_D_MODEL, n_heads=_N_HEADS,
n_layers=_TF_LAYERS, dropout=_DROPOUT,
)
if name == "vae_lstm":
from src.models.deep.vae_lstm import VAE_LSTM
return VAE_LSTM(
input_dim=_N_FEAT, seq_len=_SEQ_LEN,
hidden_dim=_HIDDEN, latent_dim=16,
n_layers=_LSTM_LAYERS, dropout=_DROPOUT,
)
except Exception as exc:
log.warning("Cannot build PyTorch model '%s': %s", name, exc)
return None
def _load_deep_pytorch(self) -> None:
"""Load PyTorch .pt state-dict files into reconstructed model instances."""
ddir = self._models_dir / "deep"
if not ddir.exists():
return
try:
import torch
except ImportError:
log.info("torch not installed β€” skipping deep PyTorch model loading")
return
for p in sorted(ddir.glob("*.pt")):
name = p.stem
model = self._build_pytorch_model(name)
if model is None:
self.model_meta[name] = {
**MODEL_CATALOG.get(name, {}),
"family": "deep_pytorch", "loaded": False,
"path": str(p), "load_error": "architecture unavailable",
}
continue
try:
state = torch.load(p, map_location=self.device, weights_only=True)
model.load_state_dict(state)
model.to(self.device)
model.eval()
self.models[name] = model
catalog = MODEL_CATALOG.get(name, {})
self.model_meta[name] = {
**catalog, "family": "deep_pytorch",
"loaded": True, "path": str(p),
}
log.info("Loaded PyTorch: %-22s v%s", name, catalog.get("version", "?"))
except Exception as exc:
log.warning("Could not load PyTorch '%s': %s", name, exc)
self.model_meta[name] = {
**MODEL_CATALOG.get(name, {}),
"family": "deep_pytorch", "loaded": False,
"path": str(p), "load_error": str(exc),
}
def _load_deep_keras(self) -> None:
"""Load TensorFlow/Keras .keras model files."""
ddir = self._models_dir / "deep"
if not ddir.exists():
return
try:
import tensorflow as tf
except ImportError:
log.info("TensorFlow not installed β€” skipping Keras model loading")
return
# Import the custom Keras classes so they are registered before load
try:
from src.models.deep.itransformer import (
FeatureWiseMHA,
TokenWiseMHA,
Conv1DFeedForward,
DynamicGraphConv,
PhysicsInformedLoss,
AbsCumCurrentLayer,
)
_custom_objects: dict = {
"FeatureWiseMHA": FeatureWiseMHA,
"TokenWiseMHA": TokenWiseMHA,
"Conv1DFeedForward": Conv1DFeedForward,
"DynamicGraphConv": DynamicGraphConv,
"PhysicsInformedLoss": PhysicsInformedLoss,
"AbsCumCurrentLayer": AbsCumCurrentLayer,
}
except Exception as imp_err:
log.warning("Could not import iTransformer custom classes: %s", imp_err)
_custom_objects = {}
for p in sorted(ddir.glob("*.keras")):
name = p.stem
try:
model = tf.keras.models.load_model(str(p), custom_objects=_custom_objects, safe_mode=False)
self.models[name] = model
catalog = MODEL_CATALOG.get(name, {})
self.model_meta[name] = {
**catalog, "family": "deep_keras",
"loaded": True, "path": str(p),
}
log.info("Loaded Keras: %-22s v%s", name, catalog.get("version", "?"))
except Exception as exc:
log.warning("Could not load Keras '%s': %s", name, exc)
self.model_meta[name] = {
**MODEL_CATALOG.get(name, {}),
"family": "deep_keras", "loaded": False,
"path": str(p), "load_error": str(exc),
}
def _register_ensemble(self) -> None:
"""Register the BestEnsemble virtual model when components are loaded."""
available = [m for m in _ENSEMBLE_WEIGHTS if m in self.models]
if not available:
log.warning("BestEnsemble: no component models loaded")
return
self.models["best_ensemble"] = "virtual_ensemble"
self.model_meta["best_ensemble"] = {
**MODEL_CATALOG["best_ensemble"],
"components": available, "loaded": True,
}
log.info("BestEnsemble registered β€” components: %s", ", ".join(available))
def _load_scaler(self) -> None:
# Scaler mapping (from notebooks/03_classical_ml.ipynb):
# standard_scaler.joblib β€” StandardScaler fitted on X_train
# Used for: SVR, Ridge, Lasso, ElasticNet, KNN
# sequence_scaler.joblib β€” StandardScaler for deep-model sequences
# Tree models (RF, ET, GB, XGB, LGB) were fitted on raw numpy X_train
# β†’ NO scaler applied, passed as-is
#
# Both standard_scaler.joblib and linear_scaler.joblib are identical
# (same mean_ / scale_). Prefer standard_scaler.joblib (canonical name
# from training notebook), fall back to linear_scaler.joblib.
scalers_dir = self._scalers_dir
for fname in ("standard_scaler.joblib", "linear_scaler.joblib"):
sp = scalers_dir / fname
if sp.exists():
try:
self.linear_scaler = joblib.load(sp)
log.info("Linear scaler loaded from %s", sp)
break
except Exception as exc:
log.warning("Could not load %s: %s", fname, exc)
else:
log.warning("No linear scaler found β€” Ridge/Lasso/SVR/KNN will use raw features")
sp_seq = scalers_dir / "sequence_scaler.joblib"
if sp_seq.exists():
try:
self.sequence_scaler = joblib.load(sp_seq)
log.info("Sequence scaler loaded from %s", sp_seq)
except Exception as exc:
log.warning("Could not load sequence_scaler.joblib: %s", exc)
else:
log.warning("sequence_scaler.joblib not found β€” deep models will use raw features")
def _choose_default(self) -> None:
"""Select the highest-quality loaded model as the registry default."""
priority = [
"best_ensemble",
"extra_trees",
"random_forest",
"xgboost",
"lightgbm",
"gradient_boosting",
"tft",
"batterygpt",
"attention_lstm",
"ridge",
]
for name in priority:
if name in self.models:
self.default_model = name
log.info("Default model: %s", name)
return
if self.models:
self.default_model = next(iter(self.models))
log.info("Default model (fallback): %s", self.default_model)
# ── Metrics retrieval ────────────────────────────────────────────────
def get_metrics(self) -> dict[str, dict[str, float]]:
"""Return unified evaluation metrics from results CSV/JSON artefacts.
CSV model name headers are normalised to lower-case underscore keys.
Entries missing from result files fall back to the ``r2`` field in
:data:`MODEL_CATALOG`.
"""
_normalise = {
"RandomForest": "random_forest", "LightGBM": "lightgbm",
"XGBoost": "xgboost", "SVR": "svr", "Ridge": "ridge",
"Lasso": "lasso", "ElasticNet": "elasticnet",
"KNN-5": "knn_k5", "KNN-10": "knn_k10", "KNN-20": "knn_k20",
}
results: dict[str, dict[str, float]] = {}
for csv_name in (
"classical_soh_results.csv", "lstm_soh_results.csv",
"transformer_soh_results.csv", "ensemble_results.csv",
"unified_results.csv",
):
path = self._artifacts / csv_name
if not path.exists():
# Fall back to root-level results (backward compat)
path = _ARTIFACTS / csv_name
if not path.exists():
continue
try:
df = pd.read_csv(path, index_col=0)
for raw in df.index:
key = _normalise.get(str(raw), str(raw).lower().replace(" ", "_"))
results[key] = df.loc[raw].dropna().to_dict()
except Exception as exc:
log.warning("Could not read %s: %s", csv_name, exc)
for json_name in ("dg_itransformer_results.json", "vae_lstm_results.json"):
path = self._artifacts / json_name
if not path.exists():
path = _ARTIFACTS / json_name
if not path.exists():
continue
try:
with open(path) as fh:
data = json.load(fh)
key = json_name.replace("_results.json", "")
results[key] = {k: float(v) for k, v in data.items()
if isinstance(v, (int, float))}
except Exception as exc:
log.warning("Could not read %s: %s", json_name, exc)
# Fill from catalog for anything not in result files
for name, info in MODEL_CATALOG.items():
if name not in results and "r2" in info:
results[name] = {"R2": info["r2"]}
return results
# ── Prediction helpers ────────────────────────────────────────────────
def _build_x(self, features: dict[str, float]) -> np.ndarray:
"""Build raw (1, F) feature numpy array β€” NO scaling applied here.
Scaling is applied per-model-family in :meth:`predict` because
tree models need no scaling while linear/deep models need different
scalers.
"""
return np.array([[features.get(c, 0.0) for c in FEATURE_COLS_SCALAR]])
@staticmethod
def _x_for_model(model: Any, x: np.ndarray) -> Any:
"""Return x in the format the model was fitted with.
* If the model has ``feature_names_in_`` β†’ pass a DataFrame whose
columns match those exact names (handles LGB trained with Column_0…).
* Otherwise β†’ pass the raw numpy array (RF, ET trained without names).
"""
names = getattr(model, "feature_names_in_", None)
if names is None:
return x # numpy β€” model was fitted without feature names
# Build DataFrame with the same column names the model was trained with
return pd.DataFrame(x, columns=list(names))
def _scale_for_linear(self, x: np.ndarray) -> np.ndarray:
"""Apply StandardScaler for linear / SVR / KNN models."""
if self.linear_scaler is not None:
try:
return self.linear_scaler.transform(x)
except Exception as exc:
log.warning("Linear scaler transform failed: %s", exc)
return x
def _build_sequence_array(
self, x: np.ndarray, seq_len: int = _SEQ_LEN
) -> np.ndarray:
"""Convert single-cycle feature row β†’ scaled (1, seq_len, F) numpy array.
Tile the current feature vector across *seq_len* timesteps and apply
the sequence scaler so values match the training distribution.
"""
if self.sequence_scaler is not None:
try:
x_sc = self.sequence_scaler.transform(x) # (1, F)
except Exception:
x_sc = x
else:
x_sc = x
# Tile to (1, seq_len, F)
return np.tile(x_sc[:, np.newaxis, :], (1, seq_len, 1)).astype(np.float32)
def _build_sequence_tensor(
self, x: np.ndarray, seq_len: int = _SEQ_LEN
) -> Any:
"""Same as :meth:`_build_sequence_array` but returns a PyTorch tensor."""
import torch
return torch.tensor(self._build_sequence_array(x, seq_len), dtype=torch.float32)
def _predict_ensemble(self, x: np.ndarray) -> tuple[float, str]:
"""Weighted-average SOH prediction from BestEnsemble component models.
Each component model receives input in the format it was trained with:
- RF, ET, GB, XGB: raw numpy (trained on X_train.values, no feature names)
- LGB: DataFrame with Column_0…Column_11 (LightGBM auto-assigned during training)
Both cases handled by :meth:`_x_for_model`.
"""
components = self.model_meta.get("best_ensemble", {}).get(
"components", list(_ENSEMBLE_WEIGHTS.keys())
)
total_w, weighted_sum = 0.0, 0.0
used: list[str] = []
for cname in components:
if cname not in self.models:
continue
w = _ENSEMBLE_WEIGHTS.get(cname, 1.0)
xi = self._x_for_model(self.models[cname], x)
soh = float(self.models[cname].predict(xi)[0])
weighted_sum += w * soh
total_w += w
used.append(cname)
if total_w == 0:
raise ValueError("No BestEnsemble components available")
return weighted_sum / total_w, f"best_ensemble({', '.join(used)})"
# ── Prediction ────────────────────────────────────────────────────────
def predict(
self,
features: dict[str, float],
model_name: str | None = None,
) -> dict[str, Any]:
"""Predict SOH for a single battery cycle.
Parameters
----------
features:
Dict of cycle features; keys from :data:`FEATURE_COLS_SCALAR`.
Missing keys are filled with 0.0.
model_name:
Registry model key (e.g. ``"best_ensemble"``, ``"random_forest"``,
``"tft"``). Defaults to :attr:`default_model`.
Returns
-------
dict
``soh_pct``, ``degradation_state``, ``rul_cycles``,
``confidence_lower``, ``confidence_upper``,
``model_used``, ``model_version``.
"""
name = model_name or self.default_model
if name is None:
raise ValueError("No models loaded in registry")
x = self._build_x(features)
# ── Dispatch by model type ──────────────────────────────────────
if name == "best_ensemble":
soh, label = self._predict_ensemble(x)
elif name in self.models:
model = self.models[name]
family = self.model_meta.get(name, {}).get("family", "classical")
if family == "deep_pytorch":
try:
import torch
with torch.no_grad():
# Build scaled (1, seq_len, F) sequence tensor
t = self._build_sequence_tensor(x).to(self.device)
out = model(t)
# VAE-LSTM returns a dict; all others return a tensor
if isinstance(out, dict):
out = out["health_pred"]
soh = float(out.cpu().numpy().ravel()[0])
except Exception as exc:
log.error("PyTorch inference error for '%s': %s", name, exc)
raise
elif family == "deep_keras":
try:
# Build scaled (1, seq_len, F) numpy array for Keras
seq_np = self._build_sequence_array(x) # (1, 32, F)
out = model.predict(seq_np, verbose=0)
# Physics-Informed model returns a dict with multiple heads
if isinstance(out, dict):
out = out.get("soh_ml", next(iter(out.values())))
soh = float(np.asarray(out).ravel()[0])
except Exception as exc:
log.error("Keras inference error for '%s': %s", name, exc)
raise
elif name in self._LINEAR_FAMILIES:
# Ridge/Lasso/ElasticNet/SVR/KNN need StandardScaler
x_lin = self._scale_for_linear(x)
soh = float(model.predict(x_lin)[0])
else:
# RF/XGB/LGB β€” scale-invariant; use per-model input format
xi = self._x_for_model(model, x)
soh = float(model.predict(xi)[0])
label = name
else:
fallback = self.default_model
if fallback and fallback != name and fallback in self.models:
log.warning("Model '%s' not loaded β€” falling back to '%s'", name, fallback)
return self.predict(features, fallback)
raise ValueError(
f"Model '{name}' is not available. "
f"Loaded: {list(self.models.keys())}"
)
soh = float(np.clip(soh, 0.0, 100.0))
# ── RUL estimate ────────────────────────────────────────────────
# Data-driven estimate: linear degradation from current SOH to 70%
# (EOL threshold), calibrated to NASA dataset's ~0.2-0.4 %/cycle rate.
EOL_THRESHOLD = 70.0
if soh > EOL_THRESHOLD:
# Degradation rate: use delta_capacity as a proxy (Ah/cycle)
# NASA nominal: ~2.0 Ah, so %/cycle = delta_cap / 2.0 * 100
cap_loss_per_cycle_pct = abs(features.get("delta_capacity", -0.005)) / 2.0 * 100
# Clamp to realistic range: 0.05 – 2.0 %/cycle
rate = max(0.05, min(cap_loss_per_cycle_pct, 2.0))
rul = (soh - EOL_THRESHOLD) / rate
else:
rul = 0.0
version = self.model_meta.get(name, MODEL_CATALOG.get(name, {})).get("version", "?")
return {
"soh_pct": round(soh, 2),
"degradation_state": classify_degradation(soh),
"rul_cycles": round(rul, 1),
"confidence_lower": round(soh - 2.0, 2),
"confidence_upper": round(soh + 2.0, 2),
"model_used": label,
"model_version": version,
}
def predict_batch(
self,
battery_id: str,
cycles: list[dict[str, float]],
model_name: str | None = None,
) -> list[dict[str, Any]]:
"""Predict SOH for multiple cycles of the same battery."""
return [
{**self.predict(c, model_name),
"battery_id": battery_id,
"cycle_number": c.get("cycle_number", i + 1)}
for i, c in enumerate(cycles)
]
def predict_array(
self,
X: np.ndarray,
model_name: str | None = None,
) -> tuple[np.ndarray, str]:
"""Vectorized batch SOH prediction on an (N, F) feature matrix.
Performs a **single** ``model.predict()`` call for the whole array,
giving O(1) Python overhead regardless of how many rows N is.
Used by the simulation endpoint to avoid per-step loop overhead.
Parameters
----------
X:
Shape ``(N, len(FEATURE_COLS_SCALAR))`` β€” rows are ordered by
``FEATURE_COLS_SCALAR``, no scaling applied yet.
model_name:
Model key. Defaults to :attr:`default_model`.
Returns
-------
tuple[np.ndarray, str]
``(soh_array, model_label)`` β€” ``soh_array`` has shape ``(N,)``,
values clipped to ``[0, 100]``.
Notes
-----
Deep sequence models (PyTorch / Keras) are not batchable here because
they require multi-timestep tensors. Callers that request a deep model
will get a ``ValueError``; the simulate endpoint falls back to physics.
"""
name = model_name or self.default_model
if name is None:
raise ValueError("No models loaded in registry")
if name == "best_ensemble":
components = self.model_meta.get("best_ensemble", {}).get(
"components", list(_ENSEMBLE_WEIGHTS.keys())
)
total_w: float = 0.0
weighted_sum: np.ndarray | None = None
used: list[str] = []
for cname in components:
if cname not in self.models:
continue
w = _ENSEMBLE_WEIGHTS.get(cname, 1.0)
xi = self._x_for_model(self.models[cname], X)
preds = np.asarray(self.models[cname].predict(xi), dtype=float)
weighted_sum = preds * w if weighted_sum is None else weighted_sum + preds * w
total_w += w
used.append(cname)
if total_w == 0 or weighted_sum is None:
raise ValueError("No BestEnsemble components available")
return np.clip(weighted_sum / total_w, 0.0, 100.0), f"best_ensemble({', '.join(used)})"
elif name in self.models:
model = self.models[name]
family = self.model_meta.get(name, {}).get("family", "classical")
if family in ("deep_pytorch", "deep_keras"):
raise ValueError(
f"Model '{name}' is a deep sequence model and cannot be "
"batch-predicted. Use predict() per sample instead."
)
elif name in self._LINEAR_FAMILIES:
xi = self._scale_for_linear(X)
else:
xi = self._x_for_model(model, X)
return np.clip(np.asarray(model.predict(xi), dtype=float), 0.0, 100.0), name
else:
fallback = self.default_model
if fallback and fallback != name and fallback in self.models:
log.warning("predict_array: '%s' not loaded β€” falling back to '%s'", name, fallback)
return self.predict_array(X, fallback)
raise ValueError(f"Model '{name}' is not available. Loaded: {list(self.models.keys())}")
# ── Info helpers ──────────────────────────────────────────────────────
@property
def model_count(self) -> int:
"""Total number of registered model entries."""
return len(set(list(self.models.keys()) + list(self.model_meta.keys())))
def list_models(self) -> list[dict[str, Any]]:
"""Return full model listing with versioning, metrics, and load status."""
all_metrics = self.get_metrics()
out: list[dict[str, Any]] = []
for name in MODEL_CATALOG:
catalog = MODEL_CATALOG[name]
meta = self.model_meta.get(name, {})
out.append({
"name": name,
"version": catalog.get("version", "?"),
"display_name": catalog.get("display_name", name),
"family": catalog.get("family", "unknown"),
"algorithm": catalog.get("algorithm", ""),
"target": catalog.get("target", "soh"),
"r2": catalog.get("r2"),
"metrics": all_metrics.get(name, {}),
"is_default": name == self.default_model,
"loaded": name in self.models,
"load_error": meta.get("load_error"),
})
return out
# ── Singletons ───────────────────────────────────────────────────────────────
registry_v1 = ModelRegistry(version="v1")
registry_v2 = ModelRegistry(version="v2")
# Default registry β€” v2 (latest models, bug fixes)
registry = registry_v2