"""Flask blueprint for the supply-chain delivery-failure prediction workflow.

Flow: upload a CSV -> preview/summary -> train a RandomForest on the fixed
target column ``failure_flag`` -> render a single-prediction form -> predict
one row. Trained artifacts (model, scaler, label encoders) are persisted to
``temp_uploads`` via joblib and referenced through the Flask session.
"""

from flask import Blueprint, render_template, request, jsonify, redirect, url_for, flash, session
import pandas as pd
import numpy as np
import plotly.express as px
import plotly.utils
import json
import os
import joblib
import random
from datetime import datetime
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

supply_failure_bp = Blueprint('supply_failure', __name__, url_prefix='/predict/supply_failure')

UPLOAD_FOLDER = 'temp_uploads'
os.makedirs(UPLOAD_FOLDER, exist_ok=True)


def get_current_df_supply():
    """Load the uploaded supply-chain CSV referenced in the session.

    Returns:
        pandas.DataFrame with the uploaded data, or None when no file has
        been uploaded, the file is gone, or reading fails.
    """
    try:
        csv_path = session.get('supply_csv_path')
        if csv_path and os.path.exists(csv_path):
            return pd.read_csv(csv_path)
        return None
    except Exception as e:
        print(f"Error in get_current_df_supply: {str(e)}")
        return None


def get_summary_stats_supply(df):
    """Return basic shape/column/missing-value statistics for *df* as a dict."""
    return {
        'total_rows': len(df),
        'total_columns': len(df.columns),
        'columns': list(df.columns),
        'numeric_columns': list(df.select_dtypes(include=[np.number]).columns),
        'categorical_columns': list(df.select_dtypes(exclude=[np.number]).columns),
        'missing_values': df.isnull().sum().to_dict(),
    }


def preprocess_data_supply(df, for_prediction=False, label_encoders=None):
    """Prepare a supply-chain DataFrame for training or prediction.

    Known date columns are expanded into day-of-week/month/year/day features
    and dropped; categorical columns are label-encoded (encoders fitted during
    training, reused during prediction); numeric columns are coerced and
    NaN-filled with 0.

    Args:
        df: raw input DataFrame.
        for_prediction: when True, reuse *label_encoders* instead of fitting.
        label_encoders: mapping of column name -> fitted LabelEncoder
            (required for prediction; ignored during training).

    Returns:
        Tuple ``(df_processed, encoders)`` where *encoders* holds the
        encoders fitted during training (empty dict for prediction).
    """
    df_processed = df.copy()

    # Date columns known from the supply-chain schema.
    date_cols = ['order_date', 'promised_delivery_date', 'actual_delivery_date']

    for col in date_cols:
        if col not in df_processed.columns:
            continue
        # Coerce to datetime; unparseable values become NaT.
        df_processed[col] = pd.to_datetime(df_processed[col], errors='coerce')
        if not df_processed[col].isnull().all():
            # -1 marks rows whose date could not be parsed.
            df_processed[f'{col}_day_of_week'] = df_processed[col].dt.dayofweek.fillna(-1)
            df_processed[f'{col}_month'] = df_processed[col].dt.month.fillna(-1)
            df_processed[f'{col}_year'] = df_processed[col].dt.year.fillna(-1)
            df_processed[f'{col}_day'] = df_processed[col].dt.day.fillna(-1)
        else:
            # All values unparseable: emit constant placeholder features.
            df_processed[f'{col}_day_of_week'] = -1
            df_processed[f'{col}_month'] = -1
            df_processed[f'{col}_year'] = -1
            df_processed[f'{col}_day'] = -1
        df_processed = df_processed.drop(columns=[col])

    # Classify the remaining columns as numeric or categorical. Object columns
    # whose values all parse as numbers are treated as numeric.
    categorical_columns = []
    numerical_columns = []
    for column in df_processed.columns:
        if pd.api.types.is_numeric_dtype(df_processed[column]):
            numerical_columns.append(column)
        else:
            try:
                if pd.to_numeric(df_processed[column].dropna()).notna().all():
                    numerical_columns.append(column)
                else:
                    categorical_columns.append(column)
            except ValueError:
                categorical_columns.append(column)

    current_label_encoders = {}
    if not for_prediction:
        # Training: fit one encoder per categorical column and return them.
        for col in categorical_columns:
            if col in df_processed.columns:
                le = LabelEncoder()
                # BUGFIX: fill NaNs *before* astype(str) — the original filled
                # after astype(str), by which point NaN was already the
                # string 'nan' and the fillna was a no-op.
                df_processed[col] = le.fit_transform(
                    df_processed[col].fillna('missing_value').astype(str)
                )
                current_label_encoders[col] = le
    else:
        # Prediction: reuse the training encoders; unseen categories -> -1.
        # Guard against a missing encoder dict (parameter defaults to None).
        for col, le in (label_encoders or {}).items():
            if col in df_processed.columns:
                df_processed[col] = df_processed[col].astype(str).apply(
                    lambda x: le.transform([x])[0] if x in le.classes_ else -1
                )

    # Coerce numeric columns and neutralise any remaining NaNs.
    for col in numerical_columns:
        if col in df_processed.columns:
            # Fill numerical NaNs with 0.
            df_processed[col] = pd.to_numeric(df_processed[col], errors='coerce').fillna(0)

    return df_processed, current_label_encoders


@supply_failure_bp.route('/', methods=['GET'])
def show_supply_failure():
    """Render the supply-failure landing page."""
    return render_template('supply_failure.html', title="Supply Failure Prediction")


@supply_failure_bp.route('/upload', methods=['POST'])
def upload_file_supply():
    """Accept a CSV upload, persist it, and render a preview with summary stats."""
    if 'supply_file' not in request.files:
        flash('No file selected')
        return redirect(url_for('supply_failure.show_supply_failure'))

    file = request.files['supply_file']
    if file.filename == '':
        flash('No file selected')
        return redirect(url_for('supply_failure.show_supply_failure'))

    try:
        # Timestamped server-side name: avoids trusting the client filename.
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        safe_filename = f"supply_data_{timestamp}.csv"
        file_path = os.path.join(UPLOAD_FOLDER, safe_filename)
        file.save(file_path)
        session['supply_csv_path'] = file_path

        df = pd.read_csv(file_path)
        preview_data = df.head().to_dict('records')
        summary_stats = get_summary_stats_supply(df)
        # Remember the raw schema so single-row prediction can rebuild it.
        session['original_columns_supply'] = df.columns.tolist()

        return render_template('supply_failure.html',
                               title="Supply Failure Prediction",
                               preview_data=preview_data,
                               columns=df.columns.tolist(),
                               summary_stats=summary_stats)
    except Exception as e:
        flash(f'Error processing file: {str(e)}')
        return redirect(url_for('supply_failure.show_supply_failure'))


@supply_failure_bp.route('/run_prediction', methods=['POST'])
def run_prediction_supply():
    """Train a RandomForest on the uploaded data and return metrics as JSON.

    Persists the fitted model, scaler, and label encoders to disk and stores
    their paths (plus the training feature list) in the session.
    """
    try:
        df = get_current_df_supply()
        if df is None:
            return jsonify({'success': False, 'error': 'No data available. Please upload a CSV file first.'})

        target_col = 'failure_flag'  # Fixed target column as per definition

        df_processed, label_encoders = preprocess_data_supply(df.copy(), for_prediction=False)

        # BUGFIX: a single timestamp for all three artifacts — the original
        # called datetime.now() per path, which could straddle a second
        # boundary and leave the artifact trio with mismatched names.
        stamp = datetime.now().strftime('%Y%m%d_%H%M%S')

        encoders_path = os.path.join(UPLOAD_FOLDER, f'supply_encoders_{stamp}.joblib')
        joblib.dump(label_encoders, encoders_path)
        session['supply_encoders_path'] = encoders_path

        if target_col not in df_processed.columns:
            return jsonify({'success': False, 'error': f"Target column '{target_col}' not found after preprocessing. Check if it was dropped or transformed incorrectly."})

        X = df_processed.drop(columns=[target_col])
        y = df_processed[target_col]

        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

        scaler = StandardScaler()
        X_train_scaled = scaler.fit_transform(X_train)
        X_test_scaled = scaler.transform(X_test)

        clf = RandomForestClassifier(random_state=42)
        clf.fit(X_train_scaled, y_train)
        y_pred = clf.predict(X_test_scaled)

        # Top-5 features by importance, for display.
        importances = clf.feature_importances_
        feature_importance = sorted(
            zip(X.columns, importances), key=lambda x: x[1], reverse=True
        )[:5]
        top_features = [{'feature': f, 'importance': float(imp)} for f, imp in feature_importance]

        session['supply_feature_names'] = X.columns.tolist()
        session['supply_target_column_name'] = target_col

        # BUGFIX: cast numpy scalars to float so jsonify can serialise the
        # metrics (top_features already did this; metrics did not).
        metrics = {
            'Accuracy': float(accuracy_score(y_test, y_pred)),
            'Precision': float(precision_score(y_test, y_pred, average='weighted', zero_division=0)),
            'Recall': float(recall_score(y_test, y_pred, average='weighted', zero_division=0)),
            'F1 Score': float(f1_score(y_test, y_pred, average='weighted', zero_division=0)),
        }

        model_path = os.path.join(UPLOAD_FOLDER, f'supply_model_{stamp}.joblib')
        scaler_path = os.path.join(UPLOAD_FOLDER, f'supply_scaler_{stamp}.joblib')
        joblib.dump(clf, model_path)
        joblib.dump(scaler, scaler_path)
        session['supply_model_path'] = model_path
        session['supply_scaler_path'] = scaler_path

        return jsonify({
            'success': True,
            'metrics': metrics,
            'top_features': top_features,
        })
    except Exception as e:
        print(f"Error in run_prediction_supply: {e}")
        return jsonify({'success': False, 'error': str(e)})


@supply_failure_bp.route('/get_form_data', methods=['GET'])
def get_form_data_supply():
    """Describe the single-prediction input form as JSON.

    For each non-excluded column, returns a field spec (number / text date /
    select) with a sampled default value drawn from the uploaded data.
    """
    try:
        df = get_current_df_supply()
        if df is None:
            return jsonify({'success': False, 'error': 'No data available. Please upload a file first.'})

        target_col = 'failure_flag'  # Fixed target column for supply chain

        # Columns to exclude from the form as requested by the user
        # (leakage columns, identifiers, and post-delivery outcomes).
        exclude_cols = ['delivery_delay_days', 'delivered_quantity', 'return_reason',
                        'delivery_status', 'failure_type', target_col, 'order_id',
                        'component_id', 'po_approval_delay_days', 'customs_clearance_days',
                        'actual_delivery_date']
        excluded_lower = [ec.lower() for ec in exclude_cols]

        date_like_cols = ['order_date', 'promised_delivery_date', 'actual_delivery_date']

        form_fields = []
        for col in df.columns:
            if col.lower() in excluded_lower:
                continue

            default_value = None
            if not df[col].dropna().empty:
                if pd.api.types.is_numeric_dtype(df[col]):
                    min_val = df[col].min()
                    max_val = df[col].max()
                    if pd.isna(min_val) or pd.isna(max_val):
                        default_value = 0.0
                    else:
                        # Random value inside the observed range.
                        default_value = round(random.uniform(float(min_val), float(max_val)), 2)
                elif col in date_like_cols:
                    sample_date = random.choice(df[col].dropna().tolist())
                    try:
                        parsed_date = pd.to_datetime(sample_date)
                        if pd.isna(parsed_date):
                            default_value = "YYYY-MM-DD HH:MM:SS"
                        else:
                            default_value = parsed_date.strftime('%Y-%m-%d %H:%M:%S')
                    except Exception:
                        default_value = "YYYY-MM-DD HH:MM:SS"
                else:
                    # Categorical or other types: sample one observed value.
                    unique_vals_str = [str(x) for x in df[col].dropna().unique()]
                    if unique_vals_str:
                        default_value = random.choice(unique_vals_str)
                    else:
                        default_value = ""

            if pd.api.types.is_numeric_dtype(df[col]):
                form_fields.append({
                    'name': col,
                    'type': 'number',
                    'default_value': default_value,
                })
            elif col in date_like_cols:
                form_fields.append({
                    'name': col,
                    'type': 'text',
                    'placeholder': 'YYYY-MM-DD HH:MM:SS',
                    'default_value': default_value,
                })
            else:
                # Categorical -> select with the observed unique values.
                unique_values = [str(x) for x in df[col].dropna().unique().tolist()]
                form_fields.append({
                    'name': col,
                    'type': 'select',
                    'options': unique_values,
                    'default_value': default_value,
                })

        return jsonify({'success': True, 'form_fields': form_fields})
    except Exception as e:
        print(f"Error in get_form_data_supply: {e}")
        return jsonify({'success': False, 'error': str(e)})


@supply_failure_bp.route('/predict_single', methods=['POST'])
def predict_single_supply():
    """Predict delivery failure for one JSON-encoded input row.

    Rebuilds the original upload schema around the submitted fields, applies
    the saved preprocessing + scaler, and returns a user-friendly label plus
    class probabilities when available.
    """
    try:
        model_path = session.get('supply_model_path')
        scaler_path = session.get('supply_scaler_path')
        encoders_path = session.get('supply_encoders_path')
        feature_names = session.get('supply_feature_names')
        target_col = session.get('supply_target_column_name')
        original_uploaded_columns = session.get('original_columns_supply')

        if not all([model_path, scaler_path, encoders_path, feature_names,
                    target_col, original_uploaded_columns]):
            return jsonify({'success': False, 'error': 'Model or preprocessing artifacts not found for supply chain. Please train a model first.'})

        model = joblib.load(model_path)
        scaler = joblib.load(scaler_path)
        label_encoders = joblib.load(encoders_path)

        input_data = request.json
        if not input_data:
            return jsonify({'success': False, 'error': 'No input data provided.'})

        # Reassemble a one-row frame with the full original schema; columns
        # the form did not supply become NaN and are handled downstream.
        raw_row = {col: input_data.get(col, np.nan) for col in original_uploaded_columns}
        full_input_df = pd.DataFrame([raw_row], columns=original_uploaded_columns)

        preprocessed_input_df, _ = preprocess_data_supply(
            full_input_df.copy(), for_prediction=True, label_encoders=label_encoders
        )

        # BUGFIX: build the feature row as a dict and construct the frame in
        # one shot — assigning length-1 arrays to columns of a zero-row
        # DataFrame raises a length-mismatch ValueError in modern pandas.
        feature_row = {}
        for col in feature_names:
            if col in preprocessed_input_df.columns:
                feature_row[col] = pd.to_numeric(preprocessed_input_df[col], errors='coerce').iloc[0]
            else:
                # Feature existed at training time but not in this input.
                feature_row[col] = 0.0
        final_input_features = pd.DataFrame([feature_row], columns=feature_names).fillna(0.0)

        input_scaled = scaler.transform(final_input_features)
        prediction_value = model.predict(input_scaled)[0]

        # Convert prediction_value to standard Python int/float/str.
        if target_col in label_encoders and prediction_value in label_encoders[target_col].classes_:
            # Target was label-encoded: map back to the original class label.
            prediction_display = str(label_encoders[target_col].inverse_transform([prediction_value])[0])
        elif isinstance(prediction_value, np.number):
            prediction_display = float(prediction_value)
        else:
            prediction_display = prediction_value  # Keep as is if not np.number

        # Convert 0/1 to a user-friendly label per the failure_flag definition.
        if prediction_display == 0 or prediction_display == "0":
            user_friendly_prediction = "Delivery Successful"
        elif prediction_display == 1 or prediction_display == "1":
            user_friendly_prediction = "Delivery Failed"
        else:
            user_friendly_prediction = str(prediction_display)  # Fallback if target is something else

        probability = None
        if hasattr(model, 'predict_proba'):
            probability = [float(p) for p in model.predict_proba(input_scaled)[0]]

        return jsonify({
            'success': True,
            'prediction': user_friendly_prediction,
            'probability': probability,
        })
    except Exception as e:
        print(f"Error in predict_single_supply: {e}")
        return jsonify({'success': False, 'error': str(e)})