solfedge's picture
Update app.py
65619ad verified
import pandas as pd
import numpy as np
import joblib
import matplotlib.pyplot as plt
import gradio as gr
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
from transformers import pipeline
import os
# variables
df = None
iso_forest = None
sensor_cols = None
explainer = None
def find_data_file():
"""Find the train_FD001.txt file in various possible locations"""
possible_paths = [
'CMaps/train_FD001.txt',
'train_FD001.txt',
'data/train_FD001.txt',
'C-MAPSS/train_FD001.txt',
'CMAPSS/train_FD001.txt',
'dataset/train_FD001.txt'
]
for path in possible_paths:
if os.path.exists(path):
print(f" Found data file at: {path}")
return path
print("Files in current directory:")
for file in os.listdir('.'):
print(f" {file}")
if os.path.exists('CMaps'):
print("Files in CMaps directory:")
for file in os.listdir('CMaps'):
print(f" {file}")
raise FileNotFoundError("Could not find train_FD001.txt in any expected location")
def load_and_process_data():
"""
Load and preprocess the NASA Turbofan dataset
"""
print("Loading and processing data...")
data_path = find_data_file()
df_raw = pd.read_csv(data_path, delim_whitespace=True, header=None, nrows=1)
num_columns = len(df_raw.columns)
print(f"Found {num_columns} columns in the dataset")
# Standard NASA CMAPSS FD001 has id, cycle, op1, op2, op3, and then sensors so that's why i have allineged them like this
if num_columns >= 26: # id, cycle, 3 ops, 21+ sensors
columns = ['id', 'cycle', 'op1', 'op2', 'op3'] + [f'sensor{i}' for i in range(1, num_columns - 4)]
elif num_columns >= 25: # id, cycle, 2 ops, sensors
columns = ['id', 'cycle', 'op1', 'op2'] + [f'sensor{i}' for i in range(1, num_columns - 3)]
elif num_columns >= 24: # id, cycle, 1 op, sensors
columns = ['id', 'cycle', 'op1'] + [f'sensor{i}' for i in range(1, num_columns - 2)]
else: # id, cycle, sensors (less common for FD001)
columns = ['id', 'cycle'] + [f'sensor{i}' for i in range(1, num_columns - 1)]
# Trim columns to actual number (safety check)
columns = columns[:num_columns]
df = pd.read_csv(data_path, delim_whitespace=True, header=None, names=columns)
# The NASA data often has trailing spaces or extra NaN columns, drop them
df = df.dropna(axis=1, how='all')
sensor_cols = [col for col in df.columns if col.startswith('sensor')]
print(f" Identified {len(sensor_cols)} sensor columns: {sensor_cols}")
if len(sensor_cols) > 0:
# transform with groupby correctly and ensuring numerical stability
df[sensor_cols] = df.groupby('id')[sensor_cols].transform(
lambda x: (x - x.mean()) / (x.std() + 1e-6) if x.std() > 1e-6 else x - x.mean()
)
else:
print(" Warning: No sensor columns found!")
sensor_cols = []
print(f" Processed data shape: {df.shape}")
return df, sensor_cols
def load_processed_data(filepath='processed_data.csv'):
"""
Load processed data from CSV
"""
if not os.path.exists(filepath):
return None, None
try:
df = pd.read_csv(filepath)
except Exception as e:
print(f" Error loading processed data from {filepath}: {e}")
return None, None
sensor_cols = [col for col in df.columns if col.startswith('sensor')]
return df, sensor_cols
def load_model(filepath='isolation_forest_model.pkl'):
"""
Load trained model from disk
"""
if not os.path.exists(filepath):
return None
try:
model = joblib.load(filepath)
print(f" Model loaded from {filepath}")
return model
except Exception as e:
print(f" Error loading model from {filepath}: {e}")
return None
def train_isolation_forest(df, sensor_cols, contamination=0.02): # Reduced contamination for fewer false positives
"""
Train Isolation Forest model for anomaly detection
"""
print(" Training Isolation Forest model...")
print(f" Using {len(sensor_cols)} sensor columns for training")
print(f" Contamination rate: {contamination}")
if len(sensor_cols) == 0:
raise ValueError(" No sensor columns found for training")
# Initialize and train the model
iso_forest = IsolationForest(
contamination=contamination,
random_state=42,
n_estimators=150,
max_samples='auto'
)
# Fit the model
iso_forest.fit(df[sensor_cols])
# Add predictions to dataframe
df['anomaly'] = iso_forest.predict(df[sensor_cols])
df['anomaly_score'] = iso_forest.decision_function(df[sensor_cols])
# Show statistics
if 'anomaly' in df.columns:
anomaly_count = (df['anomaly'] == -1).sum()
normal_count = (df['anomaly'] == 1).sum()
print(f" Anomalies detected: {anomaly_count} ({anomaly_count/len(df)*100:.1f}%)")
print(f" Normal readings: {normal_count} ({normal_count/len(df)*100:.1f}%)")
else:
print(" Warning: 'anomaly' column not found in df after prediction.")
print(" Model training completed!")
return iso_forest, df
def initialize_app():
"""
Initialize the application by loading data and model
"""
global df, iso_forest, sensor_cols, explainer
print(" Initializing FIFO Mining Predictor...")
try:
df, sensor_cols = load_processed_data('processed_data.csv')
if df is None:
print(" Processed data not found. Creating from raw data...")
df, sensor_cols = load_and_process_data()
df.to_csv('processed_data.csv', index=False)
print(" Processed data saved.")
# Safety check after loading/processing
if df is None or df.empty:
print(" Failed to load or process data.")
return False
# load existing model
iso_forest = load_model('isolation_forest_model.pkl')
# If model doesn't exist, train it
if iso_forest is None:
print(" Model not found. Training new model...")
# Use the potentially lower contamination rate for retraining if needed
iso_forest_trained, df_updated = train_isolation_forest(df, sensor_cols, contamination=0.02)
joblib.dump(iso_forest_trained, 'isolation_forest_model.pkl')
df_updated.to_csv('processed_data.csv', index=False)
iso_forest = iso_forest_trained
df = df_updated
print(" Model trained and saved.")
# Ensure anomaly scores are present upon initialization
if 'anomaly_score' not in df.columns and iso_forest is not None and sensor_cols is not None:
print(" Re-calculating anomaly scores...")
df['anomaly'] = iso_forest.predict(df[sensor_cols])
df['anomaly_score'] = iso_forest.decision_function(df[sensor_cols])
df.to_csv('processed_data.csv', index=False)
print(" Anomaly scores updated in processed data.")
# Initialize Gen AI
try:
print(" Loading Gen AI model...")
explainer = pipeline("text2text-generation", model="google/flan-t5-small")
print(" Gen AI model loaded successfully.")
except Exception as e:
print(f" Gen AI model not available: {e}")
explainer = None # Ensure it's None if loading fails
print(" Application initialized successfully!")
return True
except Exception as e:
print(f" Error initializing application: {e}")
import traceback
traceback.print_exc()
return False
def generate_insight(engine_id, cycle, anomaly_score, top_sensors):
"""Generate AI explanation for the anomaly"""
if explainer is not None:
try:
if anomaly_score < -0.7:
risk_desc = "high risk"
elif anomaly_score < 0:
risk_desc = "moderate risk"
else:
risk_desc = "normal operation"
prompt = f"""
Mining equipment shows {risk_desc}. ID: {engine_id}, cycle: {cycle}.
Score: {anomaly_score:.3f}. Sensors: {', '.join(top_sensors[:2])}.
Brief maintenance recommendation in 1-2 sentences.
"""
result = explainer(
prompt,
max_length=80,
num_return_sequences=1,
do_sample=False,
truncation=True
)
return result[0]['generated_text'].strip()
except Exception as e:
print(f" Gen AI error: {e}")
pass
if anomaly_score < -0.7:
return "Critical risk detected. Immediate inspection required. Check for mechanical wear or overheating."
elif anomaly_score < -0.5: # Medium threshold
return "Moderate risk detected. Schedule inspection within 48 hours. Monitor vibration and temperature."
elif anomaly_score < 0: # Low threshold
return "Low risk anomaly detected. Increase monitoring frequency. Review operational parameters."
else:
return "Equipment operating normally. Continue routine monitoring schedule."
def predict_failure(engine_id):
"""Main prediction function with better risk assessment"""
global df, iso_forest, sensor_cols
if df is None or df.empty or sensor_cols is None or len(sensor_cols) == 0 or iso_forest is None:
return " Application not properly initialized. Data or model is missing.", None
unique_ids = df['id'].unique()
if engine_id not in unique_ids:
s
sample_ids = sorted(unique_ids)[:10] # Show first 10
sample_str = ", ".join(map(str, sample_ids))
if len(unique_ids) > 10:
sample_str += ", ..."
return f" Truck ID {engine_id} not found.\nAvailable IDs (first 10): {sample_str}", None
engine_data = df[df['id'] == engine_id].tail(1)
if engine_data.empty:
return " No data found for this truck ID.", None
try:
cycle = int(engine_data['cycle'].iloc[0])
anomaly_score = float(engine_data['anomaly_score'].iloc[0])
except (IndexError, KeyError, ValueError, TypeError) as e:
return f" Error retrieving data for Truck ID {engine_id}: {e}", None
# Get top abnormal sensors
try:
sens_vals = engine_data[sensor_cols].iloc[0].abs().sort_values(ascending=False).head(5).index.tolist()
except Exception as e:
return f" Error analyzing sensor data for Truck ID {engine_id}: {e}", None
# Generate AI explanation
insight = generate_insight(engine_id, cycle, anomaly_score, sens_vals)
try:
all_scores = df['anomaly_score'].dropna().values
if len(all_scores) == 0:
raise ValueError("No anomaly scores found in data.")
high_threshold = np.percentile(all_scores, 1)
medium_threshold = np.percentile(all_scores, 5)
except Exception as e:
print(f" Warning: Could not calculate percentiles, using fallback thresholds: {e}")
high_threshold = -0.3
medium_threshold = -0.1
# Risk level with better thresholds
if anomaly_score <= high_threshold:
risk_level = " HIGH RISK"
action = " **IMMEDIATE INSPECTION REQUIRED**"
elif anomaly_score <= medium_threshold:
risk_level = " MEDIUM RISK"
action = " **SCHEDULE INSPECTION SOON**"
else:
risk_level = " LOW RISK"
action = " Equipment operating normally"
result = f"""
## 🔧 FIFO Mining Equipment Failure Predictor
### Equipment Status
- **Truck ID:** `{int(engine_id)}`
- **Current Cycle:** `{cycle}`
- **Anomaly Score:** `{anomaly_score:.3f}`
- **Risk Assessment:** **{risk_level}**
---
### AI Maintenance Recommendation
> {insight}
### Top Abnormal Sensors
1. `{sens_vals[0]}`
2. `{sens_vals[1]}`
3. `{sens_vals[2]}`
### Action Priority
{action}
"""
# Create visualization
try:
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
fig.suptitle(f'Mining Truck {int(engine_id)} - Health Analysis', fontsize=16, fontweight='bold')
engine_hist = df[df['id'] == engine_id]
# Plot 1: Key sensor trends (handle potential index errors)
if len(sensor_cols) > 0:
axes[0, 0].plot(engine_hist['cycle'], engine_hist[sensor_cols[0]], label=f'{sensor_cols[0]}', linewidth=2)
if len(sensor_cols) > 3:
axes[0, 0].plot(engine_hist['cycle'], engine_hist[sensor_cols[3]], label=f'{sensor_cols[3]}', linewidth=2)
if len(sensor_cols) > 6:
axes[0, 0].plot(engine_hist['cycle'], engine_hist[sensor_cols[6]], label=f'{sensor_cols[6]}', linewidth=2)
axes[0, 0].set_title('Sensor Trends')
axes[0, 0].set_xlabel('Cycle')
axes[0, 0].set_ylabel('Normalized Value')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)
# Plot 2: Anomaly score trend with thresholds
axes[0, 1].plot(engine_hist['cycle'], engine_hist['anomaly_score'], 'b-', linewidth=2, label='Current Score')
axes[0, 1].axhline(y=high_threshold, color='r', linestyle='--', alpha=0.7, label=f'High Risk ({high_threshold:.3f})')
axes[0, 1].axhline(y=medium_threshold, color='orange', linestyle='--', alpha=0.7, label=f'Medium Risk ({medium_threshold:.3f})')
axes[0, 1].axhline(y=0, color='g', linestyle='-', alpha=0.5, label='Normal')
axes[0, 1].set_title('Anomaly Score Over Time')
axes[0, 1].set_xlabel('Cycle')
axes[0, 1].set_ylabel('Anomaly Score')
axes[0, 1].legend()
axes[0, 1].grid(True, alpha=0.3)
# Plot 3: Current sensor values (top 6)
if len(sens_vals) >= 1: # Need at least one
num_bars = min(6, len(sens_vals))
current_values = engine_data[sens_vals[:num_bars]].iloc[0].values
bar_colors = ['red' if x <= high_threshold else 'orange' if x <= medium_threshold else 'green' for x in current_values]
axes[1, 0].bar(range(num_bars), current_values, color=bar_colors)
axes[1, 0].set_title('Current Top Abnormal Sensors')
axes[1, 0].set_xticks(range(num_bars))
axes[1, 0].set_xticklabels([s.replace('sensor', 'S') for s in sens_vals[:num_bars]], rotation=45)
axes[1, 0].set_ylabel('Normalized Value')
axes[1, 0].grid(True, alpha=0.3)
# Plot 4: Risk distribution
axes[1, 1].hist(all_scores, bins=50, alpha=0.7, color='lightblue', edgecolor='black', linewidth=0.5)
axes[1, 1].axvline(x=anomaly_score, color='red', linestyle='--', linewidth=2, label=f'Truck {engine_id}: {anomaly_score:.3f}')
axes[1, 1].axvline(x=high_threshold, color='r', linestyle=':', alpha=0.7, label=f'High Risk Threshold')
axes[1, 1].axvline(x=medium_threshold, color='orange', linestyle=':', alpha=0.7, label=f'Medium Risk Threshold')
axes[1, 1].set_title('Anomaly Score Distribution')
axes[1, 1].set_xlabel('Anomaly Score')
axes[1, 1].set_ylabel('Frequency')
axes[1, 1].legend()
axes[1, 1].grid(True, alpha=0.3)
plt.tight_layout()
except Exception as e:
print(f" Error creating plot: {e}")
return result, None
return result, fig
# --- Main Application Logic ---
# Initialize the app
app_initialized = False
try:
print("=== Starting Initialization Process ===")
app_initialized = initialize_app()
print("=== Initialization Process Complete ===")
except Exception as e:
print(f" Critical error during initialization: {e}")
import traceback
traceback.print_exc()
# --- Debug Information ---
print("\n=== POST-INITIALIZATION DEBUG INFO ===")
print(f"app_initialized: {app_initialized}")
if df is not None and not df.empty:
print(f" Data loaded successfully. Shape: {df.shape}")
print(f" Columns: {list(df.columns)}")
if 'id' in df.columns:
unique_ids = sorted(df['id'].dropna().unique())
print(f" Unique Truck IDs found: {len(unique_ids)} (Min: {int(min(unique_ids)) if len(unique_ids) > 0 else 'N/A'}, Max: {int(max(unique_ids)) if len(unique_ids) > 0 else 'N/A'})")
print(f" First 10 IDs: {list(map(int, unique_ids[:10]))}")
else:
print(" 'id' column is missing!")
if 'anomaly_score' in df.columns:
try:
print(f" Anomaly scores range: [{df['anomaly_score'].min():.3f}, {df['anomaly_score'].max():.3f}]")
except:
print(" Error calculating anomaly score range.")
else:
print(" 'anomaly_score' column is missing - model might not have trained correctly.")
sensor_cols_debug = [col for col in df.columns if col.startswith('sensor')]
print(f" Sensor columns identified: {len(sensor_cols_debug)}")
else:
print(" Data (df) failed to load or is empty after initialization.")
print("=======================================\n")
# --- Gradio Interface Creation ---
if app_initialized and df is not None and not df.empty:
safe_min_id = 1
safe_max_id = 100
try:
if 'id' in df.columns and not df['id'].empty:
unique_ids = df['id'].dropna().unique()
if len(unique_ids) > 0:
calculated_min_id = int(min(unique_ids))
calculated_max_id = int(max(unique_ids))
# Apply sanity checks
if calculated_min_id > 0 and calculated_max_id >= calculated_min_id:
safe_min_id = calculated_min_id
safe_max_id = calculated_max_id
print(f" Setting interface ID range: {safe_min_id}-{safe_max_id}")
else:
print(f" Calculated ID range [{calculated_min_id}, {calculated_max_id}] seems invalid, using defaults 1-100")
else:
print(" No unique IDs found in data, using defaults 1-100")
else:
print(" 'id' column not found in data, using defaults 1-100")
except Exception as e:
print(f" Error calculating ID range: {e}, using defaults 1-100")
# --- End of max_truck_id calculation ---
print(f" Creating main Gradio interface with ID range {safe_min_id}-{safe_max_id}")
demo = gr.Interface(
fn=predict_failure,
inputs=gr.Number(
label="⛏️ Enter Mining Truck ID",
value=safe_min_id, # Start with the actual minimum ID found in data
minimum=safe_min_id,
maximum=safe_max_id,
step=1
),
outputs=[
gr.Markdown(label=" Failure Prediction & AI Insights"),
gr.Plot(label=" Equipment Health Dashboard") # Handle potential None plots gracefully
],
title=" FIFO Mining Equipment Failure Predictor",
description="""
AI-powered predictive maintenance using unsupervised learning + Generative AI.
Detects equipment anomalies before failures occur to prevent costly downtime.
""",
examples=[[safe_min_id], [min(safe_min_id + 4, safe_max_id)], [min(safe_min_id + 9, safe_max_id)]], # Dynamic examples based on actual data range
theme=gr.themes.Soft(primary_hue="blue", secondary_hue="cyan") # Updated theme syntax for newer Gradio versions
)
else:
print(" Creating fallback Gradio interface")
# Fallback interface
def error_message(truck_id):
return " Application failed to initialize correctly. Please check the console logs and data files.", None
demo = gr.Interface(
fn=error_message,
inputs=gr.Number(label="⛏️ Enter Mining Truck ID", value=1),
outputs=[gr.Markdown(label="Error"), gr.Plot(label="Plot")],
title=" FIFO Mining Predictor - Initialization Error",
description="Failed to load data or model. Check file paths and data format.",
theme=gr.themes.Soft(primary_hue="red", secondary_hue="pink")
)
# For local development / Hugging Face Spaces
if __name__ == "__main__":
print(" Starting FIFO Mining Equipment Failure Predictor...")
demo.launch()