pranshh's picture
Update modules/machine_failure.py
5dfbf82 verified
raw
history blame
15.7 kB
from flask import Blueprint, render_template, request, jsonify, redirect, url_for, flash, session
import pandas as pd
import numpy as np
import plotly.express as px
import plotly.utils
import json
import os
import joblib
from datetime import datetime
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import random
# Blueprint mounted under /predict/machine_failure; all routes below hang off it.
machine_failure_bp = Blueprint('machine_failure', __name__, url_prefix='/predict/machine_failure')
# Directory where uploaded CSVs and trained model/scaler/encoder artifacts are stored.
UPLOAD_FOLDER = 'temp_uploads'
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
def get_current_df():
    """Load the uploaded machine-data CSV referenced in the Flask session.

    Returns:
        pd.DataFrame | None: the DataFrame read from the path stored under
        'machine_csv_path', or None when no path is stored, the file is
        missing, or reading fails.
    """
    try:
        csv_path = session.get('machine_csv_path')
        print(f"Debug - CSV Path from session: {csv_path}")
        if not csv_path or not os.path.exists(csv_path):
            print("Debug - No valid CSV path found")
            return None
        print(f"Debug - File exists at path: {csv_path}")
        return pd.read_csv(csv_path)
    except Exception as e:
        print(f"Debug - Error in get_current_df: {str(e)}")
        return None
def get_summary_stats(df):
    """Summarize *df*: shape, column names, dtype groups and missing counts."""
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    non_numeric_cols = df.select_dtypes(exclude=[np.number]).columns
    summary = {
        'total_rows': len(df),
        'total_columns': len(df.columns),
        'columns': list(df.columns),
        'numeric_columns': list(numeric_cols),
        'categorical_columns': list(non_numeric_cols),
        'missing_values': df.isnull().sum().to_dict(),
    }
    return summary
def preprocess_data(df, for_prediction=False, label_encoders=None):
    """Preprocess the dataframe for machine learning.

    Splits columns into numeric and categorical, expands timestamp columns
    into hour/day/month features, and label-encodes categorical columns.

    Args:
        df (pd.DataFrame): The input DataFrame.
        for_prediction (bool): True if preprocessing for a single prediction,
            False for training.
        label_encoders (dict | None): Pre-fitted LabelEncoders keyed by column
            name; used (and required in practice) when for_prediction is True.

    Returns:
        tuple[pd.DataFrame, dict]: the processed frame and the encoders fitted
        during training (empty dict when for_prediction is True).
    """
    df_processed = df.copy()
    categorical_columns = []
    numerical_columns = []
    # Dynamically determine column types based on the current DataFrame.
    for column in df_processed.columns:
        if column in ('timestamp', 'maintenance_timestamp'):
            continue  # handled separately below
        if pd.api.types.is_numeric_dtype(df_processed[column]):
            numerical_columns.append(column)
        else:
            try:
                # If every non-null value parses as a number, treat the column
                # as numeric AND convert it. (Previously the column was only
                # classified as numeric but left as object dtype, which is
                # fragile for downstream scaling/model fitting.)
                pd.to_numeric(df_processed[column].dropna())
                df_processed[column] = pd.to_numeric(df_processed[column], errors='coerce')
                numerical_columns.append(column)
            except (ValueError, TypeError):
                categorical_columns.append(column)
    # Handle timestamps: expand into simple calendar features, then drop.
    for time_col in ('timestamp', 'maintenance_timestamp'):
        if time_col not in df_processed.columns:
            continue
        # Convert to datetime, coercing parse failures to NaT.
        df_processed[time_col] = pd.to_datetime(df_processed[time_col], errors='coerce')
        if not df_processed[time_col].isnull().all():
            # Fill NaT-derived NaNs with 0 so the features stay numeric.
            df_processed[f'{time_col}_hour'] = df_processed[time_col].dt.hour.fillna(0)
            df_processed[f'{time_col}_day'] = df_processed[time_col].dt.day.fillna(0)
            df_processed[f'{time_col}_month'] = df_processed[time_col].dt.month.fillna(0)
        else:
            # No parseable dates at all: emit constant features.
            df_processed[f'{time_col}_hour'] = 0
            df_processed[f'{time_col}_day'] = 0
            df_processed[f'{time_col}_month'] = 0
        df_processed = df_processed.drop(columns=[time_col])
    # Encode categorical variables.
    current_label_encoders = {}
    if not for_prediction:
        # Training: fit and collect a fresh encoder per categorical column.
        for col in categorical_columns:
            if col in df_processed.columns:
                le = LabelEncoder()
                df_processed[col] = le.fit_transform(df_processed[col].astype(str))
                current_label_encoders[col] = le
    else:
        # Prediction: reuse fitted encoders; map unseen labels to -1 instead
        # of raising. Guard against a missing encoder dict — the original
        # crashed with AttributeError when label_encoders was None.
        for col, le in (label_encoders or {}).items():
            if col in df_processed.columns:
                df_processed[col] = df_processed[col].astype(str).apply(
                    lambda x: le.transform([x])[0] if x in le.classes_ else -1
                )
    return df_processed, current_label_encoders
@machine_failure_bp.route('/', methods=['GET'])
def show_machine_failure():
    """Render the machine-failure prediction landing page."""
    page_title = "Machine Failure Prediction"
    return render_template('machine_failure.html', title=page_title)
@machine_failure_bp.route('/upload_file', methods=['POST'])
def upload_file_machine():
    """Accept a CSV upload, persist it to disk, and render a data preview.

    Stores the saved path and the original column list in the session so the
    training and single-prediction routes can find the data later.
    """
    uploaded = request.files.get('machine_file')
    if uploaded is None or uploaded.filename == '':
        flash('No file selected')
        return redirect(url_for('machine_failure.show_machine_failure'))
    try:
        # Timestamped server-side name — never trust the client filename.
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        safe_filename = f"machine_data_{timestamp}.csv"
        file_path = os.path.join(UPLOAD_FOLDER, safe_filename)
        uploaded.save(file_path)
        session['machine_csv_path'] = file_path
        print(f"Debug - Saved file to: {file_path}")
        df = pd.read_csv(file_path)
        session['original_columns'] = df.columns.tolist()
        return render_template(
            'machine_failure.html',
            title="Machine Failure Prediction",
            preview_data=df.head().to_dict('records'),
            columns=df.columns.tolist(),
            summary_stats=get_summary_stats(df),
        )
    except Exception as e:
        print(f"Debug - Upload error: {str(e)}")
        flash(f'Error processing file: {str(e)}')
        return redirect(url_for('machine_failure.show_machine_failure'))
@machine_failure_bp.route('/run_prediction', methods=['POST'])
def run_prediction():
    """Train a RandomForest on the uploaded CSV and report metrics as JSON.

    Expects form field 'target_col'. Persists the fitted model, scaler and
    label encoders to disk and stores their paths (plus feature names and the
    target column) in the session so /predict_single can reuse them. Returns
    JSON with weighted classification metrics and the top-5 features.
    """
    try:
        df = get_current_df()
        if df is None:
            return jsonify({'success': False, 'error': 'No data available. Please upload a CSV file first.'})
        target_col = request.form.get('target_col')
        if not target_col:
            return jsonify({'success': False, 'error': 'Target column not selected.'})
        # Preprocess the data for training (fits fresh label encoders).
        df_processed, label_encoders = preprocess_data(df.copy(), for_prediction=False)
        # One timestamp for all artifacts so the three file names stay in sync
        # (previously each call to datetime.now() could straddle a second).
        run_stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        encoders_path = os.path.join(UPLOAD_FOLDER, f'encoders_{run_stamp}.joblib')
        joblib.dump(label_encoders, encoders_path)
        session['encoders_path'] = encoders_path
        # Prepare features and target.
        if target_col not in df_processed.columns:
            return jsonify({'success': False, 'error': f"Target column '{target_col}' not found after preprocessing. Check if it was dropped or transformed incorrectly."})
        X = df_processed.drop(columns=[target_col])
        y = df_processed[target_col]
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
        scaler = StandardScaler()
        X_train_scaled = scaler.fit_transform(X_train)
        X_test_scaled = scaler.transform(X_test)
        clf = RandomForestClassifier(random_state=42)
        clf.fit(X_train_scaled, y_train)
        y_pred = clf.predict(X_test_scaled)
        # Top-5 features by importance, with plain-float importances for JSON.
        feature_importance = sorted(
            zip(X.columns, clf.feature_importances_),
            key=lambda item: item[1],
            reverse=True,
        )[:5]
        top_features = [{'feature': f, 'importance': float(imp)} for f, imp in feature_importance]
        session['feature_names'] = X.columns.tolist()
        session['target_column_name'] = target_col
        # Cast metric values to built-in float: sklearn returns np.float64,
        # which Flask's default JSON encoder cannot serialize.
        metrics = {
            'Accuracy': float(accuracy_score(y_test, y_pred)),
            'Precision': float(precision_score(y_test, y_pred, average='weighted', zero_division=0)),
            'Recall': float(recall_score(y_test, y_pred, average='weighted', zero_division=0)),
            'F1 Score': float(f1_score(y_test, y_pred, average='weighted', zero_division=0)),
        }
        model_path = os.path.join(UPLOAD_FOLDER, f'model_{run_stamp}.joblib')
        scaler_path = os.path.join(UPLOAD_FOLDER, f'scaler_{run_stamp}.joblib')
        joblib.dump(clf, model_path)
        joblib.dump(scaler, scaler_path)
        session['model_path'] = model_path
        session['scaler_path'] = scaler_path
        return jsonify({
            'success': True,
            'metrics': metrics,
            'top_features': top_features,
        })
    except Exception as e:
        print(f"Error in run_prediction: {e}")
        return jsonify({'success': False, 'error': str(e)})
@machine_failure_bp.route('/get_form_data', methods=['GET'])
def get_form_data():
    """Describe the single-prediction input form: one entry per usable column.

    Each field dict carries 'name', 'type' ('number' / 'text' / 'select') and a
    randomly sampled 'default_value' drawn from the uploaded data.
    """
    try:
        df = get_current_df()
        if df is None:
            return jsonify({'success': False, 'error': 'No data available. Please upload a file first.'})
        target_col = session.get('target_column_name')
        if not target_col:
            return jsonify({'success': False, 'error': 'Target column not found in session. Please run prediction first.'})
        # Columns the user should not fill in, compared case-insensitively.
        excluded = {name.lower() for name in ('error_severity', 'downtime_minutes', 'failure_type', target_col)}
        form_fields = []
        for col in df.columns:
            if col.lower() in excluded:
                continue
            series = df[col]
            non_null = series.dropna()
            default_value = None
            if not non_null.empty:
                if pd.api.types.is_numeric_dtype(series):
                    lo = series.min()
                    hi = series.max()
                    if pd.isna(lo) or pd.isna(hi):
                        default_value = 0.0
                    else:
                        default_value = round(random.uniform(float(lo), float(hi)), 2)
                elif col in ('timestamp', 'maintenance_timestamp'):
                    sample_date = random.choice(non_null.tolist())
                    try:
                        parsed = pd.to_datetime(sample_date)
                        if pd.isna(parsed):
                            default_value = "YYYY-MM-DD HH:MM:SS"  # fallback for invalid dates
                        else:
                            default_value = parsed.strftime('%Y-%m-%d %H:%M:%S')
                    except Exception:
                        default_value = "YYYY-MM-DD HH:MM:SS"
                else:
                    choices = [str(v) for v in non_null.unique()]
                    default_value = random.choice(choices) if choices else ""
            # Choose the widget type from the column's dtype / name.
            if pd.api.types.is_numeric_dtype(series):
                field = {'name': col, 'type': 'number', 'default_value': default_value}
            elif col in ('timestamp', 'maintenance_timestamp'):
                field = {
                    'name': col,
                    'type': 'text',
                    'placeholder': 'YYYY-MM-DD HH:MM:SS (optional)',
                    'default_value': default_value,
                }
            else:
                field = {
                    'name': col,
                    'type': 'select',
                    'options': [str(v) for v in non_null.unique().tolist()],
                    'default_value': default_value,
                }
            form_fields.append(field)
        return jsonify({'success': True, 'form_fields': form_fields})
    except Exception as e:
        print(f"Error in get_form_data: {e}")
        return jsonify({'success': False, 'error': str(e)})
@machine_failure_bp.route('/predict_single', methods=['POST'])
def predict_single():
    """Predict the target for one JSON input row using the trained model.

    Loads the model/scaler/encoders saved by run_prediction, rebuilds a
    one-row DataFrame matching the original upload's schema, preprocesses it
    the same way as training, and returns the decoded prediction plus class
    probabilities (when the model supports predict_proba).
    """
    try:
        # Artifact paths and metadata persisted by run_prediction.
        model_path = session.get('model_path')
        scaler_path = session.get('scaler_path')
        encoders_path = session.get('encoders_path')
        feature_names = session.get('feature_names')
        target_col = session.get('target_column_name')
        if not all([model_path, scaler_path, encoders_path, feature_names, target_col]):
            return jsonify({'success': False, 'error': 'Model or preprocessing artifacts not found. Please train a model first.'})
        model = joblib.load(model_path)
        scaler = joblib.load(scaler_path)
        label_encoders = joblib.load(encoders_path)
        input_data = request.json
        if not input_data:
            return jsonify({'success': False, 'error': 'No input data provided.'})
        original_uploaded_columns = session.get('original_columns')
        if not original_uploaded_columns:
            return jsonify({'success': False, 'error': 'Original dataset column names not found in session. Please upload a file.'})
        # Rebuild a single-row frame containing every original column so that
        # preprocessing sees the same schema as training; columns absent from
        # the submitted JSON become NaN.
        full_input_df = pd.DataFrame(columns=original_uploaded_columns)
        single_row_input_df = pd.DataFrame([input_data])
        for col in original_uploaded_columns:
            if col in single_row_input_df.columns:
                full_input_df.loc[0, col] = single_row_input_df.loc[0, col]
            else:
                full_input_df.loc[0, col] = np.nan
        preprocessed_input_df, _ = preprocess_data(full_input_df.copy(), for_prediction=True, label_encoders=label_encoders)
        # Align to the training-time feature order; features the preprocessor
        # did not produce default to 0, and any unparseable values become NaN
        # (then 0) via the coercing numeric conversion.
        final_input_features = pd.DataFrame(columns=feature_names)
        for col in feature_names:
            if col in preprocessed_input_df.columns:
                final_input_features[col] = pd.to_numeric(preprocessed_input_df[col], errors='coerce').values
            else:
                final_input_features[col] = 0.0
        final_input_features = final_input_features.fillna(0.0)
        input_scaled = scaler.transform(final_input_features)
        prediction_value = model.predict(input_scaled)[0]
        prediction_display = prediction_value
        if target_col in label_encoders:
            # Encoded target: map the class index back to its original label;
            # out-of-range indices are reported rather than crashing.
            if isinstance(prediction_value, (int, np.integer)) and prediction_value < len(label_encoders[target_col].classes_):
                prediction_display = str(label_encoders[target_col].inverse_transform([prediction_value])[0])
            else:
                prediction_display = str(prediction_value) + " (Unknown Class)"
        else:
            # Non-encoded target: coerce numpy scalars to plain float so the
            # value is JSON-serializable.
            if isinstance(prediction_value, np.number):
                prediction_display = float(prediction_value)
            else:
                prediction_display = prediction_value
        probability = None
        if hasattr(model, 'predict_proba'):
            # Per-class probabilities as plain floats for JSON.
            probability = model.predict_proba(input_scaled)[0].tolist()
            probability = [float(p) for p in probability]
        return jsonify({
            'success': True,
            'prediction': prediction_display,
            'probability': probability
        })
    except Exception as e:
        print(f"Error in predict_single: {e}")
        return jsonify({'success': False, 'error': str(e)})