|
|
""" |
|
|
FastAPI Predictive Maintenance API with Rule-Based Diagnostics |
|
|
Senior Backend Engineer & Reliability Engineer Implementation |
|
|
|
|
|
Features: |
|
|
- ML-based failure prediction using XGBoost |
|
|
- Physics-based diagnostic engine |
|
|
- Single prediction and batch processing endpoints |
|
|
- Production-ready with proper error handling |
|
|
""" |
|
|
|
|
|
from fastapi import FastAPI, HTTPException, UploadFile, File |
|
|
from fastapi.middleware.cors import CORSMiddleware |
|
|
from fastapi.responses import StreamingResponse |
|
|
from pydantic import BaseModel, Field, validator |
|
|
from typing import Optional, Dict, Any, List |
|
|
import joblib |
|
|
import numpy as np |
|
|
import pandas as pd |
|
|
from datetime import datetime |
|
|
from pathlib import Path |
|
|
import os |
|
|
import io |
|
|
import logging |
|
|
|
|
|
|
|
|
# Module-wide logging: INFO level so model-load progress and per-row batch
# errors are visible in production logs.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
# FastAPI application instance; metadata below surfaces in the auto-generated
# OpenAPI docs (/docs, /redoc).
app = FastAPI(
    title="Predictive Maintenance API",
    description="ML + Rule-Based Diagnostics for Industrial Equipment",
    version="1.0.0"
)
|
|
|
|
|
|
|
|
# Allow cross-origin requests from any frontend.
# NOTE(review): allow_origins=["*"] together with allow_credentials=True is
# disallowed by the CORS spec — browsers reject a wildcard origin on
# credentialed requests. Confirm whether credentials are actually needed, or
# pin the origin list.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
|
|
|
from huggingface_hub import hf_hub_download

# Populated at startup by load_model(); expected to be a dict with keys
# 'model', 'scaler', and 'features' (see the startup validation).
MODEL_PIPELINE = None
# Hugging Face Hub repo and artifact names; all overridable via environment.
HF_MODEL_REPO = os.getenv("HF_MODEL_REPO", "deropxyz/AC02-ML")
PRIMARY_MODEL_FILENAME = os.getenv("HF_PRIMARY_MODEL", "model_pipeline_improved.joblib")
FALLBACK_MODEL_FILENAME = os.getenv("HF_FALLBACK_MODEL", "model_pipeline.joblib")
# Auth token for private repos; either common env-var spelling is accepted.
HF_TOKEN = os.getenv("HF_TOKEN") or os.getenv("HUGGINGFACEHUB_API_TOKEN")
|
|
|
|
|
|
|
|
def download_model_artifact(filename: str) -> Path:
    """Fetch one model artifact from the Hugging Face Hub.

    Args:
        filename: Name of the artifact file inside ``HF_MODEL_REPO``.

    Returns:
        Local filesystem path of the cached download.

    Raises:
        ValueError: If *filename* is empty or ``None``.
    """
    if not filename:
        raise ValueError("Filename must be provided for model download")

    logger.info("Downloading model '%s' from repo '%s'...", filename, HF_MODEL_REPO)

    # hf_hub_download caches locally and returns the cache path.
    local_file = hf_hub_download(
        repo_id=HF_MODEL_REPO,
        filename=filename,
        repo_type="model",
        token=HF_TOKEN,
    )
    return Path(local_file)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class PredictionRequest(BaseModel):
    """Single prediction request schema matching dataset columns"""
    # Optional identifier echoed back in the response; not used by the model.
    machine_id: Optional[str] = Field(None, description="Machine identifier (e.g., M_L_01)")
    # Ambient air temperature in Kelvin.
    air_temperature: float = Field(..., ge=250, le=350, description="Air temperature in Kelvin")
    # Process temperature in Kelvin (typically above air temperature).
    process_temperature: float = Field(..., ge=250, le=400, description="Process temperature in Kelvin")
    # Spindle speed in RPM.
    rotational_speed: int = Field(..., ge=0, le=3000, description="Rotational speed in RPM")
    # Applied torque in Nm.
    torque: float = Field(..., ge=0, le=100, description="Torque in Nm")
    # Accumulated tool wear in minutes.
    tool_wear: int = Field(..., ge=0, le=300, description="Tool wear in minutes")
    # Machine quality class; must be one of L/M/H (enforced below).
    type: str = Field(..., description="Machine type: L (Low), M (Medium), H (High)")

    # Pydantic v1-style validator (the file uses the v1 API throughout).
    @validator('type')
    def validate_type(cls, v):
        if v not in ['L', 'M', 'H']:
            raise ValueError('Type must be L, M, or H')
        return v

    class Config:
        # Example payload shown in the OpenAPI docs (pydantic v1 key name).
        schema_extra = {
            "example": {
                "machine_id": "M_L_01",
                "air_temperature": 298.1,
                "process_temperature": 308.6,
                "rotational_speed": 1551,
                "torque": 42.8,
                "tool_wear": 0,
                "type": "M"
            }
        }
|
|
|
|
|
|
|
|
class DiagnosticResult(BaseModel):
    """Diagnostic analysis result produced by analyze_condition()."""
    # Short label for the dominant failure cause (or "Normal Operation").
    primary_cause: str
    # Human-readable description of the triggering sensor readings.
    sensor_alert: str
    # Actionable maintenance guidance for the operator.
    recommended_action: str
    # One of: CRITICAL, HIGH, MEDIUM, LOW (UNKNOWN for batch row errors).
    severity: str
|
|
|
|
|
|
|
|
class AnomalyDetail(BaseModel):
    """Single out-of-range reading flagged by detect_anomalies()."""
    # Display name of the sensor/feature (e.g. "Air Temperature").
    parameter: str
    # Observed value formatted with its unit (e.g. "298.1 K").
    value: str
    # Expected operating range, formatted for display.
    normal_range: str
    # WARNING or CRITICAL.
    status: str
    # Plain-language explanation of why the reading matters.
    explanation: str
|
|
|
|
|
|
|
|
class PredictionResponse(BaseModel):
    """Single prediction response returned by POST /predict."""
    # Echo of the request's machine_id (may be absent).
    machine_id: Optional[str]
    # ISO-8601 timestamp of when the prediction was made.
    timestamp: str
    # "FAILURE" or "HEALTHY".
    prediction: str
    # Probability of the predicted class, rounded to 4 decimals.
    confidence: float
    # Rule-based diagnostic verdict.
    diagnostics: DiagnosticResult
    # Engineered feature values used by the model (for UI display).
    features: Optional[Dict[str, Any]] = None
    # Per-parameter out-of-range details (for visualization).
    anomalies: Optional[List[AnomalyDetail]] = None
    # Aggregate, human-readable health summary string.
    overall_health: Optional[str] = None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.on_event("startup")
async def load_model():
    """Download and load the model pipeline when the app starts.

    Tries the primary artifact first and falls back to the secondary one.
    Raises RuntimeError if neither can be loaded, and ValueError if the
    loaded pipeline is missing a required key.
    """
    global MODEL_PIPELINE

    last_error: Optional[Exception] = None
    # Skip any candidate name that is empty/unset.
    candidates = [name for name in (PRIMARY_MODEL_FILENAME, FALLBACK_MODEL_FILENAME) if name]

    for artifact_name in candidates:
        try:
            artifact_path = download_model_artifact(artifact_name)
            MODEL_PIPELINE = joblib.load(artifact_path)
        except Exception as err:
            last_error = err
            logger.error("Failed to load model '%s': %s", artifact_name, err)
        else:
            logger.info("✅ Model loaded from %s", artifact_name)
            break

    if MODEL_PIPELINE is None:
        raise RuntimeError(f"Unable to load any model artifact: {last_error}") from last_error

    # The rest of the service assumes these keys exist; fail fast otherwise.
    missing = [key for key in ('model', 'scaler', 'features') if key not in MODEL_PIPELINE]
    if missing:
        raise ValueError(f"Model pipeline missing key: {missing[0]}")

    logger.info("Model type: %s", type(MODEL_PIPELINE['model']).__name__)
    logger.info("Features: %s", MODEL_PIPELINE['features'])
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def engineer_features(data: Dict[str, Any]) -> pd.DataFrame:
    """Build the single-row feature frame expected by the trained model.

    Mirrors the training pipeline's feature engineering:
    - Tool_wear_hours: tool wear converted from minutes to hours
    - Temperature_Diff: process minus air temperature (thermal stress)
    - Power_W: mechanical power P = τ·ω (rpm converted to rad/s)
    - Type_Encoded: ordinal encoding of the machine type (L/M/H)

    Args:
        data: Validated request values keyed by the API field names.

    Returns:
        One-row DataFrame restricted to MODEL_PIPELINE['features'], in the
        column order the model was trained on.
    """
    raw = {
        'Air_temp_K': data['air_temperature'],
        'Process_temp_K': data['process_temperature'],
        'Speed_rpm': data['rotational_speed'],
        'Torque_Nm': data['torque'],
        'Tool_wear_min': data['tool_wear'],
    }
    frame = pd.DataFrame([raw])

    # Derived physics-based features.
    frame['Tool_wear_hours'] = frame['Tool_wear_min'] / 60.0
    frame['Temperature_Diff'] = frame['Process_temp_K'] - frame['Air_temp_K']
    frame['Power_W'] = frame['Torque_Nm'] * frame['Speed_rpm'] * (2 * np.pi / 60)

    # Ordinal machine-type encoding; KeyError here means an invalid type
    # slipped past validation.
    frame['Type_Encoded'] = {'L': 0, 'M': 1, 'H': 2}[data['type']]

    # Select/order exactly the columns the model was fitted with.
    return frame[MODEL_PIPELINE['features']]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def analyze_condition(
    data: Dict[str, Any],
    pred_class: int,
    pred_proba: float,
    engineered_features: pd.DataFrame
) -> DiagnosticResult:
    """
    Advanced diagnostic engine combining ML predictions with physics-based rules.

    Rules are checked top-down and the first match wins, so ordering encodes
    priority:
    1. Tool Wear Failure (CRITICAL)
    2. Power Overstrain (HIGH)
    3. Heat Dissipation Issue (MEDIUM)
    4. Thermal Stress (MEDIUM)
    5. High-Speed Operation (MEDIUM)
    6. ML Anomaly Detection (HIGH/MEDIUM, confidence-dependent)
    7. Healthy Operation (LOW)

    Args:
        data: Raw request values (torque, tool_wear, rotational_speed, ...).
        pred_class: ML class label (1 = failure, 0 = healthy).
        pred_proba: Confidence of the predicted class, in [0, 1].
        engineered_features: Single-row frame from engineer_features().

    Returns:
        DiagnosticResult with primary cause, alert text, recommended action,
        and severity.
    """
    # Unpack the engineered and raw sensor values the rules operate on.
    temp_diff = engineered_features['Temperature_Diff'].iloc[0]
    power_w = engineered_features['Power_W'].iloc[0]
    torque = data['torque']
    tool_wear = data['tool_wear']
    speed = data['rotational_speed']
    air_temp = data['air_temperature']
    process_temp = data['process_temperature']

    # Rule 1 — tool end of life: hard safety threshold at 200 min of wear.
    if tool_wear > 200:
        return DiagnosticResult(
            primary_cause="Tool End of Life",
            sensor_alert=f"Tool wear ({tool_wear} min) exceeds safety threshold (200 min)",
            recommended_action="IMMEDIATE ACTION: Replace cutting tool before catastrophic failure. "
                               "Schedule downtime within 4 hours.",
            severity="CRITICAL"
        )

    # Rule 2 — mechanical overload: either power or torque beyond design limits.
    if power_w > 9000 or torque > 60:
        cause_details = []
        if power_w > 9000:
            cause_details.append(f"Power output ({power_w:.0f} W) exceeds design limit (9000 W)")
        if torque > 60:
            cause_details.append(f"Torque ({torque:.1f} Nm) exceeds safety limit (60 Nm)")

        return DiagnosticResult(
            primary_cause="Power Overstrain / Mechanical Overload",
            sensor_alert=" | ".join(cause_details),
            recommended_action="URGENT: Reduce operational load immediately. "
                               "Inspect shaft alignment, bearing condition, and drive belt tension. "
                               "Check for material jamming or obstruction.",
            severity="HIGH"
        )

    # Rule 3 — poor heat dissipation: low thermal gradient at high speed means
    # heat is not being carried away effectively.
    if temp_diff < 8.0 and speed > 1300:
        return DiagnosticResult(
            primary_cause="Inefficient Cooling System",
            sensor_alert=f"Temperature differential ({temp_diff:.1f} K) too low at high speed ({speed} RPM). "
                         f"Expected ≥8.0 K for proper heat dissipation.",
            recommended_action="MAINTENANCE REQUIRED: Inspect cooling system within 24 hours. "
                               "Check heat exchanger efficiency, coolant flow rate, and radiator fins. "
                               "Verify coolant temperature and pressure.",
            severity="MEDIUM"
        )

    # Rule 4 — excessive thermal gradient: the opposite failure mode of rule 3.
    if temp_diff > 15.0:
        return DiagnosticResult(
            primary_cause="Excessive Thermal Stress",
            sensor_alert=f"Temperature differential ({temp_diff:.1f} K) exceeds normal range (8-12 K). "
                         f"Process temp: {process_temp:.1f} K, Air temp: {air_temp:.1f} K",
            recommended_action="MONITOR CLOSELY: High thermal gradient detected. "
                               "Check process parameters and reduce processing intensity if possible. "
                               "Verify insulation integrity and ambient conditions.",
            severity="MEDIUM"
        )

    # Rule 5 — very high rotational speed accelerates wear and vibration.
    if speed > 2500:
        return DiagnosticResult(
            primary_cause="High-Speed Operation Risk",
            sensor_alert=f"Rotational speed ({speed} RPM) in critical range (>2500 RPM). "
                         f"Vibration and wear accelerate exponentially.",
            recommended_action="PREVENTIVE ACTION: Monitor vibration levels closely. "
                               "Perform balance check and bearing inspection. "
                               "Consider reducing speed if not operationally critical.",
            severity="MEDIUM"
        )

    # Rule 6 — no explicit rule fired, but the ML model predicts failure:
    # severity scales with model confidence.
    if pred_class == 1:
        confidence_level = "high" if pred_proba > 0.8 else "moderate"

        return DiagnosticResult(
            primary_cause="ML-Detected Anomaly Pattern",
            sensor_alert=f"Machine learning model detected failure risk with {confidence_level} confidence ({pred_proba:.1%}). "
                         f"No specific rule violation identified.",
            recommended_action="DIAGNOSTIC SCAN REQUIRED: Perform comprehensive machine diagnostic. "
                               "Check for: (1) Unusual vibration patterns, (2) Bearing wear, "
                               "(3) Lubrication quality, (4) Electrical anomalies, (5) Sensor calibration. "
                               "Review recent maintenance history.",
            severity="HIGH" if pred_proba > 0.8 else "MEDIUM"
        )

    # Rule 7 — healthy: compute a simple percentage health score from five
    # boolean indicators (each contributes equally).
    health_indicators = {
        'tool_wear_ok': tool_wear <= 150,
        'power_ok': power_w <= 8000,
        'temp_ok': 8.0 <= temp_diff <= 12.0,
        'speed_ok': speed <= 2000,
        'torque_ok': torque <= 50
    }
    health_score = sum(health_indicators.values()) / len(health_indicators) * 100

    return DiagnosticResult(
        primary_cause="Normal Operation",
        sensor_alert=f"All parameters within optimal range. Health score: {health_score:.0f}%. "
                     f"Tool wear: {tool_wear}/200 min, Power: {power_w:.0f}/9000 W, "
                     f"Temp diff: {temp_diff:.1f} K, Speed: {speed}/2500 RPM",
        recommended_action="CONTINUE NORMAL OPERATION. Next scheduled maintenance as planned. "
                           "Continue routine monitoring.",
        severity="LOW"
    )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def detect_anomalies(data: Dict[str, Any], features_df: pd.DataFrame) -> List[AnomalyDetail]:
    """
    Detect out-of-range sensor readings and engineered features.

    Each check appends an AnomalyDetail whose status is WARNING inside the
    outer threshold band and CRITICAL beyond the inner (tighter) one.

    Args:
        data: Raw request values keyed by the API field names.
        features_df: Single-row frame from engineer_features().

    Returns:
        List of anomaly details for visualization (empty when all readings
        are in range).
    """
    anomalies = []

    # Raw sensor readings and derived features used by the checks below.
    air_temp = data['air_temperature']
    process_temp = data['process_temperature']
    speed = data['rotational_speed']
    torque = data['torque']
    tool_wear = data['tool_wear']
    temp_diff = features_df['Temperature_Diff'].iloc[0]
    power_w = features_df['Power_W'].iloc[0]

    # Air temperature: normal 290-310 K; critical beyond 285/315 K.
    if air_temp < 290 or air_temp > 310:
        status = "CRITICAL" if air_temp < 285 or air_temp > 315 else "WARNING"
        anomalies.append(AnomalyDetail(
            parameter="Air Temperature",
            value=f"{air_temp:.1f} K",
            normal_range="290-310 K",
            status=status,
            explanation=f"Air temperature is {'too low' if air_temp < 290 else 'too high'}. This can affect cooling efficiency and equipment performance."
        ))

    # Process temperature: normal 300-315 K; critical beyond 295/320 K.
    if process_temp < 300 or process_temp > 315:
        status = "CRITICAL" if process_temp < 295 or process_temp > 320 else "WARNING"
        anomalies.append(AnomalyDetail(
            parameter="Process Temperature",
            value=f"{process_temp:.1f} K",
            normal_range="300-315 K",
            status=status,
            explanation=f"Process temperature is {'below optimal' if process_temp < 300 else 'exceeding optimal'} range. May indicate cooling system issues or excessive load."
        ))

    # Temperature differential: normal 8-12 K; critical beyond 5/15 K.
    if temp_diff < 8.0 or temp_diff > 12.0:
        status = "CRITICAL" if temp_diff < 5.0 or temp_diff > 15.0 else "WARNING"
        anomalies.append(AnomalyDetail(
            parameter="Temperature Differential",
            value=f"{temp_diff:.1f} K",
            normal_range="8.0-12.0 K",
            status=status,
            explanation=f"Temperature gradient is {'insufficient' if temp_diff < 8.0 else 'excessive'}. This indicates {'poor heat dissipation' if temp_diff < 8.0 else 'thermal stress'}."
        ))

    # Rotational speed: normal 1200-2000 RPM; critical beyond 1000/2500 RPM.
    if speed < 1200 or speed > 2000:
        status = "CRITICAL" if speed < 1000 or speed > 2500 else "WARNING"
        anomalies.append(AnomalyDetail(
            parameter="Rotational Speed",
            value=f"{speed} RPM",
            normal_range="1200-2000 RPM",
            status=status,
            explanation=f"Speed is {'below normal' if speed < 1200 else 'above normal'} operating range. This affects wear rate and vibration levels."
        ))

    # Torque: only an upper bound is enforced (>50 Nm warns, >60 Nm critical).
    if torque > 50:
        status = "CRITICAL" if torque > 60 else "WARNING"
        anomalies.append(AnomalyDetail(
            parameter="Torque",
            value=f"{torque:.1f} Nm",
            normal_range="20-50 Nm",
            status=status,
            explanation=f"Torque is {'significantly ' if torque > 60 else ''}exceeding normal range. This indicates mechanical overload or obstruction."
        ))

    # Tool wear: >150 min warns, >200 min critical (matches analyze_condition).
    if tool_wear > 150:
        status = "CRITICAL" if tool_wear > 200 else "WARNING"
        anomalies.append(AnomalyDetail(
            parameter="Tool Wear",
            value=f"{tool_wear} min",
            normal_range="0-150 min",
            status=status,
            explanation=f"Tool wear is {'critically high' if tool_wear > 200 else 'elevated'}. {'IMMEDIATE replacement required' if tool_wear > 200 else 'Schedule replacement soon'}."
        ))

    # Power output: >8000 W warns, >9000 W critical.
    if power_w > 8000:
        status = "CRITICAL" if power_w > 9000 else "WARNING"
        anomalies.append(AnomalyDetail(
            parameter="Power Output",
            value=f"{power_w:.0f} W",
            normal_range="3000-8000 W",
            status=status,
            explanation=f"Power consumption is {'critically ' if power_w > 9000 else ''}high. This suggests mechanical resistance or overload conditions."
        ))

    return anomalies
|
|
|
|
|
|
|
|
def determine_overall_health(prediction: int, confidence: float, severity: str, anomaly_count: int) -> str:
    """Summarize overall machine health from the ML verdict and rule outputs.

    Args:
        prediction: ML class label (1 = failure, 0 = healthy).
        confidence: Probability of the predicted class, in [0, 1].
        severity: Severity string from analyze_condition().
        anomaly_count: Number of anomalies from detect_anomalies().

    Returns:
        An emoji-prefixed, human-readable health status string.
    """
    if prediction != 1:
        # Healthy verdict: grade purely by how many readings were flagged.
        if anomaly_count == 0:
            return "🟢 EXCELLENT HEALTH - All systems operating optimally within normal parameters."
        if anomaly_count <= 2:
            return "🟢 GOOD HEALTH - Minor anomalies detected but within acceptable limits."
        return "🟡 FAIR HEALTH - Multiple anomalies detected. Monitor closely and address issues."

    # Failure verdict: escalate with diagnostic severity and model confidence.
    if severity == "CRITICAL" or confidence > 0.9:
        return "🔴 POOR HEALTH - Critical failure risk detected. Immediate action required."
    if severity == "HIGH" or confidence > 0.75:
        return "🟠 POOR HEALTH - High failure risk. Urgent maintenance needed."
    return "🟡 FAIR HEALTH - Moderate failure risk detected. Schedule maintenance soon."
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.post("/predict", response_model=PredictionResponse)
async def predict_single(request: PredictionRequest):
    """
    Predict failure risk for a single machine reading.

    Returns:
    - Prediction (HEALTHY/FAILURE)
    - Confidence score
    - Detailed diagnostics with actionable recommendations

    Raises:
        HTTPException: 500 if any step of the pipeline fails.
    """
    try:
        # Validated request payload as a plain dict (pydantic v1 API).
        data = request.dict()

        # Recreate the training-time feature engineering for this reading.
        features_df = engineer_features(data)

        # Scale with the scaler fitted during training.
        features_scaled = MODEL_PIPELINE['scaler'].transform(features_df)

        # Class label and per-class probability vector for the single row.
        prediction = MODEL_PIPELINE['model'].predict(features_scaled)[0]
        prediction_proba = MODEL_PIPELINE['model'].predict_proba(features_scaled)[0]

        # Confidence of the *predicted* class (index proba by the label).
        confidence = float(prediction_proba[prediction])

        # Physics-rule diagnostics layered on top of the ML verdict.
        diagnostics = analyze_condition(data, prediction, confidence, features_df)

        # Engineered feature values echoed back for UI display.
        features_dict = features_df.iloc[0].to_dict()

        # Per-parameter out-of-range checks for visualization.
        anomalies = detect_anomalies(data, features_df)

        # Aggregate health verdict combining ML, severity, and anomaly count.
        overall_health = determine_overall_health(prediction, confidence, diagnostics.severity, len(anomalies))

        return PredictionResponse(
            machine_id=request.machine_id,
            timestamp=datetime.now().isoformat(),
            prediction="FAILURE" if prediction == 1 else "HEALTHY",
            confidence=round(confidence, 4),
            diagnostics=diagnostics,
            features=features_dict,
            anomalies=anomalies,
            overall_health=overall_health
        )

    except Exception as e:
        # Any pipeline failure (bad state, model error) becomes a 500.
        logger.error(f"Prediction error: {e}")
        raise HTTPException(status_code=500, detail=f"Prediction failed: {str(e)}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.post("/predict/batch")
async def predict_batch(file: UploadFile = File(...)):
    """
    Batch prediction from CSV upload; streams back an annotated CSV.

    Input: CSV file with columns matching PredictionRequest
    Output: CSV with added columns: Prediction, Confidence, Primary_Cause,
            Sensor_Alert, Recommended_Action, Severity, Processed_At

    Raises:
        HTTPException: 400 for a non-CSV upload or missing columns,
            500 for any other processing failure. Per-row errors do NOT
            abort the batch; the row is marked "ERROR" in the output.
    """
    try:
        # Reject non-CSV uploads up front.
        if not file.filename.endswith('.csv'):
            raise HTTPException(status_code=400, detail="File must be CSV format")

        # Read the whole upload into memory and parse as UTF-8 CSV.
        contents = await file.read()
        df = pd.read_csv(io.StringIO(contents.decode('utf-8')))

        # Normalize headers: lowercase, underscores, strip bracketed units.
        # NOTE(review): Series.str.replace('[', '_') relies on regex=False
        # being the default (pandas >= 2.0); on older pandas '[' is an
        # invalid regex — confirm the pinned pandas version.
        df.columns = df.columns.str.strip().str.lower().str.replace(' ', '_').str.replace('[', '_').str.replace(']', '').str.replace('__', '_').str.rstrip('_')

        # Map normalized dataset-style headers (with units) to API field names.
        column_mapping = {
            'air_temperature_k': 'air_temperature',
            'process_temperature_k': 'process_temperature',
            'rotational_speed_rpm': 'rotational_speed',
            'torque_nm': 'torque',
            'tool_wear_min': 'tool_wear'
        }
        df.rename(columns=column_mapping, inplace=True)

        # Validate that every model input column is present.
        required_columns = [
            'air_temperature', 'process_temperature', 'rotational_speed',
            'torque', 'tool_wear', 'type'
        ]
        missing_cols = [col for col in required_columns if col not in df.columns]
        if missing_cols:
            raise HTTPException(
                status_code=400,
                detail=f"Missing required columns: {missing_cols}. Found columns: {list(df.columns)}"
            )

        # Per-row output accumulators (appended in lockstep with df rows).
        predictions = []
        probabilities = []
        causes = []
        alerts = []
        actions = []
        severities = []

        for idx, row in df.iterrows():
            try:
                # Coerce row values to the types engineer_features expects.
                data = {
                    'machine_id': row.get('machine_id', f'MACHINE_{idx}'),
                    'air_temperature': float(row['air_temperature']),
                    'process_temperature': float(row['process_temperature']),
                    'rotational_speed': int(row['rotational_speed']),
                    'torque': float(row['torque']),
                    'tool_wear': int(row['tool_wear']),
                    'type': str(row['type'])
                }

                # Same pipeline as the single-prediction endpoint.
                features_df = engineer_features(data)

                features_scaled = MODEL_PIPELINE['scaler'].transform(features_df)
                prediction = MODEL_PIPELINE['model'].predict(features_scaled)[0]
                prediction_proba = MODEL_PIPELINE['model'].predict_proba(features_scaled)[0]
                confidence = float(prediction_proba[prediction])

                diagnostics = analyze_condition(data, prediction, confidence, features_df)

                predictions.append("FAILURE" if prediction == 1 else "HEALTHY")
                probabilities.append(round(confidence, 4))
                causes.append(diagnostics.primary_cause)
                alerts.append(diagnostics.sensor_alert)
                actions.append(diagnostics.recommended_action)
                severities.append(diagnostics.severity)

            except Exception as e:
                # Best-effort batch: record the failure and keep going.
                logger.warning(f"Row {idx} processing error: {e}")
                predictions.append("ERROR")
                probabilities.append(0.0)
                causes.append("Processing Error")
                alerts.append(str(e))
                actions.append("Review input data")
                severities.append("UNKNOWN")

        # Attach result columns to the original frame for download.
        df['Prediction'] = predictions
        df['Confidence'] = probabilities
        df['Primary_Cause'] = causes
        df['Sensor_Alert'] = alerts
        df['Recommended_Action'] = actions
        df['Severity'] = severities
        df['Processed_At'] = datetime.now().isoformat()

        # Serialize to CSV in memory.
        output = io.StringIO()
        df.to_csv(output, index=False)
        output.seek(0)

        # Stream back as a timestamped CSV attachment.
        return StreamingResponse(
            iter([output.getvalue()]),
            media_type="text/csv",
            headers={
                "Content-Disposition": f"attachment; filename=predictions_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
            }
        )

    except HTTPException:
        # Let deliberate 4xx responses pass through unchanged.
        raise
    except Exception as e:
        logger.error(f"Batch prediction error: {e}")
        raise HTTPException(status_code=500, detail=f"Batch prediction failed: {str(e)}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.post("/predict/batch/json")
async def predict_batch_json(file: UploadFile = File(...)):
    """
    Batch prediction from CSV upload - returns detailed JSON for UI display.

    Input: CSV file with required columns (headers are normalized to
    lowercase snake_case; unlike /predict/batch, unit-suffixed dataset
    headers are NOT remapped here).
    Output: JSON with a summary plus detailed predictions, features, and
    anomalies for each row. Per-row failures are reported inline as
    "ERROR" entries rather than aborting the batch.

    Raises:
        HTTPException: 400 for a non-CSV upload, missing columns, or a file
            exceeding the row limit; 500 for any other processing failure.
    """
    try:
        # Reject non-CSV uploads up front.
        if not file.filename.endswith('.csv'):
            raise HTTPException(status_code=400, detail="File must be CSV format")

        # Read the whole upload into memory and parse as UTF-8 CSV.
        contents = await file.read()
        df = pd.read_csv(io.StringIO(contents.decode('utf-8')))

        # Normalize headers: lowercase snake_case.
        df.columns = df.columns.str.strip().str.lower().str.replace(' ', '_')

        # Validate that every model input column is present.
        required_columns = [
            'air_temperature', 'process_temperature', 'rotational_speed',
            'torque', 'tool_wear', 'type'
        ]
        missing_cols = [col for col in required_columns if col not in df.columns]
        if missing_cols:
            raise HTTPException(
                status_code=400,
                detail=f"Missing required columns: {missing_cols}. Found columns: {list(df.columns)}"
            )

        # Cap the JSON payload size. BUGFIX: the enforced limit is 1000 rows,
        # but the message previously claimed 500 — now consistent.
        if len(df) > 1000:
            raise HTTPException(
                status_code=400,
                detail=f"Batch JSON endpoint limited to 1000 rows. Your file has {len(df)} rows. Use /predict/batch for larger files or download CSV results."
            )

        results = []

        for idx, row in df.iterrows():
            try:
                # Coerce row values; type is normalized to uppercase L/M/H.
                data = {
                    'machine_id': row.get('machine_id', f'MACHINE_{idx+1}'),
                    'air_temperature': float(row['air_temperature']),
                    'process_temperature': float(row['process_temperature']),
                    'rotational_speed': int(row['rotational_speed']),
                    'torque': float(row['torque']),
                    'tool_wear': int(row['tool_wear']),
                    'type': str(row['type']).strip().upper()
                }

                # Same pipeline as the single-prediction endpoint.
                features_df = engineer_features(data)

                features_scaled = MODEL_PIPELINE['scaler'].transform(features_df)
                prediction = MODEL_PIPELINE['model'].predict(features_scaled)[0]
                prediction_proba = MODEL_PIPELINE['model'].predict_proba(features_scaled)[0]
                confidence = float(prediction_proba[prediction])

                # Rule-based diagnostics on top of the ML verdict.
                diagnostics = analyze_condition(data, prediction, confidence, features_df)

                # Engineered feature values for UI display.
                features_dict = features_df.iloc[0].to_dict()

                # Per-parameter out-of-range checks.
                anomalies = detect_anomalies(data, features_df)

                # Aggregate health verdict.
                overall_health = determine_overall_health(prediction, confidence, diagnostics.severity, len(anomalies))

                results.append({
                    "row_number": idx + 1,
                    "machine_id": data['machine_id'],
                    "timestamp": datetime.now().isoformat(),
                    "prediction": "FAILURE" if prediction == 1 else "HEALTHY",
                    "confidence": round(confidence, 4),
                    "diagnostics": {
                        "primary_cause": diagnostics.primary_cause,
                        "sensor_alert": diagnostics.sensor_alert,
                        "recommended_action": diagnostics.recommended_action,
                        "severity": diagnostics.severity
                    },
                    "features": features_dict,
                    "anomalies": [
                        {
                            "parameter": a.parameter,
                            "value": a.value,
                            "normal_range": a.normal_range,
                            "status": a.status,
                            "explanation": a.explanation
                        } for a in anomalies
                    ],
                    "overall_health": overall_health,
                    "input_data": {
                        "air_temperature": data['air_temperature'],
                        "process_temperature": data['process_temperature'],
                        "rotational_speed": data['rotational_speed'],
                        "torque": data['torque'],
                        "tool_wear": data['tool_wear'],
                        "type": data['type']
                    }
                })

            except Exception as e:
                # Best-effort batch: record the failure and keep going.
                logger.warning(f"Row {idx+1} processing error: {e}")
                results.append({
                    "row_number": idx + 1,
                    "machine_id": row.get('machine_id', f'MACHINE_{idx+1}'),
                    "timestamp": datetime.now().isoformat(),
                    "prediction": "ERROR",
                    "confidence": 0.0,
                    "diagnostics": {
                        "primary_cause": "Processing Error",
                        "sensor_alert": str(e),
                        "recommended_action": "Review input data format and values",
                        "severity": "UNKNOWN"
                    },
                    "error": str(e)
                })

        # Summary counts across all processed rows.
        total = len(results)
        failures = sum(1 for r in results if r.get('prediction') == 'FAILURE')
        healthy = sum(1 for r in results if r.get('prediction') == 'HEALTHY')
        errors = sum(1 for r in results if r.get('prediction') == 'ERROR')

        return {
            "summary": {
                "total_records": total,
                "predictions": {
                    "failure": failures,
                    "healthy": healthy,
                    "errors": errors
                },
                "failure_rate": round(failures / total * 100, 2) if total > 0 else 0
            },
            "results": results,
            "processed_at": datetime.now().isoformat()
        }

    except HTTPException:
        # Let deliberate 4xx responses pass through unchanged.
        raise
    except Exception as e:
        logger.error(f"Batch JSON prediction error: {e}")
        raise HTTPException(status_code=500, detail=f"Batch processing failed: {str(e)}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.get("/")
async def root():
    """Landing endpoint: service metadata, status, and endpoint directory."""
    endpoint_directory = {
        "single_prediction": "/predict",
        "batch_prediction": "/predict/batch",
        "health": "/health",
        "model_info": "/model/info"
    }
    return {
        "service": "Predictive Maintenance API",
        "version": "1.0.0",
        "status": "operational",
        # True once the startup hook has populated the pipeline.
        "model_loaded": MODEL_PIPELINE is not None,
        "endpoints": endpoint_directory,
    }
|
|
|
|
|
|
|
|
@app.get("/health")
async def health_check():
    """Liveness/readiness probe: reports whether the model pipeline is loaded."""
    model_ready = MODEL_PIPELINE is not None
    return {
        "status": "healthy" if model_ready else "unhealthy",
        "timestamp": datetime.now().isoformat(),
        "model_loaded": model_ready,
        "model_repo": HF_MODEL_REPO,
    }
|
|
|
|
|
|
|
|
@app.get("/model/info")
async def model_info():
    """Expose loaded-model metadata; 503 while the pipeline is not loaded."""
    if MODEL_PIPELINE is None:
        raise HTTPException(status_code=503, detail="Model not loaded")

    pipeline = MODEL_PIPELINE
    # Optional metadata keys fall back to sensible defaults.
    return {
        "model_type": type(pipeline['model']).__name__,
        "features": pipeline['features'],
        "model_name": pipeline.get('model_name', 'Unknown'),
        "performance": pipeline.get('performance', {}),
        "training_config": pipeline.get('training_config', {}),
        "random_state": pipeline.get('random_state', None),
    }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Local development entry point: auto-reload server on port 8000.
# NOTE(review): "main:app" assumes this file is named main.py — confirm the
# module name matches the deployment entry point.
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=8000,
        reload=True,
        log_level="info"
    )
|
|
|