from flask import Blueprint, render_template, request, jsonify, redirect, url_for, flash, session import pandas as pd import numpy as np import plotly.express as px import plotly.utils import json import os import joblib from datetime import datetime from sklearn.model_selection import train_test_split from sklearn.preprocessing import StandardScaler, LabelEncoder from sklearn.ensemble import RandomForestClassifier from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score import random machine_failure_bp = Blueprint('machine_failure', __name__, url_prefix='/predict/machine_failure') UPLOAD_FOLDER = 'temp_uploads' os.makedirs(UPLOAD_FOLDER, exist_ok=True) def get_current_df(): try: csv_path = session.get('machine_csv_path') print(f"Debug - CSV Path from session: {csv_path}") if csv_path and os.path.exists(csv_path): print(f"Debug - File exists at path: {csv_path}") return pd.read_csv(csv_path) print("Debug - No valid CSV path found") return None except Exception as e: print(f"Debug - Error in get_current_df: {str(e)}") return None def get_summary_stats(df): return { 'total_rows': len(df), 'total_columns': len(df.columns), 'columns': list(df.columns), 'numeric_columns': list(df.select_dtypes(include=[np.number]).columns), 'categorical_columns': list(df.select_dtypes(exclude=[np.number]).columns), 'missing_values': df.isnull().sum().to_dict() } def preprocess_data(df, for_prediction=False, label_encoders=None): """Preprocess the dataframe for machine learning Args: df (pd.DataFrame): The input DataFrame. for_prediction (bool): True if preprocessing for a single prediction, False for training. label_encoders (dict): Dictionary of pre-fitted LabelEncoders for single prediction. """ df_processed = df.copy() categorical_columns = [] numerical_columns = [] # Dynamically determine column types based on the current DataFrame for column in df_processed.columns: if column in ['timestamp', 'maintenance_timestamp']: continue # Check if column is numeric or can be converted to numeric (after dropping NaNs for check) if pd.api.types.is_numeric_dtype(df_processed[column]): numerical_columns.append(column) else: try: # Attempt to convert to numeric, if successful, it's numeric if pd.to_numeric(df_processed[column].dropna()).notna().all(): numerical_columns.append(column) else: categorical_columns.append(column) except ValueError: categorical_columns.append(column) # Handle timestamps for time_col in ['timestamp', 'maintenance_timestamp']: if time_col in df_processed.columns: # Convert to datetime, coercing errors df_processed[time_col] = pd.to_datetime(df_processed[time_col], errors='coerce') # Extract features only if the column contains valid datetime values if not df_processed[time_col].isnull().all(): df_processed[f'{time_col}_hour'] = df_processed[time_col].dt.hour.fillna(0) # Fill NaN with 0 if time part is missing df_processed[f'{time_col}_day'] = df_processed[time_col].dt.day.fillna(0) df_processed[f'{time_col}_month'] = df_processed[time_col].dt.month.fillna(0) else: df_processed[f'{time_col}_hour'] = 0 df_processed[f'{time_col}_day'] = 0 df_processed[f'{time_col}_month'] = 0 df_processed = df_processed.drop(columns=[time_col]) # Encode categorical variables current_label_encoders = {} if not for_prediction: # During training, fit and save encoders for col in categorical_columns: if col in df_processed.columns: le = LabelEncoder() df_processed[col] = le.fit_transform(df_processed[col].astype(str)) current_label_encoders[col] = le else: for col, le in label_encoders.items(): if col in df_processed.columns: # Use a lambda function to handle unseen labels: map to -1 or a default if not in classes df_processed[col] = df_processed[col].astype(str).apply( lambda x: le.transform([x])[0] if x in le.classes_ else -1 ) return df_processed, current_label_encoders @machine_failure_bp.route('/', methods=['GET']) def show_machine_failure(): return render_template('machine_failure.html', title="Machine Failure Prediction") @machine_failure_bp.route('/upload_file', methods=['POST']) def upload_file_machine(): if 'machine_file' not in request.files: flash('No file selected') return redirect(url_for('machine_failure.show_machine_failure')) file = request.files['machine_file'] if file.filename == '': flash('No file selected') return redirect(url_for('machine_failure.show_machine_failure')) try: timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') safe_filename = f"machine_data_{timestamp}.csv" file_path = os.path.join(UPLOAD_FOLDER, safe_filename) file.save(file_path) session['machine_csv_path'] = file_path print(f"Debug - Saved file to: {file_path}") df = pd.read_csv(file_path) preview_data = df.head().to_dict('records') summary_stats = get_summary_stats(df) session['original_columns'] = df.columns.tolist() return render_template('machine_failure.html', title="Machine Failure Prediction", preview_data=preview_data, columns=df.columns.tolist(), summary_stats=summary_stats) except Exception as e: print(f"Debug - Upload error: {str(e)}") flash(f'Error processing file: {str(e)}') return redirect(url_for('machine_failure.show_machine_failure')) @machine_failure_bp.route('/run_prediction', methods=['POST']) def run_prediction(): try: df = get_current_df() if df is None: return jsonify({'success': False, 'error': 'No data available. Please upload a CSV file first.'}) target_col = request.form.get('target_col') if not target_col: return jsonify({'success': False, 'error': 'Target column not selected.'}) # Preprocess the data for training df_processed, label_encoders = preprocess_data(df.copy(), for_prediction=False) encoders_path = os.path.join(UPLOAD_FOLDER, f'encoders_{datetime.now().strftime("%Y%m%d_%H%M%S")}.joblib') joblib.dump(label_encoders, encoders_path) session['encoders_path'] = encoders_path # Prepare features and target if target_col not in df_processed.columns: return jsonify({'success': False, 'error': f"Target column '{target_col}' not found after preprocessing. Check if it was dropped or transformed incorrectly."}) X = df_processed.drop(columns=[target_col]) y = df_processed[target_col] X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42) scaler = StandardScaler() X_train_scaled = scaler.fit_transform(X_train) X_test_scaled = scaler.transform(X_test) clf = RandomForestClassifier(random_state=42) clf.fit(X_train_scaled, y_train) y_pred = clf.predict(X_test_scaled) importances = clf.feature_importances_ feature_names = X.columns feature_importance = sorted( zip(feature_names, importances), key=lambda x: x[1], reverse=True )[:5] top_features = [{'feature': f, 'importance': float(imp)} for f, imp in feature_importance] session['feature_names'] = X.columns.tolist() session['target_column_name'] = target_col metrics = { 'Accuracy': accuracy_score(y_test, y_pred), 'Precision': precision_score(y_test, y_pred, average='weighted', zero_division=0), 'Recall': recall_score(y_test, y_pred, average='weighted', zero_division=0), 'F1 Score': f1_score(y_test, y_pred, average='weighted', zero_division=0) } model_path = os.path.join(UPLOAD_FOLDER, f'model_{datetime.now().strftime("%Y%m%d_%H%M%S")}.joblib') scaler_path = os.path.join(UPLOAD_FOLDER, f'scaler_{datetime.now().strftime("%Y%m%d_%H%M%S")}.joblib') joblib.dump(clf, model_path) joblib.dump(scaler, scaler_path) session['model_path'] = model_path session['scaler_path'] = scaler_path return jsonify({ 'success': True, 'metrics': metrics, 'top_features': top_features, }) except Exception as e: print(f"Error in run_prediction: {e}") return jsonify({'success': False, 'error': str(e)}) @machine_failure_bp.route('/get_form_data', methods=['GET']) def get_form_data(): try: df = get_current_df() if df is None: return jsonify({'success': False, 'error': 'No data available. Please upload a file first.'}) target_col = session.get('target_column_name') if not target_col: return jsonify({'success': False, 'error': 'Target column not found in session. Please run prediction first.'}) exclude_cols = ['error_severity', 'downtime_minutes', 'failure_type', target_col] form_fields = [] for col in df.columns: if col.lower() in [ec.lower() for ec in exclude_cols]: continue default_value = None if not df[col].dropna().empty: if pd.api.types.is_numeric_dtype(df[col]): min_val = df[col].min() max_val = df[col].max() if pd.isna(min_val) or pd.isna(max_val): default_value = 0.0 else: default_value = round(random.uniform(float(min_val), float(max_val)), 2) elif col in ['timestamp', 'maintenance_timestamp']: sample_date = random.choice(df[col].dropna().tolist()) try: parsed_date = pd.to_datetime(sample_date) if pd.isna(parsed_date): default_value = "YYYY-MM-DD HH:MM:SS" # Fallback for invalid dates else: default_value = parsed_date.strftime('%Y-%m-%d %H:%M:%S') except Exception: default_value = "YYYY-MM-DD HH:MM:SS" else: unique_vals_str = [str(x) for x in df[col].dropna().unique()] if unique_vals_str: default_value = random.choice(unique_vals_str) else: default_value = "" if pd.api.types.is_numeric_dtype(df[col]): form_fields.append({ 'name': col, 'type': 'number', 'default_value': default_value }) elif col in ['timestamp', 'maintenance_timestamp']: form_fields.append({ 'name': col, 'type': 'text', 'placeholder': 'YYYY-MM-DD HH:MM:SS (optional)', 'default_value': default_value }) else: unique_values = [str(x) for x in df[col].dropna().unique().tolist()] form_fields.append({ 'name': col, 'type': 'select', 'options': unique_values, 'default_value': default_value }) return jsonify({'success': True, 'form_fields': form_fields}) except Exception as e: print(f"Error in get_form_data: {e}") return jsonify({'success': False, 'error': str(e)}) @machine_failure_bp.route('/predict_single', methods=['POST']) def predict_single(): try: model_path = session.get('model_path') scaler_path = session.get('scaler_path') encoders_path = session.get('encoders_path') feature_names = session.get('feature_names') target_col = session.get('target_column_name') if not all([model_path, scaler_path, encoders_path, feature_names, target_col]): return jsonify({'success': False, 'error': 'Model or preprocessing artifacts not found. Please train a model first.'}) model = joblib.load(model_path) scaler = joblib.load(scaler_path) label_encoders = joblib.load(encoders_path) input_data = request.json if not input_data: return jsonify({'success': False, 'error': 'No input data provided.'}) original_uploaded_columns = session.get('original_columns') if not original_uploaded_columns: return jsonify({'success': False, 'error': 'Original dataset column names not found in session. Please upload a file.'}) full_input_df = pd.DataFrame(columns=original_uploaded_columns) single_row_input_df = pd.DataFrame([input_data]) for col in original_uploaded_columns: if col in single_row_input_df.columns: full_input_df.loc[0, col] = single_row_input_df.loc[0, col] else: full_input_df.loc[0, col] = np.nan preprocessed_input_df, _ = preprocess_data(full_input_df.copy(), for_prediction=True, label_encoders=label_encoders) final_input_features = pd.DataFrame(columns=feature_names) for col in feature_names: if col in preprocessed_input_df.columns: final_input_features[col] = pd.to_numeric(preprocessed_input_df[col], errors='coerce').values else: final_input_features[col] = 0.0 final_input_features = final_input_features.fillna(0.0) input_scaled = scaler.transform(final_input_features) prediction_value = model.predict(input_scaled)[0] prediction_display = prediction_value if target_col in label_encoders: if isinstance(prediction_value, (int, np.integer)) and prediction_value < len(label_encoders[target_col].classes_): prediction_display = str(label_encoders[target_col].inverse_transform([prediction_value])[0]) else: prediction_display = str(prediction_value) + " (Unknown Class)" else: if isinstance(prediction_value, np.number): prediction_display = float(prediction_value) else: prediction_display = prediction_value probability = None if hasattr(model, 'predict_proba'): probability = model.predict_proba(input_scaled)[0].tolist() probability = [float(p) for p in probability] return jsonify({ 'success': True, 'prediction': prediction_display, 'probability': probability }) except Exception as e: print(f"Error in predict_single: {e}") return jsonify({'success': False, 'error': str(e)})