|
|
from flask import Blueprint, render_template, request, jsonify, redirect, url_for, flash |
|
|
import pandas as pd |
|
|
import numpy as np |
|
|
from sklearn.model_selection import train_test_split |
|
|
from sklearn.preprocessing import StandardScaler, LabelEncoder |
|
|
from sklearn.ensemble import RandomForestClassifier |
|
|
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score |
|
|
import random |
|
|
|
|
|
# Blueprint that groups every machine-failure route under a common URL prefix.
machine_failure_bp = Blueprint(
    'machine_failure',
    __name__,
    url_prefix='/predict/machine_failure',
)
|
|
|
|
|
|
|
|
# Module-level state shared between the routes below. Everything starts
# as None and is filled in as the user walks through upload -> train.

# Written by upload_file_machine(): the uploaded dataset and its columns.
_current_df_machine = None
_original_cols_machine = None

# Written by run_prediction(): the fitted artefacts and training metadata.
_model_machine = None
_scaler_machine = None
_encoders_machine = None
_feature_names_machine = None
_target_col_machine = None
|
|
|
|
|
|
|
|
def get_summary_stats(df):
    """Summarise the shape, column kinds and missing values of a dataframe.

    Args:
        df: The pandas DataFrame to describe.

    Returns:
        A dict with the keys ``total_rows``, ``total_columns``, ``columns``,
        ``numeric_columns``, ``categorical_columns`` (every column that is
        not numeric) and ``missing_values`` (column name -> null count).
    """
    numeric_columns = df.select_dtypes(include=[np.number]).columns
    other_columns = df.select_dtypes(exclude=[np.number]).columns

    # ``Series.sum()`` yields numpy integers, which the standard JSON
    # encoder rejects; plain ints keep the summary serialisable.
    missing_values = {
        column: int(count) for column, count in df.isnull().sum().items()
    }

    return {
        'total_rows': len(df),
        'total_columns': len(df.columns),
        'columns': list(df.columns),
        'numeric_columns': list(numeric_columns),
        'categorical_columns': list(other_columns),
        'missing_values': missing_values,
    }
|
|
|
|
|
# Columns that are expanded into hour/day/month features instead of being
# label-encoded.
_TIMESTAMP_COLUMNS = ('timestamp', 'maintenance_timestamp')


def _is_text_dtype(dtype):
    """Return True for dtypes that hold free-form labels (object or string)."""
    return pd.api.types.is_object_dtype(dtype) or isinstance(dtype, pd.StringDtype)


def _as_category_strings(series):
    """Return *series* as strings with missing entries shown as 'missing'."""
    # The placeholder has to be substituted before the string cast: casting
    # first turns NaN into the literal 'nan', leaving nothing to fill.
    return series.astype(object).where(series.notna(), 'missing').astype(str)


def preprocess_data(df, for_prediction=False, label_encoders=None):
    """Turn a raw dataframe into an all-numeric frame ready for modelling.

    Timestamp columns (``timestamp``, ``maintenance_timestamp``) are parsed
    and replaced by ``<col>_hour``, ``<col>_day`` and ``<col>_month``;
    unparseable values contribute 0 to each part. Text columns are
    label-encoded.

    Args:
        df: Input dataframe; it is copied, never modified.
        for_prediction: When False, a new ``LabelEncoder`` is fitted for every
            text column. When True, the encoders in ``label_encoders`` are
            applied instead and labels unseen during fitting are encoded
            as -1.
        label_encoders: Mapping of column name -> fitted encoder (only its
            ``classes_`` attribute is read). Used only when ``for_prediction``
            is True; ``None`` is treated as an empty mapping.

    Returns:
        A ``(processed_df, encoders)`` tuple. ``encoders`` holds the encoders
        fitted by this call and is therefore empty in prediction mode.
    """
    df_processed = df.copy()

    # Decide which columns are categorical before the timestamp expansion
    # adds new (numeric) columns.
    categorical_columns = [
        col for col in df_processed.columns
        if col not in _TIMESTAMP_COLUMNS and _is_text_dtype(df_processed[col].dtype)
    ]

    for time_col in _TIMESTAMP_COLUMNS:
        if time_col not in df_processed.columns:
            continue
        parsed = pd.to_datetime(df_processed[time_col], errors='coerce')
        df_processed[f'{time_col}_hour'] = parsed.dt.hour.fillna(0)
        df_processed[f'{time_col}_day'] = parsed.dt.day.fillna(0)
        df_processed[f'{time_col}_month'] = parsed.dt.month.fillna(0)
        df_processed = df_processed.drop(columns=[time_col])

    fitted_encoders = {}
    if not for_prediction:
        for col in categorical_columns:
            encoder = LabelEncoder()
            df_processed[col] = encoder.fit_transform(
                _as_category_strings(df_processed[col])
            )
            fitted_encoders[col] = encoder
    else:
        for col, encoder in (label_encoders or {}).items():
            if col not in df_processed.columns:
                continue
            # A label's code is its position in ``classes_``; building the
            # lookup once avoids calling ``transform`` for every cell.
            code_by_label = {
                str(label): code for code, label in enumerate(encoder.classes_)
            }
            df_processed[col] = (
                _as_category_strings(df_processed[col])
                .map(code_by_label)
                .fillna(-1)  # labels the encoder never saw
                .astype(int)
            )

    return df_processed, fitted_encoders
|
|
|
|
|
|
|
|
@machine_failure_bp.route('/', methods=['GET'])
def show_machine_failure():
    """Serve the landing page of the machine failure prediction tool."""
    page_title = "Machine Failure Prediction"
    return render_template('machine_failure.html', title=page_title)
|
|
|
|
|
|
|
|
@machine_failure_bp.route('/upload_machine', methods=['POST'])
def upload_file_machine():
    """Accept a CSV upload and render a preview of its contents.

    Reads the ``machine_file`` form upload. On success the parsed dataframe
    and its column list become the module's current dataset, any previously
    trained model state is cleared, and the main page is rendered with a
    five-row preview plus summary statistics. If no file was sent, or it
    cannot be processed, a message is flashed and the user is redirected to
    the main page; in that case the existing state is left untouched.
    """
    global _current_df_machine, _original_cols_machine
    global _model_machine, _scaler_machine, _encoders_machine
    global _feature_names_machine, _target_col_machine

    file = request.files.get('machine_file')
    if file is None or not file.filename:
        flash('No file selected')
        return redirect(url_for('machine_failure.show_machine_failure'))

    try:
        # Work on locals first so a bad upload cannot clobber a dataset
        # that is already loaded.
        uploaded_df = pd.read_csv(file)
        column_names = uploaded_df.columns.tolist()
        preview_data = uploaded_df.head().to_dict('records')
        summary_stats = get_summary_stats(uploaded_df)

        _current_df_machine = uploaded_df
        _original_cols_machine = column_names

        # Artefacts fitted on an earlier upload do not match the new
        # columns; drop them so the other routes ask for retraining.
        _model_machine = None
        _scaler_machine = None
        _encoders_machine = None
        _feature_names_machine = None
        _target_col_machine = None

        return render_template(
            'machine_failure.html',
            title="Machine Failure Prediction",
            preview_data=preview_data,
            columns=column_names,
            summary_stats=summary_stats,
        )
    except Exception as e:
        flash(f'Error processing file: {str(e)}')
        return redirect(url_for('machine_failure.show_machine_failure'))
|
|
|
|
|
|
|
|
@machine_failure_bp.route('/run_prediction', methods=['POST'])
def run_prediction():
    """Train a random forest on the uploaded data and report its quality.

    Reads the ``target_col`` form field, preprocesses the current dataset,
    holds out 20% of the rows for evaluation, and fits a ``StandardScaler``
    followed by a ``RandomForestClassifier`` (both splits and the model use
    ``random_state=42``).

    Returns:
        A JSON response. On success: ``success=True``, ``metrics``
        (Accuracy and weighted Precision / Recall / F1 Score on the held-out
        rows) and ``top_features`` (the five most important features). On any
        failure: ``success=False`` with an ``error`` message.

    Side effects:
        On success only, the fitted model, scaler, encoders, feature names
        and target column are stored in the module-level state used by the
        single-prediction routes.
    """
    global _model_machine, _scaler_machine, _encoders_machine
    global _feature_names_machine, _target_col_machine

    if _current_df_machine is None:
        return jsonify({'success': False, 'error': 'No data available. Please upload a CSV file first.'})

    target_col = request.form.get('target_col')
    if not target_col:
        return jsonify({'success': False, 'error': 'Target column not selected.'})

    try:
        # preprocess_data copies its input, so the shared frame is safe.
        df_processed, label_encoders = preprocess_data(_current_df_machine)

        if target_col not in df_processed.columns:
            return jsonify({'success': False, 'error': f"Target column '{target_col}' not found after preprocessing."})

        X = df_processed.drop(columns=[target_col])
        y = df_processed[target_col]
        feature_names = X.columns.tolist()

        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

        scaler = StandardScaler()
        X_train_scaled = scaler.fit_transform(X_train)
        X_test_scaled = scaler.transform(X_test)

        model = RandomForestClassifier(random_state=42)
        model.fit(X_train_scaled, y_train)
        y_pred = model.predict(X_test_scaled)

        ranked = sorted(
            zip(feature_names, model.feature_importances_),
            key=lambda pair: pair[1],
            reverse=True,
        )
        top_features = [
            {'feature': name, 'importance': float(weight)}
            for name, weight in ranked[:5]
        ]

        metrics = {
            'Accuracy': float(accuracy_score(y_test, y_pred)),
            'Precision': float(precision_score(y_test, y_pred, average='weighted', zero_division=0)),
            'Recall': float(recall_score(y_test, y_pred, average='weighted', zero_division=0)),
            'F1 Score': float(f1_score(y_test, y_pred, average='weighted', zero_division=0)),
        }

        # Publish everything together, and only once training has fully
        # succeeded, so the prediction routes never see a half-updated mix
        # of old and new artefacts.
        _target_col_machine = target_col
        _encoders_machine = label_encoders
        _feature_names_machine = feature_names
        _scaler_machine = scaler
        _model_machine = model

        return jsonify({'success': True, 'metrics': metrics, 'top_features': top_features})
    except Exception as e:
        return jsonify({'success': False, 'error': f'An error occurred: {str(e)}'})
|
|
|
|
|
|
|
|
# Columns never offered as inputs on the single-prediction form (the
# selected target column is excluded as well, at request time).
_FORM_EXCLUDED_COLUMNS = ('error_severity', 'downtime_minutes', 'failure_type')


def _form_field_for(name, series):
    """Describe one dataframe column as a form field dict."""
    field_info = {'name': name}

    if pd.api.types.is_numeric_dtype(series):
        mean_value = series.mean()
        field_info['type'] = 'number'
        # An empty or all-null column has a NaN mean, which is not valid
        # JSON; fall back to 0 in that case.
        field_info['default_value'] = (
            0 if pd.isna(mean_value) else round(float(mean_value), 2)
        )
        return field_info

    modes = series.mode()

    if name in ('timestamp', 'maintenance_timestamp'):
        most_common = (
            pd.to_datetime(modes.iloc[0], errors='coerce') if not modes.empty else pd.NaT
        )
        field_info['type'] = 'text'
        field_info['placeholder'] = 'YYYY-MM-DD HH:MM:SS'
        # Unparseable values are coerced to NaT and yield an empty default
        # rather than an exception.
        field_info['default_value'] = (
            '' if pd.isna(most_common) else most_common.strftime('%Y-%m-%d %H:%M:%S')
        )
        return field_info

    field_info['type'] = 'select'
    field_info['options'] = [str(x) for x in series.dropna().unique().tolist()]
    # Stringified so the default matches one of the (string) options.
    field_info['default_value'] = str(modes.iloc[0]) if not modes.empty else ''
    return field_info


@machine_failure_bp.route('/get_form_data', methods=['GET'])
def get_form_data():
    """Describe the input fields of the single-prediction form.

    Every column of the uploaded dataset becomes one field, except the
    target column and the fixed exclusions (``error_severity``,
    ``downtime_minutes``, ``failure_type``), matched case-insensitively.
    Numeric columns become ``number`` fields defaulting to the column mean,
    timestamp columns become ``text`` fields defaulting to the most frequent
    value, and all other columns become ``select`` fields listing the
    distinct non-null values.

    Returns:
        A JSON response with ``success`` and either ``form_fields`` or an
        ``error`` message.
    """
    if _current_df_machine is None:
        return jsonify({'success': False, 'error': 'No data available. Please upload a file first.'})
    if _target_col_machine is None:
        return jsonify({'success': False, 'error': 'Model not trained yet. Please run a prediction first.'})

    df = _current_df_machine
    # Built once instead of re-lowercasing the list for every column.
    excluded = {
        str(name).lower()
        for name in (*_FORM_EXCLUDED_COLUMNS, _target_col_machine)
    }

    try:
        form_fields = [
            _form_field_for(col, df[col])
            for col in df.columns
            if str(col).lower() not in excluded
        ]
    except Exception as e:
        return jsonify({'success': False, 'error': f'An error occurred: {str(e)}'})

    return jsonify({'success': True, 'form_fields': form_fields})
|
|
|
|
|
|
|
|
@machine_failure_bp.route('/predict_single', methods=['POST'])
def predict_single():
    """Predict failure / no failure for one record sent as a JSON object.

    The request body maps original column names to values. The record is
    preprocessed with the encoders saved at training time, aligned to the
    training feature order (features absent from the input are filled with
    0), scaled and passed to the trained model.

    Returns:
        A JSON response. On success: ``success=True``, ``prediction``
        (``"Failure"`` when the predicted class equals 1, otherwise
        ``"No Failure"``) and ``probability`` (the model's per-class
        probabilities). Otherwise ``success=False`` with an ``error``.
    """
    required_state = (
        _model_machine,
        _scaler_machine,
        _encoders_machine,
        _feature_names_machine,
        _original_cols_machine,
    )
    # Compare against None rather than relying on truthiness: a dataset
    # with no categorical columns legitimately leaves the encoders as an
    # empty dict, which must not be mistaken for "not trained".
    if any(item is None for item in required_state):
        return jsonify({'success': False, 'error': 'Model or configuration not ready. Please run a prediction first.'})

    try:
        input_data = request.get_json(silent=True)
        if not isinstance(input_data, dict):
            return jsonify({'success': False, 'error': 'An error occurred during prediction: request body must be a JSON object.'})

        input_df = pd.DataFrame([input_data], columns=_original_cols_machine)

        preprocessed_df, _ = preprocess_data(
            input_df, for_prediction=True, label_encoders=_encoders_machine
        )

        # Select exactly the training features in training order; columns
        # missing from the input appear as NaN and are then set to 0.
        final_features = preprocessed_df.reindex(columns=_feature_names_machine).fillna(0)

        input_scaled = _scaler_machine.transform(final_features)

        prediction = _model_machine.predict(input_scaled)[0]
        # NOTE(review): assumes the positive class is encoded as 1 - confirm
        # for targets that were label-encoded from text.
        prediction_display = "Failure" if prediction == 1 else "No Failure"

        probability = _model_machine.predict_proba(input_scaled)[0].tolist()

        return jsonify({'success': True, 'prediction': prediction_display, 'probability': probability})
    except Exception as e:
        return jsonify({'success': False, 'error': f'An error occurred during prediction: {str(e)}'})