# AC02-ML / api/main.py
# NOTE: Hugging Face Spaces page header (author line + commit 60e8834
# "Fix Dockerfile: Remove non-existent models folder copy and fix health
# endpoint") converted to comments so the module parses as valid Python.
"""
FastAPI Predictive Maintenance API with Rule-Based Diagnostics
Senior Backend Engineer & Reliability Engineer Implementation
Features:
- ML-based failure prediction using XGBoost
- Physics-based diagnostic engine
- Single prediction and batch processing endpoints
- Production-ready with proper error handling
"""
from fastapi import FastAPI, HTTPException, UploadFile, File
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field, validator
from typing import Optional, Dict, Any, List
import joblib
import numpy as np
import pandas as pd
from datetime import datetime
from pathlib import Path
import os
import io
import logging
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Initialize FastAPI
app = FastAPI(
    title="Predictive Maintenance API",
    description="ML + Rule-Based Diagnostics for Industrial Equipment",
    version="1.0.0"
)
# CORS Configuration
# NOTE(review): per the CORS spec, browsers reject credentialed requests when
# the allowed origin is the wildcard "*". With allow_credentials=True this
# combination will not work for cookie/auth-bearing requests — confirm whether
# credentials are needed, or pin explicit origins.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
from huggingface_hub import hf_hub_download
# Global model pipeline
# Populated by the startup event; None means "not ready" to the health checks.
MODEL_PIPELINE = None
# Hugging Face Hub location of the serialized pipeline; all overridable via env.
HF_MODEL_REPO = os.getenv("HF_MODEL_REPO", "deropxyz/AC02-ML")
PRIMARY_MODEL_FILENAME = os.getenv("HF_PRIMARY_MODEL", "model_pipeline_improved.joblib")
FALLBACK_MODEL_FILENAME = os.getenv("HF_FALLBACK_MODEL", "model_pipeline.joblib")
# Auth token for private repos; either env var spelling is accepted.
HF_TOKEN = os.getenv("HF_TOKEN") or os.getenv("HUGGINGFACEHUB_API_TOKEN")
def download_model_artifact(filename: str) -> Path:
    """
    Fetch one model artifact from the Hugging Face Hub.

    Args:
        filename: Name of the file inside ``HF_MODEL_REPO``.

    Returns:
        Path to the locally cached copy of the artifact.

    Raises:
        ValueError: if ``filename`` is empty or falsy.
    """
    if not filename:
        raise ValueError("Filename must be provided for model download")
    logger.info("Downloading model '%s' from repo '%s'...", filename, HF_MODEL_REPO)
    local_path = hf_hub_download(
        repo_id=HF_MODEL_REPO,
        filename=filename,
        token=HF_TOKEN,
        repo_type="model",
    )
    return Path(local_path)
# ============================================================================
# REQUEST/RESPONSE SCHEMAS
# ============================================================================
class PredictionRequest(BaseModel):
    """Single prediction request schema matching dataset columns.

    Temperatures are in Kelvin; numeric bounds mirror the sensor ranges of the
    training data (AI4I-style predictive-maintenance dataset — TODO confirm).
    """
    # Optional identifier echoed back in the response; not used by the model.
    machine_id: Optional[str] = Field(None, description="Machine identifier (e.g., M_L_01)")
    air_temperature: float = Field(..., ge=250, le=350, description="Air temperature in Kelvin")
    process_temperature: float = Field(..., ge=250, le=400, description="Process temperature in Kelvin")
    rotational_speed: int = Field(..., ge=0, le=3000, description="Rotational speed in RPM")
    torque: float = Field(..., ge=0, le=100, description="Torque in Nm")
    tool_wear: int = Field(..., ge=0, le=300, description="Tool wear in minutes")
    type: str = Field(..., description="Machine type: L (Low), M (Medium), H (High)")

    # Pydantic v1-style validator. NOTE(review): deprecated in pydantic v2 —
    # migrate to @field_validator if/when the project upgrades.
    @validator('type')
    def validate_type(cls, v):
        if v not in ['L', 'M', 'H']:
            raise ValueError('Type must be L, M, or H')
        return v

    class Config:
        # Example payload shown in the OpenAPI docs (v1 key name;
        # json_schema_extra in pydantic v2).
        schema_extra = {
            "example": {
                "machine_id": "M_L_01",
                "air_temperature": 298.1,
                "process_temperature": 308.6,
                "rotational_speed": 1551,
                "torque": 42.8,
                "tool_wear": 0,
                "type": "M"
            }
        }
class DiagnosticResult(BaseModel):
    """Diagnostic analysis result produced by the rule-based engine."""
    primary_cause: str       # short label for the dominant failure mode
    sensor_alert: str        # human-readable description of the triggering readings
    recommended_action: str  # operator guidance, prefixed with urgency wording
    severity: str  # LOW, MEDIUM, HIGH, CRITICAL
class AnomalyDetail(BaseModel):
    """Per-parameter anomaly record used for UI visualization."""
    parameter: str     # sensor/feature name, e.g. "Air Temperature"
    value: str         # formatted reading, units included
    normal_range: str  # expected operating band, formatted for display
    status: str  # NORMAL, WARNING, CRITICAL
    explanation: str   # plain-English interpretation of the deviation
class PredictionResponse(BaseModel):
    """Single prediction response returned by the /predict endpoint."""
    machine_id: Optional[str]  # echoed from the request; may be None
    timestamp: str             # ISO-8601 time the prediction was made
    prediction: str  # HEALTHY or FAILURE
    confidence: float          # probability of the predicted class, 0-1
    diagnostics: DiagnosticResult
    features: Optional[Dict[str, Any]] = None  # Engineered features
    anomalies: Optional[List[AnomalyDetail]] = None  # Parameter anomaly analysis
    overall_health: Optional[str] = None  # Overall health status
# ============================================================================
# STARTUP EVENT: LOAD MODEL
# ============================================================================
@app.on_event("startup")
async def load_model():
    """Download and load the model pipeline when the service starts.

    Tries the primary artifact first, then the fallback; raises if neither
    loads so the service fails fast instead of serving errors later.
    """
    global MODEL_PIPELINE
    last_error: Optional[Exception] = None
    for filename in (PRIMARY_MODEL_FILENAME, FALLBACK_MODEL_FILENAME):
        if not filename:
            continue
        try:
            artifact_path = download_model_artifact(filename)
            MODEL_PIPELINE = joblib.load(artifact_path)
        except Exception as err:
            last_error = err
            logger.error("Failed to load model '%s': %s", filename, err)
        else:
            logger.info("✅ Model loaded from %s", filename)
            break
    if MODEL_PIPELINE is None:
        raise RuntimeError(f"Unable to load any model artifact: {last_error}") from last_error
    # Fail fast if the artifact is not the expected {model, scaler, features} dict.
    for key in ('model', 'scaler', 'features'):
        if key not in MODEL_PIPELINE:
            raise ValueError(f"Model pipeline missing key: {key}")
    logger.info("Model type: %s", type(MODEL_PIPELINE['model']).__name__)
    logger.info("Features: %s", MODEL_PIPELINE['features'])
# ============================================================================
# FEATURE ENGINEERING (Must match training logic)
# ============================================================================
def engineer_features(data: Dict[str, Any], feature_order: Optional[List[str]] = None) -> pd.DataFrame:
    """
    Apply physics-based feature engineering matching the training pipeline.

    Args:
        data: Validated request payload with keys 'air_temperature',
            'process_temperature', 'rotational_speed', 'torque',
            'tool_wear' and 'type' ('L'/'M'/'H').
        feature_order: Explicit column order for the returned frame. When
            None (the default, used by the endpoints), the order stored in
            the loaded MODEL_PIPELINE is used — it must match the order the
            model was trained with. Passing it explicitly decouples this
            function from the module-level global (useful for testing).

    Returns:
        A single-row DataFrame with the engineered feature columns.

    Raises:
        KeyError: if a required key is missing from ``data`` or 'type' is
            not one of 'L'/'M'/'H'.

    Engineered features:
    - Temperature_Diff: thermal stress indicator (process - air)
    - Power_W: mechanical power (P = τω)
    - Type_Encoded: ordinal encoding of the machine quality class
    - Tool_wear_hours: tool wear converted from minutes to hours
    """
    # Create base dataframe from the raw sensor readings
    df = pd.DataFrame([{
        'Air_temp_K': data['air_temperature'],
        'Process_temp_K': data['process_temperature'],
        'Speed_rpm': data['rotational_speed'],
        'Torque_Nm': data['torque'],
        'Tool_wear_min': data['tool_wear']
    }])
    # Physics-based features
    df['Tool_wear_hours'] = df['Tool_wear_min'] / 60.0  # minutes -> hours
    df['Temperature_Diff'] = df['Process_temp_K'] - df['Air_temp_K']
    # P = torque * angular velocity; RPM -> rad/s via 2*pi/60
    df['Power_W'] = df['Torque_Nm'] * df['Speed_rpm'] * (2 * np.pi / 60)
    # Type encoding (L < M < H)
    type_mapping = {'L': 0, 'M': 1, 'H': 2}
    df['Type_Encoded'] = type_mapping[data['type']]
    # Ensure correct feature order matching training
    feature_cols = feature_order if feature_order is not None else MODEL_PIPELINE['features']
    return df[feature_cols]
# ============================================================================
# DIAGNOSTIC ENGINE (Rule-Based Intelligence)
# ============================================================================
def analyze_condition(
    data: Dict[str, Any],
    pred_class: int,
    pred_proba: float,
    engineered_features: pd.DataFrame
) -> DiagnosticResult:
    """
    Advanced diagnostic engine combining ML predictions with physics-based rules.

    Rules are evaluated in priority order and the first match is returned,
    so a higher-severity condition always masks lower-severity ones.

    Args:
        data: Raw request payload (temperatures, speed, torque, wear, type).
        pred_class: ML classifier verdict (1 = failure, 0 = healthy).
        pred_proba: Probability of the predicted class.
        engineered_features: Single-row frame from engineer_features().

    Returns:
        DiagnosticResult with primary cause, alert text, recommended action
        and severity (LOW/MEDIUM/HIGH/CRITICAL).

    Priority Rules:
    1. Tool Wear Failure (CRITICAL)
    2. Power Overstrain (HIGH)
    3. Heat Dissipation Issue (MEDIUM)
    4. Thermal Stress (MEDIUM)
    5. High-Speed Operation Risk (MEDIUM)
    6. ML-Detected Anomaly (HIGH/MEDIUM by confidence)
    7. Healthy Operation (LOW)

    NOTE(review): numeric thresholds (200 min wear, 9000 W, 60 Nm, 8-12 K,
    2500 RPM) appear derived from the training dataset's failure definitions —
    confirm against the dataset/domain documentation.
    """
    # Extract values for rule evaluation
    temp_diff = engineered_features['Temperature_Diff'].iloc[0]
    power_w = engineered_features['Power_W'].iloc[0]
    torque = data['torque']
    tool_wear = data['tool_wear']
    speed = data['rotational_speed']
    air_temp = data['air_temperature']
    process_temp = data['process_temperature']
    # ========================================================================
    # RULE 1: TOOL WEAR FAILURE (CRITICAL PRIORITY)
    # ========================================================================
    if tool_wear > 200:
        return DiagnosticResult(
            primary_cause="Tool End of Life",
            sensor_alert=f"Tool wear ({tool_wear} min) exceeds safety threshold (200 min)",
            recommended_action="IMMEDIATE ACTION: Replace cutting tool before catastrophic failure. "
                               "Schedule downtime within 4 hours.",
            severity="CRITICAL"
        )
    # ========================================================================
    # RULE 2: POWER OVERSTRAIN (HIGH PRIORITY)
    # ========================================================================
    if power_w > 9000 or torque > 60:
        # Report whichever limit(s) were actually exceeded
        cause_details = []
        if power_w > 9000:
            cause_details.append(f"Power output ({power_w:.0f} W) exceeds design limit (9000 W)")
        if torque > 60:
            cause_details.append(f"Torque ({torque:.1f} Nm) exceeds safety limit (60 Nm)")
        return DiagnosticResult(
            primary_cause="Power Overstrain / Mechanical Overload",
            sensor_alert=" | ".join(cause_details),
            recommended_action="URGENT: Reduce operational load immediately. "
                               "Inspect shaft alignment, bearing condition, and drive belt tension. "
                               "Check for material jamming or obstruction.",
            severity="HIGH"
        )
    # ========================================================================
    # RULE 3: HEAT DISSIPATION ISSUE (MEDIUM PRIORITY)
    # ========================================================================
    # Low temperature differential at high speed means heat is not being
    # carried away fast enough.
    if temp_diff < 8.0 and speed > 1300:
        return DiagnosticResult(
            primary_cause="Inefficient Cooling System",
            sensor_alert=f"Temperature differential ({temp_diff:.1f} K) too low at high speed ({speed} RPM). "
                         f"Expected ≥8.0 K for proper heat dissipation.",
            recommended_action="MAINTENANCE REQUIRED: Inspect cooling system within 24 hours. "
                               "Check heat exchanger efficiency, coolant flow rate, and radiator fins. "
                               "Verify coolant temperature and pressure.",
            severity="MEDIUM"
        )
    # ========================================================================
    # RULE 4: THERMAL STRESS (MEDIUM PRIORITY)
    # ========================================================================
    if temp_diff > 15.0:
        return DiagnosticResult(
            primary_cause="Excessive Thermal Stress",
            sensor_alert=f"Temperature differential ({temp_diff:.1f} K) exceeds normal range (8-12 K). "
                         f"Process temp: {process_temp:.1f} K, Air temp: {air_temp:.1f} K",
            recommended_action="MONITOR CLOSELY: High thermal gradient detected. "
                               "Check process parameters and reduce processing intensity if possible. "
                               "Verify insulation integrity and ambient conditions.",
            severity="MEDIUM"
        )
    # ========================================================================
    # RULE 5: HIGH-SPEED OPERATION RISK (MEDIUM PRIORITY)
    # ========================================================================
    if speed > 2500:
        return DiagnosticResult(
            primary_cause="High-Speed Operation Risk",
            sensor_alert=f"Rotational speed ({speed} RPM) in critical range (>2500 RPM). "
                         f"Vibration and wear accelerate exponentially.",
            recommended_action="PREVENTIVE ACTION: Monitor vibration levels closely. "
                               "Perform balance check and bearing inspection. "
                               "Consider reducing speed if not operationally critical.",
            severity="MEDIUM"
        )
    # ========================================================================
    # RULE 6: ML DETECTED ANOMALY (VARIABLE PRIORITY)
    # ========================================================================
    if pred_class == 1:
        # ML model detected failure but no specific rule matched
        confidence_level = "high" if pred_proba > 0.8 else "moderate"
        return DiagnosticResult(
            primary_cause="ML-Detected Anomaly Pattern",
            sensor_alert=f"Machine learning model detected failure risk with {confidence_level} confidence ({pred_proba:.1%}). "
                         f"No specific rule violation identified.",
            recommended_action="DIAGNOSTIC SCAN REQUIRED: Perform comprehensive machine diagnostic. "
                               "Check for: (1) Unusual vibration patterns, (2) Bearing wear, "
                               "(3) Lubrication quality, (4) Electrical anomalies, (5) Sensor calibration. "
                               "Review recent maintenance history.",
            severity="HIGH" if pred_proba > 0.8 else "MEDIUM"
        )
    # ========================================================================
    # RULE 7: HEALTHY OPERATION (LOW PRIORITY)
    # ========================================================================
    # Health score = fraction of indicators inside their comfort band
    health_indicators = {
        'tool_wear_ok': tool_wear <= 150,
        'power_ok': power_w <= 8000,
        'temp_ok': 8.0 <= temp_diff <= 12.0,
        'speed_ok': speed <= 2000,
        'torque_ok': torque <= 50
    }
    health_score = sum(health_indicators.values()) / len(health_indicators) * 100
    return DiagnosticResult(
        primary_cause="Normal Operation",
        sensor_alert=f"All parameters within optimal range. Health score: {health_score:.0f}%. "
                     f"Tool wear: {tool_wear}/200 min, Power: {power_w:.0f}/9000 W, "
                     f"Temp diff: {temp_diff:.1f} K, Speed: {speed}/2500 RPM",
        recommended_action="CONTINUE NORMAL OPERATION. Next scheduled maintenance as planned. "
                           "Continue routine monitoring.",
        severity="LOW"
    )
# ============================================================================
# ANOMALY DETECTION & HEALTH ASSESSMENT
# ============================================================================
def detect_anomalies(data: Dict[str, Any], features_df: pd.DataFrame) -> List[AnomalyDetail]:
    """
    Detect anomalies in sensor readings and engineered features.

    Each parameter is compared against a fixed normal band: readings outside
    the band are flagged WARNING; readings outside a wider band are CRITICAL.
    (Bands appear tuned to the training dataset's operating ranges —
    TODO confirm against the dataset documentation.)

    Args:
        data: Raw request payload.
        features_df: Single-row frame from engineer_features().

    Returns:
        List of AnomalyDetail records for visualization; empty when all
        parameters are in range.
    """
    anomalies = []
    # Extract values
    air_temp = data['air_temperature']
    process_temp = data['process_temperature']
    speed = data['rotational_speed']
    torque = data['torque']
    tool_wear = data['tool_wear']
    temp_diff = features_df['Temperature_Diff'].iloc[0]
    power_w = features_df['Power_W'].iloc[0]
    # Define normal ranges and check each parameter
    # 1. Air Temperature
    if air_temp < 290 or air_temp > 310:
        status = "CRITICAL" if air_temp < 285 or air_temp > 315 else "WARNING"
        anomalies.append(AnomalyDetail(
            parameter="Air Temperature",
            value=f"{air_temp:.1f} K",
            normal_range="290-310 K",
            status=status,
            explanation=f"Air temperature is {'too low' if air_temp < 290 else 'too high'}. This can affect cooling efficiency and equipment performance."
        ))
    # 2. Process Temperature
    if process_temp < 300 or process_temp > 315:
        status = "CRITICAL" if process_temp < 295 or process_temp > 320 else "WARNING"
        anomalies.append(AnomalyDetail(
            parameter="Process Temperature",
            value=f"{process_temp:.1f} K",
            normal_range="300-315 K",
            status=status,
            explanation=f"Process temperature is {'below optimal' if process_temp < 300 else 'exceeding optimal'} range. May indicate cooling system issues or excessive load."
        ))
    # 3. Temperature Differential
    if temp_diff < 8.0 or temp_diff > 12.0:
        status = "CRITICAL" if temp_diff < 5.0 or temp_diff > 15.0 else "WARNING"
        anomalies.append(AnomalyDetail(
            parameter="Temperature Differential",
            value=f"{temp_diff:.1f} K",
            normal_range="8.0-12.0 K",
            status=status,
            explanation=f"Temperature gradient is {'insufficient' if temp_diff < 8.0 else 'excessive'}. This indicates {'poor heat dissipation' if temp_diff < 8.0 else 'thermal stress'}."
        ))
    # 4. Rotational Speed
    if speed < 1200 or speed > 2000:
        status = "CRITICAL" if speed < 1000 or speed > 2500 else "WARNING"
        anomalies.append(AnomalyDetail(
            parameter="Rotational Speed",
            value=f"{speed} RPM",
            normal_range="1200-2000 RPM",
            status=status,
            explanation=f"Speed is {'below normal' if speed < 1200 else 'above normal'} operating range. This affects wear rate and vibration levels."
        ))
    # 5. Torque (only an upper bound is enforced)
    if torque > 50:
        status = "CRITICAL" if torque > 60 else "WARNING"
        anomalies.append(AnomalyDetail(
            parameter="Torque",
            value=f"{torque:.1f} Nm",
            normal_range="20-50 Nm",
            status=status,
            explanation=f"Torque is {'significantly ' if torque > 60 else ''}exceeding normal range. This indicates mechanical overload or obstruction."
        ))
    # 6. Tool Wear (only an upper bound is enforced)
    if tool_wear > 150:
        status = "CRITICAL" if tool_wear > 200 else "WARNING"
        anomalies.append(AnomalyDetail(
            parameter="Tool Wear",
            value=f"{tool_wear} min",
            normal_range="0-150 min",
            status=status,
            explanation=f"Tool wear is {'critically high' if tool_wear > 200 else 'elevated'}. {'IMMEDIATE replacement required' if tool_wear > 200 else 'Schedule replacement soon'}."
        ))
    # 7. Power Output (engineered feature; only an upper bound is enforced)
    if power_w > 8000:
        status = "CRITICAL" if power_w > 9000 else "WARNING"
        anomalies.append(AnomalyDetail(
            parameter="Power Output",
            value=f"{power_w:.0f} W",
            normal_range="3000-8000 W",
            status=status,
            explanation=f"Power consumption is {'critically ' if power_w > 9000 else ''}high. This suggests mechanical resistance or overload conditions."
        ))
    return anomalies
def determine_overall_health(prediction: int, confidence: float, severity: str, anomaly_count: int) -> str:
    """
    Summarize machine health from the ML verdict, diagnostic severity and
    the number of flagged parameter anomalies.

    Returns a human-readable status string (EXCELLENT/GOOD/FAIR/POOR,
    prefixed with a traffic-light emoji).
    """
    if prediction != 1:
        # Healthy verdict: grade purely on how many anomalies were flagged.
        if anomaly_count == 0:
            return "🟢 EXCELLENT HEALTH - All systems operating optimally within normal parameters."
        if anomaly_count <= 2:
            return "🟢 GOOD HEALTH - Minor anomalies detected but within acceptable limits."
        return "🟡 FAIR HEALTH - Multiple anomalies detected. Monitor closely and address issues."
    # Failure verdict: escalate with rule severity and model confidence.
    if severity == "CRITICAL" or confidence > 0.9:
        return "🔴 POOR HEALTH - Critical failure risk detected. Immediate action required."
    if severity == "HIGH" or confidence > 0.75:
        return "🟠 POOR HEALTH - High failure risk. Urgent maintenance needed."
    return "🟡 FAIR HEALTH - Moderate failure risk detected. Schedule maintenance soon."
# ============================================================================
# ENDPOINT 1: SINGLE PREDICTION
# ============================================================================
@app.post("/predict", response_model=PredictionResponse)
async def predict_single(request: PredictionRequest):
    """
    Predict failure risk for a single machine reading.

    Returns the ML verdict (HEALTHY/FAILURE), its confidence, rule-based
    diagnostics with actionable recommendations, the engineered feature
    values, per-parameter anomalies, and an overall health summary.
    """
    try:
        payload = request.dict()
        # Engineer features, scale them, and run the classifier.
        engineered = engineer_features(payload)
        scaled = MODEL_PIPELINE['scaler'].transform(engineered)
        model = MODEL_PIPELINE['model']
        verdict = model.predict(scaled)[0]
        class_probs = model.predict_proba(scaled)[0]
        # Confidence = probability the model assigned to its predicted class.
        confidence = float(class_probs[verdict])
        # Layer the rule-based diagnostics and anomaly scan on top.
        diagnostics = analyze_condition(payload, verdict, confidence, engineered)
        anomalies = detect_anomalies(payload, engineered)
        overall_health = determine_overall_health(verdict, confidence, diagnostics.severity, len(anomalies))
        return PredictionResponse(
            machine_id=request.machine_id,
            timestamp=datetime.now().isoformat(),
            prediction="FAILURE" if verdict == 1 else "HEALTHY",
            confidence=round(confidence, 4),
            diagnostics=diagnostics,
            features=engineered.iloc[0].to_dict(),
            anomalies=anomalies,
            overall_health=overall_health
        )
    except Exception as e:
        logger.error(f"Prediction error: {e}")
        raise HTTPException(status_code=500, detail=f"Prediction failed: {str(e)}")
# ============================================================================
# ENDPOINT 2: BATCH PREDICTION (CSV UPLOAD)
# ============================================================================
@app.post("/predict/batch")
async def predict_batch(file: UploadFile = File(...)):
    """
    Batch prediction from CSV upload.

    Input: CSV file with columns matching PredictionRequest
    Output: downloadable CSV with added columns: Prediction, Confidence,
            Primary_Cause, Sensor_Alert, Recommended_Action, Severity,
            Processed_At

    Per-row failures do not abort the batch: the row is marked "ERROR" and
    the exception text is recorded in Sensor_Alert.
    """
    try:
        # Validate file type (extension only; content is validated by read_csv)
        if not file.filename.endswith('.csv'):
            raise HTTPException(status_code=400, detail="File must be CSV format")
        # Read CSV
        contents = await file.read()
        df = pd.read_csv(io.StringIO(contents.decode('utf-8')))
        # Normalize column names (case-insensitive, spaces, bracketed units, etc).
        # regex=False makes '[' a literal replacement target: under the regex
        # default it is an invalid pattern and str.replace raises on pandas
        # versions where regex=True is the default (< 2.0).
        df.columns = (
            df.columns.str.strip()
            .str.lower()
            .str.replace(' ', '_', regex=False)
            .str.replace('[', '_', regex=False)
            .str.replace(']', '', regex=False)
            .str.replace('__', '_', regex=False)
            .str.rstrip('_')
        )
        # Map unit-suffixed headers (e.g. "Air temperature [K]") to API names
        column_mapping = {
            'air_temperature_k': 'air_temperature',
            'process_temperature_k': 'process_temperature',
            'rotational_speed_rpm': 'rotational_speed',
            'torque_nm': 'torque',
            'tool_wear_min': 'tool_wear'
        }
        df.rename(columns=column_mapping, inplace=True)
        # Validate required columns
        required_columns = [
            'air_temperature', 'process_temperature', 'rotational_speed',
            'torque', 'tool_wear', 'type'
        ]
        missing_cols = [col for col in required_columns if col not in df.columns]
        if missing_cols:
            raise HTTPException(
                status_code=400,
                detail=f"Missing required columns: {missing_cols}. Found columns: {list(df.columns)}"
            )
        # One accumulator per output column, filled row by row
        predictions = []
        probabilities = []
        causes = []
        alerts = []
        actions = []
        severities = []
        # Process each row independently
        for idx, row in df.iterrows():
            try:
                # Coerce raw cell values to the types the pipeline expects
                data = {
                    'machine_id': row.get('machine_id', f'MACHINE_{idx}'),
                    'air_temperature': float(row['air_temperature']),
                    'process_temperature': float(row['process_temperature']),
                    'rotational_speed': int(row['rotational_speed']),
                    'torque': float(row['torque']),
                    'tool_wear': int(row['tool_wear']),
                    'type': str(row['type'])
                }
                # Feature engineering
                features_df = engineer_features(data)
                # Scale and predict
                features_scaled = MODEL_PIPELINE['scaler'].transform(features_df)
                prediction = MODEL_PIPELINE['model'].predict(features_scaled)[0]
                prediction_proba = MODEL_PIPELINE['model'].predict_proba(features_scaled)[0]
                confidence = float(prediction_proba[prediction])
                # Rule-based diagnostics
                diagnostics = analyze_condition(data, prediction, confidence, features_df)
                # Append results
                predictions.append("FAILURE" if prediction == 1 else "HEALTHY")
                probabilities.append(round(confidence, 4))
                causes.append(diagnostics.primary_cause)
                alerts.append(diagnostics.sensor_alert)
                actions.append(diagnostics.recommended_action)
                severities.append(diagnostics.severity)
            except Exception as e:
                # Keep the batch going; surface the error in the output row
                logger.warning(f"Row {idx} processing error: {e}")
                predictions.append("ERROR")
                probabilities.append(0.0)
                causes.append("Processing Error")
                alerts.append(str(e))
                actions.append("Review input data")
                severities.append("UNKNOWN")
        # Add results to dataframe
        df['Prediction'] = predictions
        df['Confidence'] = probabilities
        df['Primary_Cause'] = causes
        df['Sensor_Alert'] = alerts
        df['Recommended_Action'] = actions
        df['Severity'] = severities
        df['Processed_At'] = datetime.now().isoformat()
        # Serialize back to CSV in memory
        output = io.StringIO()
        df.to_csv(output, index=False)
        output.seek(0)
        # Return as downloadable file
        return StreamingResponse(
            iter([output.getvalue()]),
            media_type="text/csv",
            headers={
                "Content-Disposition": f"attachment; filename=predictions_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
            }
        )
    except HTTPException:
        # Re-raise client errors untouched (don't convert 400s into 500s)
        raise
    except Exception as e:
        logger.error(f"Batch prediction error: {e}")
        raise HTTPException(status_code=500, detail=f"Batch prediction failed: {str(e)}")
# ============================================================================
# ENDPOINT 3: BATCH PREDICTION JSON (For UI Display)
# ============================================================================
@app.post("/predict/batch/json")
async def predict_batch_json(file: UploadFile = File(...)):
    """
    Batch prediction from CSV upload - returns detailed JSON for UI display.

    Input: CSV file with required columns
    Output: JSON with a summary section plus detailed predictions, features,
            and anomalies for each row. Rows that fail to process are
            returned with prediction "ERROR" instead of aborting the batch.
    """
    try:
        # Validate file type
        if not file.filename.endswith('.csv'):
            raise HTTPException(status_code=400, detail="File must be CSV format")
        # Read CSV
        contents = await file.read()
        df = pd.read_csv(io.StringIO(contents.decode('utf-8')))
        # Normalize column names (literal replacement; no regex needed)
        df.columns = df.columns.str.strip().str.lower().str.replace(' ', '_', regex=False)
        # Validate required columns
        required_columns = [
            'air_temperature', 'process_temperature', 'rotational_speed',
            'torque', 'tool_wear', 'type'
        ]
        missing_cols = [col for col in required_columns if col not in df.columns]
        if missing_cols:
            raise HTTPException(
                status_code=400,
                detail=f"Missing required columns: {missing_cols}. Found columns: {list(df.columns)}"
            )
        # Limit batch size for the JSON response. A single constant drives both
        # the check and the message so they cannot drift apart (previously the
        # code allowed 1000 rows while the error text claimed a 500-row limit).
        max_rows = 1000
        if len(df) > max_rows:
            raise HTTPException(
                status_code=400,
                detail=f"Batch JSON endpoint limited to {max_rows} rows. Your file has {len(df)} rows. Use /predict/batch for larger files or download CSV results."
            )
        results = []
        # Process each row independently
        for idx, row in df.iterrows():
            try:
                # Coerce raw cell values; 'type' is normalized to uppercase
                data = {
                    'machine_id': row.get('machine_id', f'MACHINE_{idx+1}'),
                    'air_temperature': float(row['air_temperature']),
                    'process_temperature': float(row['process_temperature']),
                    'rotational_speed': int(row['rotational_speed']),
                    'torque': float(row['torque']),
                    'tool_wear': int(row['tool_wear']),
                    'type': str(row['type']).strip().upper()
                }
                # Feature engineering
                features_df = engineer_features(data)
                # Scale and predict
                features_scaled = MODEL_PIPELINE['scaler'].transform(features_df)
                prediction = MODEL_PIPELINE['model'].predict(features_scaled)[0]
                prediction_proba = MODEL_PIPELINE['model'].predict_proba(features_scaled)[0]
                confidence = float(prediction_proba[prediction])
                # Rule-based diagnostics, features and anomaly scan for the UI
                diagnostics = analyze_condition(data, prediction, confidence, features_df)
                features_dict = features_df.iloc[0].to_dict()
                anomalies = detect_anomalies(data, features_df)
                overall_health = determine_overall_health(prediction, confidence, diagnostics.severity, len(anomalies))
                # Append result
                results.append({
                    "row_number": idx + 1,
                    "machine_id": data['machine_id'],
                    "timestamp": datetime.now().isoformat(),
                    "prediction": "FAILURE" if prediction == 1 else "HEALTHY",
                    "confidence": round(confidence, 4),
                    "diagnostics": {
                        "primary_cause": diagnostics.primary_cause,
                        "sensor_alert": diagnostics.sensor_alert,
                        "recommended_action": diagnostics.recommended_action,
                        "severity": diagnostics.severity
                    },
                    "features": features_dict,
                    "anomalies": [
                        {
                            "parameter": a.parameter,
                            "value": a.value,
                            "normal_range": a.normal_range,
                            "status": a.status,
                            "explanation": a.explanation
                        } for a in anomalies
                    ],
                    "overall_health": overall_health,
                    "input_data": {
                        "air_temperature": data['air_temperature'],
                        "process_temperature": data['process_temperature'],
                        "rotational_speed": data['rotational_speed'],
                        "torque": data['torque'],
                        "tool_wear": data['tool_wear'],
                        "type": data['type']
                    }
                })
            except Exception as e:
                # Keep the batch going; return an ERROR record for this row
                logger.warning(f"Row {idx+1} processing error: {e}")
                results.append({
                    "row_number": idx + 1,
                    "machine_id": row.get('machine_id', f'MACHINE_{idx+1}'),
                    "timestamp": datetime.now().isoformat(),
                    "prediction": "ERROR",
                    "confidence": 0.0,
                    "diagnostics": {
                        "primary_cause": "Processing Error",
                        "sensor_alert": str(e),
                        "recommended_action": "Review input data format and values",
                        "severity": "UNKNOWN"
                    },
                    "error": str(e)
                })
        # Summary statistics over all rows (including ERROR rows)
        total = len(results)
        failures = sum(1 for r in results if r.get('prediction') == 'FAILURE')
        healthy = sum(1 for r in results if r.get('prediction') == 'HEALTHY')
        errors = sum(1 for r in results if r.get('prediction') == 'ERROR')
        return {
            "summary": {
                "total_records": total,
                "predictions": {
                    "failure": failures,
                    "healthy": healthy,
                    "errors": errors
                },
                "failure_rate": round(failures / total * 100, 2) if total > 0 else 0
            },
            "results": results,
            "processed_at": datetime.now().isoformat()
        }
    except HTTPException:
        # Re-raise client errors untouched (don't convert 400s into 500s)
        raise
    except Exception as e:
        logger.error(f"Batch JSON prediction error: {e}")
        raise HTTPException(status_code=500, detail=f"Batch processing failed: {str(e)}")
# ============================================================================
# HEALTH CHECK ENDPOINTS
# ============================================================================
@app.get("/")
async def root():
    """Service metadata plus a quick directory of available endpoints."""
    loaded = MODEL_PIPELINE is not None
    return {
        "service": "Predictive Maintenance API",
        "version": "1.0.0",
        "status": "operational",
        "model_loaded": loaded,
        "endpoints": {
            "single_prediction": "/predict",
            "batch_prediction": "/predict/batch",
            "health": "/health",
            "model_info": "/model/info",
        },
    }
@app.get("/health")
async def health_check():
    """Liveness/readiness probe: reports unhealthy until the model loads."""
    loaded = MODEL_PIPELINE is not None
    return {
        "status": "healthy" if loaded else "unhealthy",
        "timestamp": datetime.now().isoformat(),
        "model_loaded": loaded,
        "model_repo": HF_MODEL_REPO,
    }
@app.get("/model/info")
async def model_info():
    """Expose metadata stored alongside the serialized pipeline artifact."""
    if MODEL_PIPELINE is None:
        raise HTTPException(status_code=503, detail="Model not loaded")
    pipeline = MODEL_PIPELINE
    # Optional metadata keys fall back to benign defaults.
    return {
        "model_type": type(pipeline['model']).__name__,
        "features": pipeline['features'],
        "model_name": pipeline.get('model_name', 'Unknown'),
        "performance": pipeline.get('performance', {}),
        "training_config": pipeline.get('training_config', {}),
        "random_state": pipeline.get('random_state', None),
    }
# ============================================================================
# MAIN
# ============================================================================
if __name__ == "__main__":
    # Local development entry point; in production the app is expected to be
    # served by the container's own uvicorn command instead.
    import uvicorn
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=8000,
        reload=True,  # auto-reload is for development only
        log_level="info"
    )