Sahil Garg committed on
Commit
aeaf551
·
1 Parent(s): 921c0d5

Refactor: Extract helper functions

Browse files
Files changed (3) hide show
  1. agent/agent.py +18 -6
  2. app.py +53 -40
  3. ml/inference.py +67 -25
agent/agent.py CHANGED
@@ -1,4 +1,5 @@
1
  import json
 
2
  from langchain_google_genai import GoogleGenerativeAI
3
 
4
  class MaintenanceAgent:
@@ -9,8 +10,9 @@ class MaintenanceAgent:
9
  google_api_key=api_key
10
  )
11
 
12
- def run(self, phase2_output: dict) -> dict:
13
- prompt = f"""
 
14
  You are a maintenance decision AI.
15
  You must reason ONLY from the provided JSON.
16
  Do NOT invent data.
@@ -29,12 +31,22 @@ OUTPUT FORMAT:
29
  }}
30
  """
31
 
32
- response = self.llm.invoke(prompt)
 
33
  try:
34
  return json.loads(response)
35
  except json.JSONDecodeError:
36
- import re
37
- match = re.search(r'```json\s*(.*?)\s*```', response, re.DOTALL) or re.search(r'\{.*\}', response, re.DOTALL)
 
 
 
 
38
  if match:
39
- return json.loads(match.group(1) if '```' in response else match.group(0))
40
  raise ValueError(f"Could not parse LLM response: {response[:200]}")
 
 
 
 
 
 
1
  import json
2
+ import re
3
  from langchain_google_genai import GoogleGenerativeAI
4
 
5
  class MaintenanceAgent:
 
10
  google_api_key=api_key
11
  )
12
 
13
+ def _build_prompt(self, phase2_output: dict) -> str:
14
+ """Build the maintenance analysis prompt."""
15
+ return f"""
16
  You are a maintenance decision AI.
17
  You must reason ONLY from the provided JSON.
18
  Do NOT invent data.
 
31
  }}
32
  """
33
 
34
def _parse_response(self, response: str) -> dict:
    """Parse an LLM response into a dict, tolerating common wrappings.

    Candidates are tried in order:
      1. the raw response text as JSON,
      2. the contents of a ```json fenced markdown block,
      3. the first ``{...}`` span embedded in the text.

    The original implementation let a JSONDecodeError escape when a
    fenced block was found but its contents failed to parse; here every
    candidate falls through to the next one instead.

    Raises:
        ValueError: if no candidate parses as valid JSON.
    """
    candidates = [response]
    # JSON wrapped in a markdown code fence
    match = re.search(r'```json\s*(.*?)\s*```', response, re.DOTALL)
    if match:
        candidates.append(match.group(1))
    # Bare JSON object embedded in surrounding prose
    match = re.search(r'\{.*\}', response, re.DOTALL)
    if match:
        candidates.append(match.group(0))
    for candidate in candidates:
        try:
            return json.loads(candidate)
        except json.JSONDecodeError:
            continue
    raise ValueError(f"Could not parse LLM response: {response[:200]}")
48
+
49
def run(self, phase2_output: dict) -> dict:
    """Full agent pipeline: build prompt, invoke the LLM, parse the reply."""
    llm_reply = self.llm.invoke(self._build_prompt(phase2_output))
    return self._parse_response(llm_reply)
app.py CHANGED
@@ -9,12 +9,55 @@ from agent.agent import MaintenanceAgent
9
 
10
  load_dotenv()
11
  logging.basicConfig(level=logging.INFO)
 
12
 
13
  app = FastAPI(title="Solar PV Predictive Maintenance API", version="1.0.0")
14
 
15
  # Load ML models once on startup for production performance
16
  ml_engine = MLEngine()
17
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
18
  class SensorData(BaseModel):
19
  vdc1: list[float]
20
  idc1: list[float]
@@ -27,54 +70,24 @@ class AnalysisResponse(BaseModel):
27
  @app.post("/analyze", response_model=AnalysisResponse)
28
  async def analyze_sensor_data(data: SensorData):
29
  try:
30
- logging.info(f"Processing request with {len(data.vdc1)} voltage and {len(data.idc1)} current data points")
31
-
32
- if len(data.vdc1) != len(data.idc1):
33
- raise HTTPException(status_code=400, detail="Voltage and current lists must have the same length")
34
-
35
- if len(data.vdc1) < 3:
36
- raise HTTPException(status_code=400, detail="Need at least 3 data points")
37
 
38
- # Repeat to make at least 100 points if needed
39
- raw_df = pd.DataFrame({
40
- "vdc1": (data.vdc1 * (100 // len(data.vdc1) + 1))[:100],
41
- "idc1": (data.idc1 * (100 // len(data.idc1) + 1))[:100]
42
- })
43
 
44
- # ML Inference
45
- phase2_output = ml_engine.predict_from_raw(raw_df)
 
46
 
47
- # Agent Reasoning (if API key provided)
48
- if data.api_key:
49
- try:
50
- request_agent = MaintenanceAgent(
51
- api_key=data.api_key,
52
- model_name="gemini-2.5-flash-lite",
53
- temperature=0.0
54
- )
55
- agent_output = request_agent.run(phase2_output)
56
- except Exception as e:
57
- logging.warning(f"Agent initialization failed: {e}")
58
- agent_output = {
59
- "diagnosis": "Agent initialization failed",
60
- "urgency": "Unknown",
61
- "recommended_action": "Check your Google API key",
62
- "justification": [f"Error: {str(e)}"]
63
- }
64
- else:
65
- agent_output = {
66
- "diagnosis": "No API key provided - LLM features disabled",
67
- "urgency": "Unknown",
68
- "recommended_action": "Provide Google API key in request for AI diagnosis",
69
- "justification": ["Google API key required for maintenance reasoning"]
70
- }
71
 
72
- return AnalysisResponse(ml_output=phase2_output, agent_output=agent_output)
73
 
74
  except HTTPException:
75
  raise
76
  except Exception as e:
77
- logging.error(f"Error processing request: {e}")
78
  raise HTTPException(status_code=500, detail=str(e))
79
 
80
  @app.get("/")
 
9
 
10
  load_dotenv()
11
  logging.basicConfig(level=logging.INFO)
12
+ logger = logging.getLogger(__name__)
13
 
14
  app = FastAPI(title="Solar PV Predictive Maintenance API", version="1.0.0")
15
 
16
  # Load ML models once on startup for production performance
17
  ml_engine = MLEngine()
18
 
19
+ # ============ Helper Functions ============
20
+
21
def validate_sensor_data(vdc1: list, idc1: list) -> None:
    """Check that the voltage/current series are mutually consistent.

    Raises HTTPException (400) when the two lists differ in length or
    contain fewer than 3 samples.
    """
    n_voltage, n_current = len(vdc1), len(idc1)
    if n_voltage != n_current:
        raise HTTPException(status_code=400, detail="Voltage and current lists must have the same length")
    if n_voltage < 3:
        raise HTTPException(status_code=400, detail="Need at least 3 data points")
27
+
28
def prepare_dataframe(vdc1: list, idc1: list) -> pd.DataFrame:
    """Pad each sensor series out to exactly 100 samples for ML inference.

    The raw list is repeated end-to-end until at least 100 values exist,
    then truncated to the first 100.
    """
    def _tile(series: list) -> list:
        return (series * (100 // len(series) + 1))[:100]

    return pd.DataFrame({"vdc1": _tile(vdc1), "idc1": _tile(idc1)})
34
+
35
def get_agent_output(api_key: str, ml_output: dict) -> dict:
    """Run the LLM agent when an API key is supplied; otherwise return a stub.

    Agent failures are caught and reported in the same dict shape so the
    endpoint always responds.
    """
    if not api_key:
        # LLM reasoning disabled, but the API still answers with a stub.
        return {
            "diagnosis": "No API key provided - LLM features disabled",
            "urgency": "Unknown",
            "recommended_action": "Provide Google API key in request for AI diagnosis",
            "justification": ["Google API key required for maintenance reasoning"],
        }

    try:
        return MaintenanceAgent(
            api_key=api_key,
            model_name="gemini-2.5-flash-lite",
            temperature=0.0,
        ).run(ml_output)
    except Exception as e:
        logger.warning(f"Agent initialization failed: {e}")
        return {
            "diagnosis": "Agent initialization failed",
            "urgency": "Unknown",
            "recommended_action": "Check your Google API key",
            "justification": [f"Error: {str(e)}"],
        }
60
+
61
  class SensorData(BaseModel):
62
  vdc1: list[float]
63
  idc1: list[float]
 
70
@app.post("/analyze", response_model=AnalysisResponse)
async def analyze_sensor_data(data: SensorData):
    """Validate the request, run ML inference, then (optionally) the LLM agent."""
    try:
        logger.info(f"Processing request with {len(data.vdc1)} voltage and {len(data.idc1)} current data points")

        validate_sensor_data(data.vdc1, data.idc1)

        # ML inference over the padded sensor frame
        ml_result = ml_engine.predict_from_raw(prepare_dataframe(data.vdc1, data.idc1))

        # Agent reasoning (stubbed when no API key is supplied)
        return AnalysisResponse(
            ml_output=ml_result,
            agent_output=get_agent_output(data.api_key, ml_result),
        )
    except HTTPException:
        # Client errors pass through untouched
        raise
    except Exception as e:
        logger.error(f"Error processing request: {e}")
        raise HTTPException(status_code=500, detail=str(e))
92
 
93
  @app.get("/")
ml/inference.py CHANGED
@@ -16,6 +16,16 @@ ARTIFACTS_DIR = os.path.join(BASE_DIR, "artifacts")
16
 
17
  class MLEngine:
18
  def __init__(self):
 
 
 
 
 
 
 
 
 
 
19
  with open(os.path.join(ARTIFACTS_DIR, "ml_config.json")) as f:
20
  self.config = json.load(f)
21
 
@@ -24,33 +34,39 @@ class MLEngine:
24
  self.seq_len = self.config["seq_len"]
25
  self.design_life_days = self.config["design_life_days"]
26
 
27
- # Load scaler from JSON
 
28
  with open(os.path.join(ARTIFACTS_DIR, "scaler.json"), "r") as f:
29
  params = json.load(f)
 
30
  self.scaler = StandardScaler()
31
  self.scaler.mean_ = np.array(params["mean"])
32
  self.scaler.scale_ = np.array(params["scale"])
33
  self.scaler.var_ = self.scaler.scale_ ** 2
34
  self.scaler.n_features_in_ = len(self.scaler.mean_)
35
 
36
- # Retrain IsolationForest at startup using saved training data
 
37
  self.iso = IsolationForest(
38
  n_estimators=200,
39
  contamination=0.05,
40
  random_state=42
41
  )
42
- # Load training data (scaled features from Colab) and fit
43
  train_data = pd.read_json(os.path.join(ARTIFACTS_DIR, "training_data.json"))
44
  self.iso.fit(train_data[self.feature_cols])
45
 
46
- # Load XGBoost from JSON
 
47
  import xgboost as xgb
 
48
  self.ttf_model = xgb.XGBRegressor()
49
  self.ttf_model.load_model(os.path.join(ARTIFACTS_DIR, "xgb_ttf.json"))
 
50
  self.fail_model = xgb.XGBClassifier()
51
  self.fail_model.load_model(os.path.join(ARTIFACTS_DIR, "xgb_fail.json"))
52
 
53
- # Load LSTM from safetensors
 
54
  self.lstm = LSTMAutoencoder(
55
  input_dim=len(self.feature_cols),
56
  hidden_dim=32
@@ -59,21 +75,12 @@ class MLEngine:
59
  self.lstm.load_state_dict(state_dict)
60
  self.lstm.eval()
61
 
62
- def predict_from_raw(self, raw_df: pd.DataFrame):
63
- # --- Feature engineering ---
64
- df = build_features(raw_df, self.window)
65
- df = df[self.feature_cols].dropna()
66
-
67
- if len(df) < self.seq_len:
68
- raise ValueError("Not enough data for LSTM sequence")
69
-
70
- # --- Scaling ---
71
- df_scaled = pd.DataFrame(
72
- self.scaler.transform(df),
73
- columns=self.feature_cols,
74
- index=df.index
75
- )
76
-
77
  # --- Isolation Forest anomaly ---
78
  df_scaled["anomaly_iforest"] = -self.iso.decision_function(df_scaled)
79
 
@@ -91,7 +98,14 @@ class MLEngine:
91
  anomaly_norm = min(anomaly_lstm / 1e6, 1.0)
92
  health = max(0.0, 1.0 - anomaly_norm)
93
 
94
- # --- ML predictions ---
 
 
 
 
 
 
 
95
  latest_features = df_scaled[self.feature_cols].iloc[[-1]].copy()
96
  latest_features["anomaly_lstm"] = anomaly_lstm
97
  latest_features["health_index"] = health
@@ -115,9 +129,37 @@ class MLEngine:
115
  )
116
 
117
  return {
118
- "asset_id": "PV_INVERTER_001",
119
- "failure_probability": round(failure_probability, 2),
120
- "expected_ttf_days": round(expected_ttf_days, 1),
121
- "expected_rul_days": round(expected_rul_days, 1),
122
  "confidence": confidence
123
  }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
16
 
17
  class MLEngine:
18
def __init__(self):
    """Load configuration and every model artifact once at startup."""
    loaders = (
        # Config must load first: later loaders read attributes it sets
        # (e.g. feature_cols is used when fitting the IsolationForest).
        self._load_config,
        self._load_scaler,
        self._load_isolation_forest,
        self._load_xgboost_models,
        self._load_lstm_model,
    )
    for load in loaders:
        load()
27
+ def _load_config(self):
28
+ """Load ML configuration from JSON."""
29
  with open(os.path.join(ARTIFACTS_DIR, "ml_config.json")) as f:
30
  self.config = json.load(f)
31
 
 
34
  self.seq_len = self.config["seq_len"]
35
  self.design_life_days = self.config["design_life_days"]
36
 
37
def _load_scaler(self):
    """Rebuild the fitted StandardScaler from its JSON-serialized parameters."""
    scaler_path = os.path.join(ARTIFACTS_DIR, "scaler.json")
    with open(scaler_path, "r") as f:
        params = json.load(f)

    scaler = StandardScaler()
    scaler.mean_ = np.array(params["mean"])
    scaler.scale_ = np.array(params["scale"])
    # Derived attributes sklearn expects on a fitted scaler
    scaler.var_ = scaler.scale_ ** 2
    scaler.n_features_in_ = len(scaler.mean_)
    self.scaler = scaler
47
 
48
def _load_isolation_forest(self):
    """Re-fit the IsolationForest at startup from the saved training set."""
    # Fixed hyperparameters and seed keep the refit reproducible across restarts.
    forest = IsolationForest(n_estimators=200, contamination=0.05, random_state=42)
    training_df = pd.read_json(os.path.join(ARTIFACTS_DIR, "training_data.json"))
    forest.fit(training_df[self.feature_cols])
    self.iso = forest
57
 
58
def _load_xgboost_models(self):
    """Load the TTF regressor and failure classifier from JSON artifacts."""
    import xgboost as xgb

    for attr_name, model_cls, artifact in (
        ("ttf_model", xgb.XGBRegressor, "xgb_ttf.json"),
        ("fail_model", xgb.XGBClassifier, "xgb_fail.json"),
    ):
        model = model_cls()
        model.load_model(os.path.join(ARTIFACTS_DIR, artifact))
        setattr(self, attr_name, model)
67
 
68
+ def _load_lstm_model(self):
69
+ """Load LSTM autoencoder from safetensors."""
70
  self.lstm = LSTMAutoencoder(
71
  input_dim=len(self.feature_cols),
72
  hidden_dim=32
 
75
  self.lstm.load_state_dict(state_dict)
76
  self.lstm.eval()
77
 
78
+ def _compute_anomalies(self, df_scaled: pd.DataFrame) -> tuple:
79
+ """Compute anomaly scores from LSTM and IsolationForest.
80
+
81
+ Returns:
82
+ (anomaly_lstm, health) tuple
83
+ """
 
 
 
 
 
 
 
 
 
84
  # --- Isolation Forest anomaly ---
85
  df_scaled["anomaly_iforest"] = -self.iso.decision_function(df_scaled)
86
 
 
98
  anomaly_norm = min(anomaly_lstm / 1e6, 1.0)
99
  health = max(0.0, 1.0 - anomaly_norm)
100
 
101
+ return anomaly_lstm, health
102
+
103
+ def _make_predictions(self, df_scaled: pd.DataFrame, anomaly_lstm: float, health: float) -> dict:
104
+ """Make TTF and failure probability predictions.
105
+
106
+ Returns:
107
+ Dictionary with ttf, failure_prob, and rul predictions
108
+ """
109
  latest_features = df_scaled[self.feature_cols].iloc[[-1]].copy()
110
  latest_features["anomaly_lstm"] = anomaly_lstm
111
  latest_features["health_index"] = health
 
129
  )
130
 
131
  return {
132
+ "ttf_days": expected_ttf_days,
133
+ "failure_prob": failure_probability,
134
+ "rul_days": expected_rul_days,
 
135
  "confidence": confidence
136
  }
137
+
138
def predict_from_raw(self, raw_df: pd.DataFrame):
    """End-to-end inference: features -> scaling -> anomaly scores -> predictions.

    Raises:
        ValueError: when fewer than ``self.seq_len`` feature rows survive
            feature engineering (not enough data for the LSTM window).
    """
    # --- Feature engineering ---
    features = build_features(raw_df, self.window)[self.feature_cols].dropna()

    if len(features) < self.seq_len:
        raise ValueError("Not enough data for LSTM sequence")

    # --- Scaling (keep original index and column labels) ---
    scaled = pd.DataFrame(
        self.scaler.transform(features),
        columns=self.feature_cols,
        index=features.index,
    )

    # --- Anomaly scoring and downstream predictions ---
    anomaly_lstm, health = self._compute_anomalies(scaled)
    preds = self._make_predictions(scaled, anomaly_lstm, health)

    return {
        "asset_id": "PV_INVERTER_001",
        "failure_probability": round(preds["failure_prob"], 2),
        "expected_ttf_days": round(preds["ttf_days"], 1),
        "expected_rul_days": round(preds["rul_days"], 1),
        "confidence": preds["confidence"],
    }