File size: 5,532 Bytes
7690851 de597ec 7690851 de597ec 7690851 de597ec 7690851 de597ec aeaf551 de597ec aeaf551 de597ec 7690851 aeaf551 7690851 aeaf551 7690851 aeaf551 7690851 aeaf551 7690851 aeaf551 de597ec aeaf551 7690851 aeaf551 de597ec aeaf551 7690851 de597ec 7690851 aeaf551 7690851 aeaf551 1457ba8 de597ec aeaf551 de597ec aeaf551 1457ba8 de597ec aeaf551 1457ba8 aeaf551 de597ec |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 |
import json
import os
import joblib
import torch
import numpy as np
import pandas as pd
import logging
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import IsolationForest
from safetensors.torch import load_file
from ml.features import build_features
from ml.lstm_model import LSTMAutoencoder
from src.config import MLConfig
logger = logging.getLogger(__name__)
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
ARTIFACTS_DIR = os.path.join(BASE_DIR, "artifacts")
class MLEngine:
    """Loads serialized ML artifacts and produces failure/RUL predictions.

    All artifacts (scaler parameters, IsolationForest training data,
    XGBoost models, LSTM autoencoder weights) are read from
    ``ARTIFACTS_DIR`` once at construction time; prediction calls are
    then purely in-memory.
    """

    # Divisor used to squash the raw LSTM reconstruction error into [0, 1].
    # NOTE(review): magic constant carried over from the original code —
    # presumably calibrated against training-time error magnitudes; confirm.
    ANOMALY_SCALE = 1e6

    def __init__(self):
        logger.info("Initializing ML Engine...")
        self._load_ml_config()
        self._load_scaler()
        self._load_isolation_forest()
        self._load_xgboost_models()
        self._load_lstm_model()
        logger.info("ML Engine initialized successfully")

    def _load_ml_config(self):
        """Load feature columns, window/sequence sizes and design life from MLConfig."""
        config = MLConfig.load()
        self.feature_cols = config["feature_cols"]
        self.window = config["window"]
        self.seq_len = config["seq_len"]
        self.design_life_days = config["design_life_days"]

    def _load_scaler(self):
        """Reconstruct a fitted StandardScaler from saved JSON parameters."""
        with open(os.path.join(ARTIFACTS_DIR, "scaler.json"), "r") as f:
            params = json.load(f)
        self.scaler = StandardScaler()
        self.scaler.mean_ = np.array(params["mean"])
        self.scaler.scale_ = np.array(params["scale"])
        # sklearn stores variance separately; scale_ is sqrt(var_).
        self.scaler.var_ = self.scaler.scale_ ** 2
        self.scaler.n_features_in_ = len(self.scaler.mean_)

    def _load_isolation_forest(self):
        """Refit an IsolationForest on the saved training data.

        The forest is retrained from data (rather than unpickled), which
        avoids depending on sklearn pickle compatibility across versions.
        """
        self.iso = IsolationForest(
            n_estimators=200,
            contamination=0.05,
            random_state=42,  # fixed seed keeps every refit reproducible
        )
        train_data = pd.read_json(os.path.join(ARTIFACTS_DIR, "training_data.json"))
        self.iso.fit(train_data[self.feature_cols])

    def _load_xgboost_models(self):
        """Load the TTF regressor and failure classifier from JSON artifacts."""
        import xgboost as xgb  # local import keeps xgboost off the module import path
        self.ttf_model = xgb.XGBRegressor()
        self.ttf_model.load_model(os.path.join(ARTIFACTS_DIR, "xgb_ttf.json"))
        self.fail_model = xgb.XGBClassifier()
        self.fail_model.load_model(os.path.join(ARTIFACTS_DIR, "xgb_fail.json"))

    def _load_lstm_model(self):
        """Load LSTM autoencoder weights from safetensors and set eval mode."""
        self.lstm = LSTMAutoencoder(
            input_dim=len(self.feature_cols),
            hidden_dim=32,
        )
        state_dict = load_file(os.path.join(ARTIFACTS_DIR, "lstm_autoencoder.safetensors"))
        self.lstm.load_state_dict(state_dict)
        self.lstm.eval()  # inference only: freeze dropout/statistics updates

    def _compute_anomalies(self, df_scaled: pd.DataFrame) -> tuple:
        """Compute anomaly scores from the LSTM and IsolationForest.

        Args:
            df_scaled: Scaled feature frame. Mutated in place: an
                ``anomaly_iforest`` column is added (higher = more anomalous).

        Returns:
            (anomaly_lstm, health) where ``anomaly_lstm`` is the raw LSTM
            reconstruction MSE over the last ``seq_len`` rows and
            ``health`` is ``1 - min(anomaly_lstm / ANOMALY_SCALE, 1)``.
        """
        # Negated because decision_function is high for inliers; we want
        # high = anomalous. NOTE(review): this column is never consumed
        # downstream — kept for parity; confirm whether any caller uses it.
        df_scaled["anomaly_iforest"] = -self.iso.decision_function(
            df_scaled[self.feature_cols]
        )
        X = df_scaled[self.feature_cols].values
        # Single most-recent window as a (1, seq_len, n_features) batch.
        X_seq = torch.tensor(X[np.newaxis, -self.seq_len:], dtype=torch.float32)
        with torch.no_grad():
            recon = self.lstm(X_seq)
            # Fix: reuse the float32 input tensor. The original rebuilt it
            # via a bare torch.tensor(X_seq) (float64), silently promoting
            # the error computation to float64 and copying the data twice.
            anomaly_lstm = float(((recon - X_seq) ** 2).mean())
        anomaly_norm = min(anomaly_lstm / self.ANOMALY_SCALE, 1.0)
        health = max(0.0, 1.0 - anomaly_norm)
        return anomaly_lstm, health

    def _make_predictions(self, df_scaled: pd.DataFrame, anomaly_lstm: float, health: float) -> dict:
        """Predict time-to-failure, failure probability and remaining life.

        Args:
            df_scaled: Scaled feature frame; the last row is the current state.
            anomaly_lstm: Raw LSTM reconstruction error.
            health: Health index in [0, 1] (1 = healthy).

        Returns:
            dict with ``ttf_days``, ``failure_prob``, ``rul_days`` and
            ``confidence`` keys.
        """
        # .copy() so the appended model inputs don't leak back into df_scaled.
        latest_features = df_scaled[self.feature_cols].iloc[[-1]].copy()
        latest_features["anomaly_lstm"] = anomaly_lstm
        latest_features["health_index"] = health
        # validate_features=False: skip the booster's feature-name check
        # against the frame's column names.
        expected_ttf_days = float(
            self.ttf_model.predict(latest_features, validate_features=False)[0]
        )
        failure_probability = float(
            self.fail_model.predict_proba(latest_features, validate_features=False)[0][1]
        )
        expected_rul_days = float(health * self.design_life_days)
        # Confidence: equal-weight blend of classifier decisiveness
        # (scaled distance from 0.5) and the health index.
        confidence = round(0.5 * abs(failure_probability - 0.5) * 2 + 0.5 * health, 2)
        return {
            "ttf_days": expected_ttf_days,
            "failure_prob": failure_probability,
            "rul_days": expected_rul_days,
            "confidence": confidence,
        }

    def predict_from_raw(self, raw_df: pd.DataFrame, asset_id: "str | None" = None):
        """Run the full analysis pipeline on raw sensor data.

        Args:
            raw_df: Raw sensor readings; passed through ``build_features``.
            asset_id: Optional identifier echoed back in the result. A random
                ``Solar_Panel_XXXXXXXX`` id is generated when omitted.

        Returns:
            dict with the asset id, rounded failure probability, expected
            TTF/RUL in days, and a confidence score.

        Raises:
            ValueError: if fewer than ``seq_len`` feature rows survive
                feature building and NaN removal.
        """
        logger.info("ML analysis start")
        df = build_features(raw_df, self.window)
        df = df[self.feature_cols].dropna()
        if len(df) < self.seq_len:
            raise ValueError("Not enough data for LSTM sequence")
        df_scaled = pd.DataFrame(
            self.scaler.transform(df), columns=self.feature_cols, index=df.index
        )
        anomaly_lstm, health = self._compute_anomalies(df_scaled)
        predictions = self._make_predictions(df_scaled, anomaly_lstm, health)
        # Use the provided asset_id or generate a default one.
        if asset_id is None:
            import uuid  # local import: only needed on this fallback path
            asset_id = f"Solar_Panel_{str(uuid.uuid4())[:8]}"
        logger.info("ML analysis end")
        return {
            "asset_id": asset_id,
            "failure_probability": round(predictions["failure_prob"], 2),
            "expected_ttf_days": round(predictions["ttf_days"], 1),
            "expected_rul_days": round(predictions["rul_days"], 1),
            "confidence": predictions["confidence"],
        }