"""FIFO Mining Equipment Failure Predictor.

Gradio application that loads the NASA C-MAPSS ``train_FD001.txt`` data,
normalises the sensor readings per unit, fits (or loads) an Isolation Forest
and reports an anomaly-based risk assessment for a selected unit ("truck").
"""

import os
import traceback

import gradio as gr
import joblib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
from transformers import pipeline

# Module-level state, populated by initialize_app() and read by the
# Gradio callback predict_failure().
df = None
iso_forest = None
sensor_cols = None
explainer = None

# Sensor values are per-unit z-scores (see load_and_process_data), so the
# bar chart colours them by how many standard deviations they deviate.
Z_SCORE_ALERT = 3.0
Z_SCORE_WARN = 2.0


def find_data_file():
    """Return the path of ``train_FD001.txt`` from a list of known locations.

    Raises:
        FileNotFoundError: if the file is in none of the candidate paths.
            Before raising, the contents of the current directory (and of
            ``CMaps`` when present) are printed to help diagnose the layout.
    """
    possible_paths = [
        'CMaps/train_FD001.txt',
        'train_FD001.txt',
        'data/train_FD001.txt',
        'C-MAPSS/train_FD001.txt',
        'CMAPSS/train_FD001.txt',
        'dataset/train_FD001.txt',
    ]

    for path in possible_paths:
        if os.path.exists(path):
            print(f" Found data file at: {path}")
            return path

    print("Files in current directory:")
    for file in os.listdir('.'):
        print(f" {file}")

    if os.path.exists('CMaps'):
        print("Files in CMaps directory:")
        for file in os.listdir('CMaps'):
            print(f" {file}")

    raise FileNotFoundError("Could not find train_FD001.txt in any expected location")


def load_and_process_data():
    """Load the raw whitespace-delimited data file and normalise the sensors.

    The column layout is inferred from the number of columns in the first
    row: ``id``, ``cycle``, up to three operating-setting columns, then
    ``sensorN`` columns. Sensor columns are z-scored within each ``id``.

    Returns:
        tuple: ``(data, sensor_cols)`` - the processed DataFrame and the list
        of sensor column names (empty when none were identified).
    """
    print("Loading and processing data...")

    data_path = find_data_file()

    # ``sep=r'\s+'`` replaces the deprecated ``delim_whitespace=True``.
    first_row = pd.read_csv(data_path, sep=r'\s+', header=None, nrows=1)
    num_columns = len(first_row.columns)
    print(f"Found {num_columns} columns in the dataset")

    # Standard NASA C-MAPSS FD001 has id, cycle, op1, op2, op3 and then the
    # sensors; narrower files are assumed to carry fewer operating settings.
    if num_columns >= 26:
        n_ops = 3
    elif num_columns >= 25:
        n_ops = 2
    elif num_columns >= 24:
        n_ops = 1
    else:
        n_ops = 0

    n_sensors = num_columns - 2 - n_ops
    columns = (
        ['id', 'cycle']
        + [f'op{i}' for i in range(1, n_ops + 1)]
        + [f'sensor{i}' for i in range(1, n_sensors + 1)]
    )
    # Trim columns to the actual number (safety check for very narrow files).
    columns = columns[:num_columns]

    data = pd.read_csv(data_path, sep=r'\s+', header=None, names=columns)

    # The NASA data often has trailing spaces or extra NaN columns; drop them.
    data = data.dropna(axis=1, how='all')

    found_sensors = [col for col in data.columns if col.startswith('sensor')]
    print(f" Identified {len(found_sensors)} sensor columns: {found_sensors}")

    def _normalise(series):
        # Constant (or single-row) series cannot be scaled; only centre them.
        spread = series.std()
        centred = series - series.mean()
        return centred / (spread + 1e-6) if spread > 1e-6 else centred

    if found_sensors:
        data[found_sensors] = data.groupby('id')[found_sensors].transform(_normalise)
    else:
        print(" Warning: No sensor columns found!")

    print(f" Processed data shape: {data.shape}")
    return data, found_sensors


def load_processed_data(filepath='processed_data.csv'):
    """Load previously processed data from a CSV file.

    Returns:
        tuple: ``(data, sensor_cols)``, or ``(None, None)`` when the file is
        missing or cannot be read.
    """
    if not os.path.exists(filepath):
        return None, None
    try:
        data = pd.read_csv(filepath)
    except Exception as e:
        print(f" Error loading processed data from {filepath}: {e}")
        return None, None
    found_sensors = [col for col in data.columns if col.startswith('sensor')]
    return data, found_sensors


def load_model(filepath='isolation_forest_model.pkl'):
    """Load a trained model from disk, or return ``None`` if unavailable.

    NOTE(review): ``joblib.load`` unpickles arbitrary objects; only point this
    at model files produced by this application.
    """
    if not os.path.exists(filepath):
        return None
    try:
        model = joblib.load(filepath)
        print(f" Model loaded from {filepath}")
        return model
    except Exception as e:
        print(f" Error loading model from {filepath}: {e}")
        return None


def train_isolation_forest(df, sensor_cols, contamination=0.02):
    """Fit an Isolation Forest on the sensor columns and score every row.

    Args:
        df: DataFrame containing ``sensor_cols``. It is modified in place:
            ``anomaly`` (1 / -1) and ``anomaly_score`` columns are added.
        sensor_cols: names of the feature columns.
        contamination: expected share of anomalies; kept low to limit false
            positives.

    Returns:
        tuple: ``(model, df)`` - the fitted model and the same DataFrame.

    Raises:
        ValueError: if ``sensor_cols`` is empty.
    """
    print(" Training Isolation Forest model...")
    print(f" Using {len(sensor_cols)} sensor columns for training")
    print(f" Contamination rate: {contamination}")

    if len(sensor_cols) == 0:
        raise ValueError(" No sensor columns found for training")

    model = IsolationForest(
        contamination=contamination,
        random_state=42,
        n_estimators=150,
        max_samples='auto',
    )

    features = df[sensor_cols]
    model.fit(features)

    df['anomaly'] = model.predict(features)
    df['anomaly_score'] = model.decision_function(features)

    anomaly_count = (df['anomaly'] == -1).sum()
    normal_count = (df['anomaly'] == 1).sum()
    print(f" Anomalies detected: {anomaly_count} ({anomaly_count/len(df)*100:.1f}%)")
    print(f" Normal readings: {normal_count} ({normal_count/len(df)*100:.1f}%)")

    print(" Model training completed!")
    return model, df


def initialize_app():
    """Load (or build) the data, model and text generator into module state.

    Sets the module-level ``df``, ``iso_forest``, ``sensor_cols`` and
    ``explainer``. Processed data and the trained model are cached to
    ``processed_data.csv`` / ``isolation_forest_model.pkl``.

    Returns:
        bool: ``True`` on success, ``False`` if data or model setup failed.
        A failure to load the text-generation model is not fatal.
    """
    global df, iso_forest, sensor_cols, explainer

    print(" Initializing FIFO Mining Predictor...")

    try:
        df, sensor_cols = load_processed_data('processed_data.csv')

        if df is None:
            print(" Processed data not found. Creating from raw data...")
            df, sensor_cols = load_and_process_data()
            df.to_csv('processed_data.csv', index=False)
            print(" Processed data saved.")

        # Safety check after loading/processing.
        if df is None or df.empty:
            print(" Failed to load or process data.")
            return False

        iso_forest = load_model('isolation_forest_model.pkl')

        # If the model doesn't exist, train and cache it.
        if iso_forest is None:
            print(" Model not found. Training new model...")
            iso_forest, df = train_isolation_forest(df, sensor_cols, contamination=0.02)
            joblib.dump(iso_forest, 'isolation_forest_model.pkl')
            df.to_csv('processed_data.csv', index=False)
            print(" Model trained and saved.")

        # A cached model may be paired with data that was never scored.
        if 'anomaly_score' not in df.columns and iso_forest is not None and sensor_cols is not None:
            print(" Re-calculating anomaly scores...")
            df['anomaly'] = iso_forest.predict(df[sensor_cols])
            df['anomaly_score'] = iso_forest.decision_function(df[sensor_cols])
            df.to_csv('processed_data.csv', index=False)
            print(" Anomaly scores updated in processed data.")

        # The generative model is optional; rule-based text is used without it.
        try:
            print(" Loading Gen AI model...")
            explainer = pipeline("text2text-generation", model="google/flan-t5-small")
            print(" Gen AI model loaded successfully.")
        except Exception as e:
            print(f" Gen AI model not available: {e}")
            explainer = None

        print(" Application initialized successfully!")
        return True

    except Exception as e:
        print(f" Error initializing application: {e}")
        traceback.print_exc()
        return False


def generate_insight(engine_id, cycle, anomaly_score, top_sensors,
                     high_threshold=-0.7, medium_threshold=-0.5):
    """Return a short maintenance recommendation for one reading.

    Uses the text-generation pipeline when it is loaded; on any failure (or
    when it is absent) falls back to fixed rule-based messages.

    Args:
        engine_id: unit identifier, used only in the prompt.
        cycle: current cycle, used only in the prompt.
        anomaly_score: score of the reading; lower means more anomalous.
        top_sensors: sensor names ordered by abnormality; the first two are
            mentioned in the prompt.
        high_threshold: scores at or below this are treated as critical.
        medium_threshold: scores at or below this are treated as moderate.
            Callers should pass the same thresholds they use for the displayed
            risk level so the text cannot contradict it.

    Returns:
        str: the recommendation text.
    """
    if explainer is not None:
        try:
            if anomaly_score <= high_threshold:
                risk_desc = "high risk"
            elif anomaly_score <= medium_threshold or anomaly_score < 0:
                risk_desc = "moderate risk"
            else:
                risk_desc = "normal operation"

            prompt = (
                f"Mining equipment shows {risk_desc}.\n"
                f"ID: {engine_id}, cycle: {cycle}.\n"
                f"Score: {anomaly_score:.3f}.\n"
                f"Sensors: {', '.join(top_sensors[:2])}.\n"
                "Brief maintenance recommendation in 1-2 sentences."
            )

            result = explainer(
                prompt,
                max_length=80,
                num_return_sequences=1,
                do_sample=False,
                truncation=True,
            )
            return result[0]['generated_text'].strip()
        except Exception as e:
            # Best effort: log and fall through to the rule-based text.
            print(f" Gen AI error: {e}")

    if anomaly_score <= high_threshold:
        return "Critical risk detected. Immediate inspection required. Check for mechanical wear or overheating."
    if anomaly_score <= medium_threshold:
        return "Moderate risk detected. Schedule inspection within 48 hours. Monitor vibration and temperature."
    if anomaly_score < 0:
        return "Low risk anomaly detected. Increase monitoring frequency. Review operational parameters."
    return "Equipment operating normally. Continue routine monitoring schedule."


def _build_dashboard(truck_id, engine_hist, engine_data, sens_vals,
                     anomaly_score, all_scores, high_threshold, medium_threshold):
    """Create the 2x2 matplotlib health dashboard for one unit."""
    fig, axes = plt.subplots(2, 2, figsize=(15, 10))
    try:
        fig.suptitle(f'Mining Truck {truck_id} - Health Analysis',
                     fontsize=16, fontweight='bold')

        # Plot 1: trends of a few representative sensors (those that exist).
        trend_ax = axes[0, 0]
        for idx in (0, 3, 6):
            if len(sensor_cols) > idx:
                name = sensor_cols[idx]
                trend_ax.plot(engine_hist['cycle'], engine_hist[name],
                              label=f'{name}', linewidth=2)
        trend_ax.set_title('Sensor Trends')
        trend_ax.set_xlabel('Cycle')
        trend_ax.set_ylabel('Normalized Value')
        trend_ax.legend()
        trend_ax.grid(True, alpha=0.3)

        # Plot 2: anomaly score over time with the risk thresholds.
        score_ax = axes[0, 1]
        score_ax.plot(engine_hist['cycle'], engine_hist['anomaly_score'], 'b-',
                      linewidth=2, label='Current Score')
        score_ax.axhline(y=high_threshold, color='r', linestyle='--', alpha=0.7,
                         label=f'High Risk ({high_threshold:.3f})')
        score_ax.axhline(y=medium_threshold, color='orange', linestyle='--', alpha=0.7,
                         label=f'Medium Risk ({medium_threshold:.3f})')
        score_ax.axhline(y=0, color='g', linestyle='-', alpha=0.5, label='Normal')
        score_ax.set_title('Anomaly Score Over Time')
        score_ax.set_xlabel('Cycle')
        score_ax.set_ylabel('Anomaly Score')
        score_ax.legend()
        score_ax.grid(True, alpha=0.3)

        # Plot 3: current values of the most abnormal sensors.
        bar_ax = axes[1, 0]
        if len(sens_vals) >= 1:
            shown = sens_vals[:6]
            current_values = engine_data[shown].iloc[0].values
            # Colour by z-score magnitude; the anomaly-score thresholds are a
            # different quantity and must not be applied to sensor values.
            bar_colors = [
                'red' if abs(v) >= Z_SCORE_ALERT
                else 'orange' if abs(v) >= Z_SCORE_WARN
                else 'green'
                for v in current_values
            ]
            positions = range(len(shown))
            bar_ax.bar(positions, current_values, color=bar_colors)
            bar_ax.set_title('Current Top Abnormal Sensors')
            bar_ax.set_xticks(positions)
            bar_ax.set_xticklabels([s.replace('sensor', 'S') for s in shown], rotation=45)
            bar_ax.set_ylabel('Normalized Value')
            bar_ax.grid(True, alpha=0.3)

        # Plot 4: where this unit sits in the overall score distribution.
        dist_ax = axes[1, 1]
        dist_ax.hist(all_scores, bins=50, alpha=0.7, color='lightblue',
                     edgecolor='black', linewidth=0.5)
        dist_ax.axvline(x=anomaly_score, color='red', linestyle='--', linewidth=2,
                        label=f'Truck {truck_id}: {anomaly_score:.3f}')
        dist_ax.axvline(x=high_threshold, color='r', linestyle=':', alpha=0.7,
                        label='High Risk Threshold')
        dist_ax.axvline(x=medium_threshold, color='orange', linestyle=':', alpha=0.7,
                        label='Medium Risk Threshold')
        dist_ax.set_title('Anomaly Score Distribution')
        dist_ax.set_xlabel('Anomaly Score')
        dist_ax.set_ylabel('Frequency')
        dist_ax.legend()
        dist_ax.grid(True, alpha=0.3)

        fig.tight_layout()
    except Exception:
        # Do not leave a half-built figure registered with pyplot.
        plt.close(fig)
        raise
    return fig


def predict_failure(engine_id):
    """Assess the latest reading of one unit and build its report.

    Args:
        engine_id: unit ID as entered in the Gradio Number input.

    Returns:
        tuple: ``(markdown_text, figure)``. ``figure`` is ``None`` when the
        request cannot be served or the plot could not be created.
    """
    if df is None or df.empty or sensor_cols is None or len(sensor_cols) == 0 or iso_forest is None:
        return " Application not properly initialized. Data or model is missing.", None

    # An emptied Number input is delivered as None.
    if engine_id is None:
        return " Please enter a Truck ID.", None

    unique_ids = df['id'].unique()
    if engine_id not in unique_ids:
        sample_ids = sorted(unique_ids)[:10]  # Show first 10
        sample_str = ", ".join(map(str, sample_ids))
        if len(unique_ids) > 10:
            sample_str += ", ..."
        return f" Truck ID {engine_id} not found.\nAvailable IDs (first 10): {sample_str}", None

    engine_hist = df[df['id'] == engine_id]
    engine_data = engine_hist.tail(1)

    if engine_data.empty:
        return " No data found for this truck ID.", None

    try:
        cycle = int(engine_data['cycle'].iloc[0])
        anomaly_score = float(engine_data['anomaly_score'].iloc[0])
    except (IndexError, KeyError, ValueError, TypeError) as e:
        return f" Error retrieving data for Truck ID {engine_id}: {e}", None

    # Rank sensors by absolute normalised value of the latest reading.
    try:
        sens_vals = (
            engine_data[sensor_cols].iloc[0].abs()
            .sort_values(ascending=False).head(5).index.tolist()
        )
    except Exception as e:
        return f" Error analyzing sensor data for Truck ID {engine_id}: {e}", None

    # Data-driven thresholds: lowest 1% / 5% of all scores.
    all_scores = np.array([])
    try:
        all_scores = df['anomaly_score'].dropna().values
        if len(all_scores) == 0:
            raise ValueError("No anomaly scores found in data.")
        high_threshold = np.percentile(all_scores, 1)
        medium_threshold = np.percentile(all_scores, 5)
    except Exception as e:
        print(f" Warning: Could not calculate percentiles, using fallback thresholds: {e}")
        high_threshold = -0.3
        medium_threshold = -0.1

    if anomaly_score <= high_threshold:
        risk_level = " HIGH RISK"
        action = " **IMMEDIATE INSPECTION REQUIRED**"
    elif anomaly_score <= medium_threshold:
        risk_level = " MEDIUM RISK"
        action = " **SCHEDULE INSPECTION SOON**"
    else:
        risk_level = " LOW RISK"
        action = " Equipment operating normally"

    # Use the same thresholds for the text so it matches the risk level.
    insight = generate_insight(
        engine_id, cycle, anomaly_score, sens_vals,
        high_threshold=high_threshold, medium_threshold=medium_threshold,
    )

    truck_id = int(engine_id)
    # List however many sensors exist (up to three) instead of assuming three.
    sensor_lines = [f"{rank}. `{name}`" for rank, name in enumerate(sens_vals[:3], start=1)]

    result = "\n".join([
        "",
        "## 🔧 FIFO Mining Equipment Failure Predictor",
        "",
        "### Equipment Status",
        f"- **Truck ID:** `{truck_id}`",
        f"- **Current Cycle:** `{cycle}`",
        f"- **Anomaly Score:** `{anomaly_score:.3f}`",
        f"- **Risk Assessment:** **{risk_level}**",
        "",
        "---",
        "",
        "### AI Maintenance Recommendation",
        f"> {insight}",
        "",
        "### Top Abnormal Sensors",
        *sensor_lines,
        "",
        "### Action Priority",
        f"{action}",
        "",
    ])

    try:
        fig = _build_dashboard(
            truck_id, engine_hist, engine_data, sens_vals,
            anomaly_score, all_scores, high_threshold, medium_threshold,
        )
    except Exception as e:
        print(f" Error creating plot: {e}")
        return result, None

    return result, fig


def print_debug_info(initialized, data):
    """Print a summary of the loaded data after initialisation."""
    print("\n=== POST-INITIALIZATION DEBUG INFO ===")
    print(f"app_initialized: {initialized}")
    if data is not None and not data.empty:
        print(f" Data loaded successfully. Shape: {data.shape}")
        print(f" Columns: {list(data.columns)}")
        if 'id' in data.columns:
            unique_ids = sorted(data['id'].dropna().unique())
            min_id = int(unique_ids[0]) if unique_ids else 'N/A'
            max_id = int(unique_ids[-1]) if unique_ids else 'N/A'
            print(f" Unique Truck IDs found: {len(unique_ids)} (Min: {min_id}, Max: {max_id})")
            print(f" First 10 IDs: {list(map(int, unique_ids[:10]))}")
        else:
            print(" 'id' column is missing!")

        if 'anomaly_score' in data.columns:
            try:
                print(f" Anomaly scores range: [{data['anomaly_score'].min():.3f}, {data['anomaly_score'].max():.3f}]")
            except (TypeError, ValueError):
                print(" Error calculating anomaly score range.")
        else:
            print(" 'anomaly_score' column is missing - model might not have trained correctly.")

        sensor_cols_debug = [col for col in data.columns if col.startswith('sensor')]
        print(f" Sensor columns identified: {len(sensor_cols_debug)}")
    else:
        print(" Data (df) failed to load or is empty after initialization.")
    print("=======================================\n")


def get_id_range(data, default_min=1, default_max=100):
    """Return ``(min_id, max_id)`` found in ``data['id']``.

    Falls back to ``(default_min, default_max)`` when the column is missing,
    empty, or yields an implausible range.
    """
    fallback = f"using defaults {default_min}-{default_max}"
    try:
        if 'id' not in data.columns or data['id'].empty:
            print(f" 'id' column not found in data, {fallback}")
            return default_min, default_max

        unique_ids = data['id'].dropna().unique()
        if len(unique_ids) == 0:
            print(f" No unique IDs found in data, {fallback}")
            return default_min, default_max

        lowest = int(min(unique_ids))
        highest = int(max(unique_ids))
        if lowest > 0 and highest >= lowest:
            print(f" Setting interface ID range: {lowest}-{highest}")
            return lowest, highest
        print(f" Calculated ID range [{lowest}, {highest}] seems invalid, {fallback}")
    except Exception as e:
        print(f" Error calculating ID range: {e}, {fallback}")
    return default_min, default_max


def error_message(truck_id):
    """Callback for the fallback interface shown when initialisation failed."""
    return " Application failed to initialize correctly. Please check the console logs and data files.", None


def build_interface(initialized, data):
    """Create the main Gradio interface, or a fallback one on failure."""
    if initialized and data is not None and not data.empty:
        safe_min_id, safe_max_id = get_id_range(data)

        print(f" Creating main Gradio interface with ID range {safe_min_id}-{safe_max_id}")
        return gr.Interface(
            fn=predict_failure,
            inputs=gr.Number(
                label="⛏️ Enter Mining Truck ID",
                value=safe_min_id,  # Start with the actual minimum ID found in data
                minimum=safe_min_id,
                maximum=safe_max_id,
                step=1,
            ),
            outputs=[
                gr.Markdown(label=" Failure Prediction & AI Insights"),
                gr.Plot(label=" Equipment Health Dashboard"),
            ],
            title=" FIFO Mining Equipment Failure Predictor",
            description="""
            AI-powered predictive maintenance using unsupervised learning + Generative AI.
            Detects equipment anomalies before failures occur to prevent costly downtime.
            """,
            # Examples are derived from the actual ID range in the data.
            examples=[
                [safe_min_id],
                [min(safe_min_id + 4, safe_max_id)],
                [min(safe_min_id + 9, safe_max_id)],
            ],
            theme=gr.themes.Soft(primary_hue="blue", secondary_hue="cyan"),
        )

    print(" Creating fallback Gradio interface")
    return gr.Interface(
        fn=error_message,
        inputs=gr.Number(label="⛏️ Enter Mining Truck ID", value=1),
        outputs=[gr.Markdown(label="Error"), gr.Plot(label="Plot")],
        title=" FIFO Mining Predictor - Initialization Error",
        description="Failed to load data or model. Check file paths and data format.",
        theme=gr.themes.Soft(primary_hue="red", secondary_hue="pink"),
    )


# --- Main Application Logic ---
# Initialisation runs at import time so that ``demo`` exists as a module-level
# object for hosts that import this file (e.g. Hugging Face Spaces).
app_initialized = False
try:
    print("=== Starting Initialization Process ===")
    app_initialized = initialize_app()
    print("=== Initialization Process Complete ===")
except Exception as e:
    print(f" Critical error during initialization: {e}")
    traceback.print_exc()

print_debug_info(app_initialized, df)

demo = build_interface(app_initialized, df)

# For local development / Hugging Face Spaces
if __name__ == "__main__":
    print(" Starting FIFO Mining Equipment Failure Predictor...")
    demo.launch()