Sahil Garg
flexible assest id handling
1457ba8
import json
import os
import joblib
import torch
import numpy as np
import pandas as pd
import logging
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import IsolationForest
from safetensors.torch import load_file
from ml.features import build_features
from ml.lstm_model import LSTMAutoencoder
from src.config import MLConfig
logger = logging.getLogger(__name__)
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
ARTIFACTS_DIR = os.path.join(BASE_DIR, "artifacts")
class MLEngine:
def __init__(self):
logger.info("Initializing ML Engine...")
self._load_ml_config()
self._load_scaler()
self._load_isolation_forest()
self._load_xgboost_models()
self._load_lstm_model()
logger.info("ML Engine initialized successfully")
def _load_ml_config(self):
"""Load ML configuration from config."""
config = MLConfig.load()
self.feature_cols = config["feature_cols"]
self.window = config["window"]
self.seq_len = config["seq_len"]
self.design_life_days = config["design_life_days"]
def _load_scaler(self):
"""Load and reconstruct StandardScaler from JSON."""
with open(os.path.join(ARTIFACTS_DIR, "scaler.json"), "r") as f:
params = json.load(f)
self.scaler = StandardScaler()
self.scaler.mean_ = np.array(params["mean"])
self.scaler.scale_ = np.array(params["scale"])
self.scaler.var_ = self.scaler.scale_ ** 2
self.scaler.n_features_in_ = len(self.scaler.mean_)
def _load_isolation_forest(self):
"""Load and retrain IsolationForest using saved training data."""
self.iso = IsolationForest(
n_estimators=200,
contamination=0.05,
random_state=42
)
train_data = pd.read_json(os.path.join(ARTIFACTS_DIR, "training_data.json"))
self.iso.fit(train_data[self.feature_cols])
def _load_xgboost_models(self):
"""Load XGBoost models from JSON artifacts."""
import xgboost as xgb
self.ttf_model = xgb.XGBRegressor()
self.ttf_model.load_model(os.path.join(ARTIFACTS_DIR, "xgb_ttf.json"))
self.fail_model = xgb.XGBClassifier()
self.fail_model.load_model(os.path.join(ARTIFACTS_DIR, "xgb_fail.json"))
def _load_lstm_model(self):
"""Load LSTM autoencoder from safetensors."""
self.lstm = LSTMAutoencoder(
input_dim=len(self.feature_cols),
hidden_dim=32
)
state_dict = load_file(os.path.join(ARTIFACTS_DIR, "lstm_autoencoder.safetensors"))
self.lstm.load_state_dict(state_dict)
self.lstm.eval()
def _compute_anomalies(self, df_scaled: pd.DataFrame) -> tuple:
"""Compute anomaly scores from LSTM and IsolationForest.
Returns: (anomaly_lstm, health) tuple
"""
df_scaled["anomaly_iforest"] = -self.iso.decision_function(df_scaled)
X = df_scaled[self.feature_cols].values
X_seq = np.array([X[-self.seq_len:]])
with torch.no_grad():
recon = self.lstm(torch.tensor(X_seq, dtype=torch.float32))
anomaly_lstm = float(((recon - torch.tensor(X_seq)) ** 2).mean())
anomaly_norm = min(anomaly_lstm / 1e6, 1.0)
health = max(0.0, 1.0 - anomaly_norm)
return anomaly_lstm, health
def _make_predictions(self, df_scaled: pd.DataFrame, anomaly_lstm: float, health: float) -> dict:
"""Make TTF and failure probability predictions.
Returns: Dictionary with ttf, failure_prob, and rul predictions
"""
latest_features = df_scaled[self.feature_cols].iloc[[-1]].copy()
latest_features["anomaly_lstm"] = anomaly_lstm
latest_features["health_index"] = health
expected_ttf_days = float(
self.ttf_model.predict(latest_features, validate_features=False)[0]
)
failure_probability = float(
self.fail_model.predict_proba(latest_features, validate_features=False)[0][1]
)
expected_rul_days = float(health * self.design_life_days)
confidence = round(0.5 * abs(failure_probability - 0.5) * 2 + 0.5 * health, 2)
return {
"ttf_days": expected_ttf_days,
"failure_prob": failure_probability,
"rul_days": expected_rul_days,
"confidence": confidence
}
def predict_from_raw(self, raw_df: pd.DataFrame, asset_id: str = None):
logger.info("ML analysis start")
df = build_features(raw_df, self.window)
df = df[self.feature_cols].dropna()
if len(df) < self.seq_len:
raise ValueError("Not enough data for LSTM sequence")
df_scaled = pd.DataFrame(
self.scaler.transform(df), columns=self.feature_cols, index=df.index
)
anomaly_lstm, health = self._compute_anomalies(df_scaled)
predictions = self._make_predictions(df_scaled, anomaly_lstm, health)
# Use provided asset_id or generate default
if asset_id is None:
import uuid
asset_id = f"Solar_Panel_{str(uuid.uuid4())[:8]}"
logger.info("ML analysis end")
return {
"asset_id": asset_id,
"failure_probability": round(predictions["failure_prob"], 2),
"expected_ttf_days": round(predictions["ttf_days"], 1),
"expected_rul_days": round(predictions["rul_days"], 1),
"confidence": predictions["confidence"]
}