""" FastAPI Predictive Maintenance API with Rule-Based Diagnostics Senior Backend Engineer & Reliability Engineer Implementation Features: - ML-based failure prediction using XGBoost - Physics-based diagnostic engine - Single prediction and batch processing endpoints - Production-ready with proper error handling """ from fastapi import FastAPI, HTTPException, UploadFile, File from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import StreamingResponse from pydantic import BaseModel, Field, validator from typing import Optional, Dict, Any, List import joblib import numpy as np import pandas as pd from datetime import datetime from pathlib import Path import os import io import logging # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Initialize FastAPI app = FastAPI( title="Predictive Maintenance API", description="ML + Rule-Based Diagnostics for Industrial Equipment", version="1.0.0" ) # CORS Configuration app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) from huggingface_hub import hf_hub_download # Global model pipeline MODEL_PIPELINE = None HF_MODEL_REPO = os.getenv("HF_MODEL_REPO", "deropxyz/AC02-ML") PRIMARY_MODEL_FILENAME = os.getenv("HF_PRIMARY_MODEL", "model_pipeline_improved.joblib") FALLBACK_MODEL_FILENAME = os.getenv("HF_FALLBACK_MODEL", "model_pipeline.joblib") HF_TOKEN = os.getenv("HF_TOKEN") or os.getenv("HUGGINGFACEHUB_API_TOKEN") def download_model_artifact(filename: str) -> Path: """ Download a model artifact from Hugging Face Hub and return the local cache path. 
""" if not filename: raise ValueError("Filename must be provided for model download") logger.info("Downloading model '%s' from repo '%s'...", filename, HF_MODEL_REPO) cache_path = hf_hub_download( repo_id=HF_MODEL_REPO, filename=filename, repo_type="model", token=HF_TOKEN, ) return Path(cache_path) # ============================================================================ # REQUEST/RESPONSE SCHEMAS # ============================================================================ class PredictionRequest(BaseModel): """Single prediction request schema matching dataset columns""" machine_id: Optional[str] = Field(None, description="Machine identifier (e.g., M_L_01)") air_temperature: float = Field(..., ge=250, le=350, description="Air temperature in Kelvin") process_temperature: float = Field(..., ge=250, le=400, description="Process temperature in Kelvin") rotational_speed: int = Field(..., ge=0, le=3000, description="Rotational speed in RPM") torque: float = Field(..., ge=0, le=100, description="Torque in Nm") tool_wear: int = Field(..., ge=0, le=300, description="Tool wear in minutes") type: str = Field(..., description="Machine type: L (Low), M (Medium), H (High)") @validator('type') def validate_type(cls, v): if v not in ['L', 'M', 'H']: raise ValueError('Type must be L, M, or H') return v class Config: schema_extra = { "example": { "machine_id": "M_L_01", "air_temperature": 298.1, "process_temperature": 308.6, "rotational_speed": 1551, "torque": 42.8, "tool_wear": 0, "type": "M" } } class DiagnosticResult(BaseModel): """Diagnostic analysis result""" primary_cause: str sensor_alert: str recommended_action: str severity: str # LOW, MEDIUM, HIGH, CRITICAL class AnomalyDetail(BaseModel): """Anomaly detection detail""" parameter: str value: str normal_range: str status: str # NORMAL, WARNING, CRITICAL explanation: str class PredictionResponse(BaseModel): """Single prediction response""" machine_id: Optional[str] timestamp: str prediction: str # HEALTHY or FAILURE 
confidence: float diagnostics: DiagnosticResult features: Optional[Dict[str, Any]] = None # Engineered features anomalies: Optional[List[AnomalyDetail]] = None # Parameter anomaly analysis overall_health: Optional[str] = None # Overall health status # ============================================================================ # STARTUP EVENT: LOAD MODEL # ============================================================================ @app.on_event("startup") async def load_model(): """Download and load model pipeline on startup""" global MODEL_PIPELINE last_error: Optional[Exception] = None for candidate in [PRIMARY_MODEL_FILENAME, FALLBACK_MODEL_FILENAME]: if not candidate: continue try: model_path = download_model_artifact(candidate) MODEL_PIPELINE = joblib.load(model_path) logger.info("✅ Model loaded from %s", candidate) break except Exception as err: last_error = err logger.error("Failed to load model '%s': %s", candidate, err) if MODEL_PIPELINE is None: raise RuntimeError(f"Unable to load any model artifact: {last_error}") from last_error # Validate model structure required_keys = ['model', 'scaler', 'features'] for key in required_keys: if key not in MODEL_PIPELINE: raise ValueError(f"Model pipeline missing key: {key}") logger.info("Model type: %s", type(MODEL_PIPELINE['model']).__name__) logger.info("Features: %s", MODEL_PIPELINE['features']) # ============================================================================ # FEATURE ENGINEERING (Must match training logic) # ============================================================================ def engineer_features(data: Dict[str, Any]) -> pd.DataFrame: """ Apply physics-based feature engineering matching training pipeline Features: - Temperature_Diff: Thermal stress indicator - Power_W: Mechanical power (P = τω) - Type_Encoded: Categorical encoding - Tool_wear_hours: Tool wear converted to hours """ # Create base dataframe df = pd.DataFrame([{ 'Air_temp_K': data['air_temperature'], 'Process_temp_K': 
data['process_temperature'], 'Speed_rpm': data['rotational_speed'], 'Torque_Nm': data['torque'], 'Tool_wear_min': data['tool_wear'] }]) # Physics-based features df['Tool_wear_hours'] = df['Tool_wear_min'] / 60.0 # Convert minutes to hours df['Temperature_Diff'] = df['Process_temp_K'] - df['Air_temp_K'] df['Power_W'] = df['Torque_Nm'] * df['Speed_rpm'] * (2 * np.pi / 60) # Type encoding type_mapping = {'L': 0, 'M': 1, 'H': 2} df['Type_Encoded'] = type_mapping[data['type']] # Ensure correct feature order matching training feature_cols = MODEL_PIPELINE['features'] return df[feature_cols] # ============================================================================ # DIAGNOSTIC ENGINE (Rule-Based Intelligence) # ============================================================================ def analyze_condition( data: Dict[str, Any], pred_class: int, pred_proba: float, engineered_features: pd.DataFrame ) -> DiagnosticResult: """ Advanced diagnostic engine combining ML predictions with physics-based rules Priority Rules: 1. Tool Wear Failure (CRITICAL) 2. Power Overstrain (HIGH) 3. Heat Dissipation Issue (MEDIUM) 4. ML Anomaly Detection (VARIABLE) 5. Healthy Operation (LOW) """ # Extract values for rule evaluation temp_diff = engineered_features['Temperature_Diff'].iloc[0] power_w = engineered_features['Power_W'].iloc[0] torque = data['torque'] tool_wear = data['tool_wear'] speed = data['rotational_speed'] air_temp = data['air_temperature'] process_temp = data['process_temperature'] # ======================================================================== # RULE 1: TOOL WEAR FAILURE (CRITICAL PRIORITY) # ======================================================================== if tool_wear > 200: return DiagnosticResult( primary_cause="Tool End of Life", sensor_alert=f"Tool wear ({tool_wear} min) exceeds safety threshold (200 min)", recommended_action="IMMEDIATE ACTION: Replace cutting tool before catastrophic failure. 
" "Schedule downtime within 4 hours.", severity="CRITICAL" ) # ======================================================================== # RULE 2: POWER OVERSTRAIN (HIGH PRIORITY) # ======================================================================== if power_w > 9000 or torque > 60: cause_details = [] if power_w > 9000: cause_details.append(f"Power output ({power_w:.0f} W) exceeds design limit (9000 W)") if torque > 60: cause_details.append(f"Torque ({torque:.1f} Nm) exceeds safety limit (60 Nm)") return DiagnosticResult( primary_cause="Power Overstrain / Mechanical Overload", sensor_alert=" | ".join(cause_details), recommended_action="URGENT: Reduce operational load immediately. " "Inspect shaft alignment, bearing condition, and drive belt tension. " "Check for material jamming or obstruction.", severity="HIGH" ) # ======================================================================== # RULE 3: HEAT DISSIPATION ISSUE (MEDIUM PRIORITY) # ======================================================================== if temp_diff < 8.0 and speed > 1300: return DiagnosticResult( primary_cause="Inefficient Cooling System", sensor_alert=f"Temperature differential ({temp_diff:.1f} K) too low at high speed ({speed} RPM). " f"Expected ≥8.0 K for proper heat dissipation.", recommended_action="MAINTENANCE REQUIRED: Inspect cooling system within 24 hours. " "Check heat exchanger efficiency, coolant flow rate, and radiator fins. " "Verify coolant temperature and pressure.", severity="MEDIUM" ) # ======================================================================== # RULE 4: THERMAL STRESS (MEDIUM PRIORITY) # ======================================================================== if temp_diff > 15.0: return DiagnosticResult( primary_cause="Excessive Thermal Stress", sensor_alert=f"Temperature differential ({temp_diff:.1f} K) exceeds normal range (8-12 K). 
" f"Process temp: {process_temp:.1f} K, Air temp: {air_temp:.1f} K", recommended_action="MONITOR CLOSELY: High thermal gradient detected. " "Check process parameters and reduce processing intensity if possible. " "Verify insulation integrity and ambient conditions.", severity="MEDIUM" ) # ======================================================================== # RULE 5: HIGH-SPEED OPERATION RISK (MEDIUM PRIORITY) # ======================================================================== if speed > 2500: return DiagnosticResult( primary_cause="High-Speed Operation Risk", sensor_alert=f"Rotational speed ({speed} RPM) in critical range (>2500 RPM). " f"Vibration and wear accelerate exponentially.", recommended_action="PREVENTIVE ACTION: Monitor vibration levels closely. " "Perform balance check and bearing inspection. " "Consider reducing speed if not operationally critical.", severity="MEDIUM" ) # ======================================================================== # RULE 6: ML DETECTED ANOMALY (VARIABLE PRIORITY) # ======================================================================== if pred_class == 1: # ML model detected failure but no specific rule matched confidence_level = "high" if pred_proba > 0.8 else "moderate" return DiagnosticResult( primary_cause="ML-Detected Anomaly Pattern", sensor_alert=f"Machine learning model detected failure risk with {confidence_level} confidence ({pred_proba:.1%}). " f"No specific rule violation identified.", recommended_action="DIAGNOSTIC SCAN REQUIRED: Perform comprehensive machine diagnostic. " "Check for: (1) Unusual vibration patterns, (2) Bearing wear, " "(3) Lubrication quality, (4) Electrical anomalies, (5) Sensor calibration. 
" "Review recent maintenance history.", severity="HIGH" if pred_proba > 0.8 else "MEDIUM" ) # ======================================================================== # RULE 7: HEALTHY OPERATION (LOW PRIORITY) # ======================================================================== # Calculate health score health_indicators = { 'tool_wear_ok': tool_wear <= 150, 'power_ok': power_w <= 8000, 'temp_ok': 8.0 <= temp_diff <= 12.0, 'speed_ok': speed <= 2000, 'torque_ok': torque <= 50 } health_score = sum(health_indicators.values()) / len(health_indicators) * 100 return DiagnosticResult( primary_cause="Normal Operation", sensor_alert=f"All parameters within optimal range. Health score: {health_score:.0f}%. " f"Tool wear: {tool_wear}/200 min, Power: {power_w:.0f}/9000 W, " f"Temp diff: {temp_diff:.1f} K, Speed: {speed}/2500 RPM", recommended_action="CONTINUE NORMAL OPERATION. Next scheduled maintenance as planned. " "Continue routine monitoring.", severity="LOW" ) # ============================================================================ # ANOMALY DETECTION & HEALTH ASSESSMENT # ============================================================================ def detect_anomalies(data: Dict[str, Any], features_df: pd.DataFrame) -> List[AnomalyDetail]: """ Detect anomalies in sensor readings and engineered features Returns list of anomaly details for visualization """ anomalies = [] # Extract values air_temp = data['air_temperature'] process_temp = data['process_temperature'] speed = data['rotational_speed'] torque = data['torque'] tool_wear = data['tool_wear'] temp_diff = features_df['Temperature_Diff'].iloc[0] power_w = features_df['Power_W'].iloc[0] # Define normal ranges and check each parameter # 1. 
Air Temperature if air_temp < 290 or air_temp > 310: status = "CRITICAL" if air_temp < 285 or air_temp > 315 else "WARNING" anomalies.append(AnomalyDetail( parameter="Air Temperature", value=f"{air_temp:.1f} K", normal_range="290-310 K", status=status, explanation=f"Air temperature is {'too low' if air_temp < 290 else 'too high'}. This can affect cooling efficiency and equipment performance." )) # 2. Process Temperature if process_temp < 300 or process_temp > 315: status = "CRITICAL" if process_temp < 295 or process_temp > 320 else "WARNING" anomalies.append(AnomalyDetail( parameter="Process Temperature", value=f"{process_temp:.1f} K", normal_range="300-315 K", status=status, explanation=f"Process temperature is {'below optimal' if process_temp < 300 else 'exceeding optimal'} range. May indicate cooling system issues or excessive load." )) # 3. Temperature Differential if temp_diff < 8.0 or temp_diff > 12.0: status = "CRITICAL" if temp_diff < 5.0 or temp_diff > 15.0 else "WARNING" anomalies.append(AnomalyDetail( parameter="Temperature Differential", value=f"{temp_diff:.1f} K", normal_range="8.0-12.0 K", status=status, explanation=f"Temperature gradient is {'insufficient' if temp_diff < 8.0 else 'excessive'}. This indicates {'poor heat dissipation' if temp_diff < 8.0 else 'thermal stress'}." )) # 4. Rotational Speed if speed < 1200 or speed > 2000: status = "CRITICAL" if speed < 1000 or speed > 2500 else "WARNING" anomalies.append(AnomalyDetail( parameter="Rotational Speed", value=f"{speed} RPM", normal_range="1200-2000 RPM", status=status, explanation=f"Speed is {'below normal' if speed < 1200 else 'above normal'} operating range. This affects wear rate and vibration levels." )) # 5. Torque if torque > 50: status = "CRITICAL" if torque > 60 else "WARNING" anomalies.append(AnomalyDetail( parameter="Torque", value=f"{torque:.1f} Nm", normal_range="20-50 Nm", status=status, explanation=f"Torque is {'significantly ' if torque > 60 else ''}exceeding normal range. 
This indicates mechanical overload or obstruction." )) # 6. Tool Wear if tool_wear > 150: status = "CRITICAL" if tool_wear > 200 else "WARNING" anomalies.append(AnomalyDetail( parameter="Tool Wear", value=f"{tool_wear} min", normal_range="0-150 min", status=status, explanation=f"Tool wear is {'critically high' if tool_wear > 200 else 'elevated'}. {'IMMEDIATE replacement required' if tool_wear > 200 else 'Schedule replacement soon'}." )) # 7. Power Output if power_w > 8000: status = "CRITICAL" if power_w > 9000 else "WARNING" anomalies.append(AnomalyDetail( parameter="Power Output", value=f"{power_w:.0f} W", normal_range="3000-8000 W", status=status, explanation=f"Power consumption is {'critically ' if power_w > 9000 else ''}high. This suggests mechanical resistance or overload conditions." )) return anomalies def determine_overall_health(prediction: int, confidence: float, severity: str, anomaly_count: int) -> str: """ Determine overall machine health status based on multiple factors Returns: EXCELLENT, GOOD, FAIR, or POOR health status """ if prediction == 1: # Failure predicted if severity == "CRITICAL" or confidence > 0.9: return "🔴 POOR HEALTH - Critical failure risk detected. Immediate action required." elif severity == "HIGH" or confidence > 0.75: return "🟠 POOR HEALTH - High failure risk. Urgent maintenance needed." else: return "🟡 FAIR HEALTH - Moderate failure risk detected. Schedule maintenance soon." else: # No failure predicted if anomaly_count == 0: return "🟢 EXCELLENT HEALTH - All systems operating optimally within normal parameters." elif anomaly_count <= 2: return "🟢 GOOD HEALTH - Minor anomalies detected but within acceptable limits." else: return "🟡 FAIR HEALTH - Multiple anomalies detected. Monitor closely and address issues." 
# ============================================================================
# SHARED INFERENCE HELPERS
# ============================================================================

# Map dataset-style headers (with unit suffixes) to the API field names.
# Shared by both batch endpoints so they accept the same CSV formats.
_COLUMN_MAPPING = {
    'air_temperature_k': 'air_temperature',
    'process_temperature_k': 'process_temperature',
    'rotational_speed_rpm': 'rotational_speed',
    'torque_nm': 'torque',
    'tool_wear_min': 'tool_wear'
}

# Columns every uploaded CSV must provide (after normalization).
_REQUIRED_COLUMNS = [
    'air_temperature', 'process_temperature',
    'rotational_speed', 'torque', 'tool_wear', 'type'
]


def _normalize_columns(df: pd.DataFrame) -> None:
    """Normalize CSV headers in place: case, spaces, bracketed units.

    regex=False is required — '[' is not a valid regular expression, and on
    pandas versions where str.replace defaulted to regex=True the bare call
    raised re.error.
    """
    df.columns = (
        df.columns.str.strip()
        .str.lower()
        .str.replace(' ', '_', regex=False)
        .str.replace('[', '_', regex=False)
        .str.replace(']', '', regex=False)
        .str.replace('__', '_', regex=False)
        .str.rstrip('_')
    )
    # Apply mapping if columns have units
    df.rename(columns=_COLUMN_MAPPING, inplace=True)


def _require_columns(df: pd.DataFrame) -> None:
    """Raise HTTP 400 if any required column is missing after normalization."""
    missing_cols = [col for col in _REQUIRED_COLUMNS if col not in df.columns]
    if missing_cols:
        raise HTTPException(
            status_code=400,
            detail=f"Missing required columns: {missing_cols}. Found columns: {list(df.columns)}"
        )


def _run_model(data: Dict[str, Any]):
    """Run the loaded pipeline on one reading.

    Engineers features, scales them, and returns
    (predicted_class, confidence, engineered_features_df).
    Centralized so the single and batch endpoints cannot drift apart.
    """
    features_df = engineer_features(data)
    features_scaled = MODEL_PIPELINE['scaler'].transform(features_df)
    prediction = MODEL_PIPELINE['model'].predict(features_scaled)[0]
    prediction_proba = MODEL_PIPELINE['model'].predict_proba(features_scaled)[0]
    # Confidence is the probability the model assigns to the predicted class
    confidence = float(prediction_proba[prediction])
    return prediction, confidence, features_df


def _validate_csv_upload(file: UploadFile) -> None:
    """Raise HTTP 400 unless the upload has a .csv filename.

    Guards against a missing filename and accepts any capitalization
    (.csv/.CSV) rather than crashing or rejecting valid uploads.
    """
    if not file.filename or not file.filename.lower().endswith('.csv'):
        raise HTTPException(status_code=400, detail="File must be CSV format")


# ============================================================================
# ENDPOINT 1: SINGLE PREDICTION
# ============================================================================

@app.post("/predict", response_model=PredictionResponse)
async def predict_single(request: PredictionRequest):
    """
    Predict failure risk for a single machine reading

    Returns:
    - Prediction (HEALTHY/FAILURE)
    - Confidence score
    - Detailed diagnostics with actionable recommendations
    """
    try:
        # Convert request to dict
        data = request.dict()

        # Feature engineering, scaling, and prediction
        prediction, confidence, features_df = _run_model(data)

        # Run diagnostic engine
        diagnostics = analyze_condition(data, prediction, confidence, features_df)

        # Prepare features dict for display
        features_dict = features_df.iloc[0].to_dict()

        # Detect anomalies
        anomalies = detect_anomalies(data, features_df)

        # Determine overall health
        overall_health = determine_overall_health(prediction, confidence, diagnostics.severity, len(anomalies))

        # Format response
        return PredictionResponse(
            machine_id=request.machine_id,
            timestamp=datetime.now().isoformat(),
            prediction="FAILURE" if prediction == 1 else "HEALTHY",
            confidence=round(confidence, 4),
            diagnostics=diagnostics,
            features=features_dict,
            anomalies=anomalies,
            overall_health=overall_health
        )
    except Exception as e:
        logger.error(f"Prediction error: {e}")
        raise HTTPException(status_code=500, detail=f"Prediction failed: {str(e)}")


# ============================================================================
# ENDPOINT 2: BATCH PREDICTION (CSV UPLOAD)
# ============================================================================

@app.post("/predict/batch")
async def predict_batch(file: UploadFile = File(...)):
    """
    Batch prediction from CSV upload

    Input: CSV file with columns matching PredictionRequest
    Output: CSV with added columns: Prediction, Confidence, Primary_Cause,
    Sensor_Alert, Recommended_Action, Severity
    """
    try:
        # Validate file type
        _validate_csv_upload(file)

        # Read CSV
        contents = await file.read()
        df = pd.read_csv(io.StringIO(contents.decode('utf-8')))

        # Normalize column names (handle case-insensitive, spaces, brackets, etc)
        _normalize_columns(df)
        _require_columns(df)

        # Initialize result columns
        predictions = []
        probabilities = []
        causes = []
        alerts = []
        actions = []
        severities = []

        # Process each row; a bad row becomes an ERROR entry instead of
        # failing the whole batch.
        for idx, row in df.iterrows():
            try:
                # Prepare data (type is upper-cased/stripped to match the
                # JSON batch endpoint's tolerance for "m", " h ", etc.)
                data = {
                    'machine_id': row.get('machine_id', f'MACHINE_{idx}'),
                    'air_temperature': float(row['air_temperature']),
                    'process_temperature': float(row['process_temperature']),
                    'rotational_speed': int(row['rotational_speed']),
                    'torque': float(row['torque']),
                    'tool_wear': int(row['tool_wear']),
                    'type': str(row['type']).strip().upper()
                }

                # Feature engineering, scaling, prediction
                prediction, confidence, features_df = _run_model(data)

                # Diagnostics
                diagnostics = analyze_condition(data, prediction, confidence, features_df)

                # Append results
                predictions.append("FAILURE" if prediction == 1 else "HEALTHY")
                probabilities.append(round(confidence, 4))
                causes.append(diagnostics.primary_cause)
                alerts.append(diagnostics.sensor_alert)
                actions.append(diagnostics.recommended_action)
                severities.append(diagnostics.severity)
            except Exception as e:
                logger.warning(f"Row {idx} processing error: {e}")
                predictions.append("ERROR")
                probabilities.append(0.0)
                causes.append("Processing Error")
                alerts.append(str(e))
                actions.append("Review input data")
                severities.append("UNKNOWN")

        # Add results to dataframe
        df['Prediction'] = predictions
        df['Confidence'] = probabilities
        df['Primary_Cause'] = causes
        df['Sensor_Alert'] = alerts
        df['Recommended_Action'] = actions
        df['Severity'] = severities
        df['Processed_At'] = datetime.now().isoformat()

        # Convert to CSV
        output = io.StringIO()
        df.to_csv(output, index=False)
        output.seek(0)

        # Return as downloadable file
        return StreamingResponse(
            iter([output.getvalue()]),
            media_type="text/csv",
            headers={
                "Content-Disposition": f"attachment; filename=predictions_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
            }
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Batch prediction error: {e}")
        raise HTTPException(status_code=500, detail=f"Batch prediction failed: {str(e)}")


# ============================================================================
# ENDPOINT 3: BATCH PREDICTION JSON (For UI Display)
# ============================================================================

@app.post("/predict/batch/json")
async def predict_batch_json(file: UploadFile = File(...)):
    """
    Batch prediction from CSV upload - returns detailed JSON for UI display

    Input: CSV file with required columns
    Output: JSON with detailed predictions, features, and anomalies for each row
    """
    try:
        # Validate file type
        _validate_csv_upload(file)

        # Read CSV
        contents = await file.read()
        df = pd.read_csv(io.StringIO(contents.decode('utf-8')))

        # Normalize column names (same rules as /predict/batch so both
        # endpoints accept identical CSV formats, including unit suffixes)
        _normalize_columns(df)
        _require_columns(df)

        # Limit batch size for JSON response (increased for better UX)
        if len(df) > 1000:
            raise HTTPException(
                status_code=400,
                # Message matches the 1000-row check above (previously it
                # incorrectly said 500).
                detail=f"Batch JSON endpoint limited to 1000 rows. Your file has {len(df)} rows. Use /predict/batch for larger files or download CSV results."
            )

        results = []

        # Process each row
        for idx, row in df.iterrows():
            try:
                # Prepare data
                data = {
                    'machine_id': row.get('machine_id', f'MACHINE_{idx+1}'),
                    'air_temperature': float(row['air_temperature']),
                    'process_temperature': float(row['process_temperature']),
                    'rotational_speed': int(row['rotational_speed']),
                    'torque': float(row['torque']),
                    'tool_wear': int(row['tool_wear']),
                    'type': str(row['type']).strip().upper()
                }

                # Feature engineering, scaling, prediction
                prediction, confidence, features_df = _run_model(data)

                # Diagnostics
                diagnostics = analyze_condition(data, prediction, confidence, features_df)

                # Features dict
                features_dict = features_df.iloc[0].to_dict()

                # Detect anomalies
                anomalies = detect_anomalies(data, features_df)

                # Overall health
                overall_health = determine_overall_health(prediction, confidence, diagnostics.severity, len(anomalies))

                # Append result
                results.append({
                    "row_number": idx + 1,
                    "machine_id": data['machine_id'],
                    "timestamp": datetime.now().isoformat(),
                    "prediction": "FAILURE" if prediction == 1 else "HEALTHY",
                    "confidence": round(confidence, 4),
                    "diagnostics": {
                        "primary_cause": diagnostics.primary_cause,
                        "sensor_alert": diagnostics.sensor_alert,
                        "recommended_action": diagnostics.recommended_action,
                        "severity": diagnostics.severity
                    },
                    "features": features_dict,
                    "anomalies": [
                        {
                            "parameter": a.parameter,
                            "value": a.value,
                            "normal_range": a.normal_range,
                            "status": a.status,
                            "explanation": a.explanation
                        }
                        for a in anomalies
                    ],
                    "overall_health": overall_health,
                    "input_data": {
                        "air_temperature": data['air_temperature'],
                        "process_temperature": data['process_temperature'],
                        "rotational_speed": data['rotational_speed'],
                        "torque": data['torque'],
                        "tool_wear": data['tool_wear'],
                        "type": data['type']
                    }
                })
            except Exception as e:
                logger.warning(f"Row {idx+1} processing error: {e}")
                results.append({
                    "row_number": idx + 1,
                    "machine_id": row.get('machine_id', f'MACHINE_{idx+1}'),
                    "timestamp": datetime.now().isoformat(),
                    "prediction": "ERROR",
                    "confidence": 0.0,
                    "diagnostics": {
                        "primary_cause": "Processing Error",
                        "sensor_alert": str(e),
                        "recommended_action": "Review input data format and values",
                        "severity": "UNKNOWN"
                    },
                    "error": str(e)
                })

        # Summary statistics
        total = len(results)
        failures = sum(1 for r in results if r.get('prediction') == 'FAILURE')
        healthy = sum(1 for r in results if r.get('prediction') == 'HEALTHY')
        errors = sum(1 for r in results if r.get('prediction') == 'ERROR')

        return {
            "summary": {
                "total_records": total,
                "predictions": {
                    "failure": failures,
                    "healthy": healthy,
                    "errors": errors
                },
                "failure_rate": round(failures / total * 100, 2) if total > 0 else 0
            },
            "results": results,
            "processed_at": datetime.now().isoformat()
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Batch JSON prediction error: {e}")
        raise HTTPException(status_code=500, detail=f"Batch processing failed: {str(e)}")


# ============================================================================
# HEALTH CHECK ENDPOINTS
# ============================================================================

@app.get("/")
async def root():
    """API root endpoint"""
    return {
        "service": "Predictive Maintenance API",
        "version": "1.0.0",
        "status": "operational",
        "model_loaded": MODEL_PIPELINE is not None,
        "endpoints": {
            "single_prediction": "/predict",
            "batch_prediction": "/predict/batch",
            "health": "/health",
            "model_info": "/model/info"
        }
    }


@app.get("/health")
async def health_check():
    """Health check endpoint"""
    return {
        "status": "healthy" if MODEL_PIPELINE is not None else "unhealthy",
        "timestamp": datetime.now().isoformat(),
        "model_loaded": MODEL_PIPELINE is not None,
        "model_repo": HF_MODEL_REPO
    }


@app.get("/model/info")
async def model_info():
    """Get model information"""
    if MODEL_PIPELINE is None:
        raise HTTPException(status_code=503, detail="Model not loaded")
    return {
        "model_type": type(MODEL_PIPELINE['model']).__name__,
        "features": MODEL_PIPELINE['features'],
        "model_name": MODEL_PIPELINE.get('model_name', 'Unknown'),
        "performance": MODEL_PIPELINE.get('performance', {}),
        "training_config": MODEL_PIPELINE.get('training_config', {}),
        "random_state": MODEL_PIPELINE.get('random_state', None)
    }


# ============================================================================
# MAIN
# ============================================================================

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=8000,
        reload=True,
        log_level="info"
    )