| | from flask import Blueprint, render_template, request, jsonify, redirect, url_for, flash |
| | import pandas as pd |
| | import numpy as np |
| | from sklearn.model_selection import train_test_split |
| | from sklearn.preprocessing import StandardScaler, LabelEncoder |
| | from sklearn.ensemble import RandomForestClassifier |
| | from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score |
| | import random |
| |
|
# Blueprint that groups every supply-failure route under /predict/supply_failure.
supply_failure_bp = Blueprint('supply_failure', __name__, url_prefix='/predict/supply_failure')

# Module-level state shared by the route handlers in this file.
# NOTE(review): these are process-wide globals, so every client of the app
# sees the same dataset and model -- confirm that is the intended deployment.
_current_df_supply = None  # DataFrame assigned by upload_file_supply
_model_supply = None  # classifier assigned by run_prediction_supply
_scaler_supply = None  # StandardScaler assigned by run_prediction_supply
_encoders_supply = None  # {column: LabelEncoder} assigned by run_prediction_supply
_feature_names_supply = None  # feature column names assigned by run_prediction_supply
_original_cols_supply = None  # column names of the uploaded CSV, assigned by upload_file_supply
_target_col_supply = 'failure_flag'  # name of the column used as the training target
| |
|
| |
|
def get_summary_stats_supply(df):
    """Summarise a DataFrame: its size, column names by kind, and null counts.

    Returns a dict with the keys 'total_rows', 'total_columns', 'columns',
    'numeric_columns', 'categorical_columns' and 'missing_values' (a mapping
    of column name to the number of null cells in that column).
    """
    all_columns = df.columns
    numeric_part = df.select_dtypes(include=[np.number])
    non_numeric_part = df.select_dtypes(exclude=[np.number])
    null_counts = df.isnull().sum()

    summary = {}
    summary['total_rows'] = len(df)
    summary['total_columns'] = len(all_columns)
    summary['columns'] = list(all_columns)
    summary['numeric_columns'] = list(numeric_part.columns)
    # Everything that is not numeric is reported as categorical.
    summary['categorical_columns'] = list(non_numeric_part.columns)
    summary['missing_values'] = null_counts.to_dict()
    return summary
| |
|
def preprocess_data_supply(df, for_prediction=False, label_encoders=None):
    """Convert a raw supply-chain DataFrame into model-ready columns.

    Steps, applied to a copy of ``df``:

    1. Each of 'order_date', 'promised_delivery_date' and
       'actual_delivery_date' that is present is parsed as a datetime and
       replaced by ``<col>_day_of_week`` and ``<col>_month`` columns
       (-1 where the value could not be parsed).
    2. Text columns are label-encoded. Missing values are encoded as the
       label 'missing'.
    3. Missing values in numeric columns are replaced by 0.

    Args:
        df: Input DataFrame; it is not modified.
        for_prediction: When False, a new LabelEncoder is fitted for every
            text column. When True, the encoders in ``label_encoders`` are
            reused and labels they have not seen are encoded as -1.
        label_encoders: Mapping of column name to a fitted encoder exposing
            ``classes_``. Only used when ``for_prediction`` is True; ``None``
            is treated as "no encoders".

    Returns:
        A ``(processed_df, fitted_encoders)`` tuple. ``fitted_encoders`` is
        the mapping of newly fitted encoders, and is empty in prediction mode.
    """
    df_processed = df.copy()

    date_cols = ['order_date', 'promised_delivery_date', 'actual_delivery_date']
    # Text columns are detected before the date columns are replaced. Both
    # plain object columns and pandas string-dtype columns count as text.
    categorical_columns = [
        col for col in df_processed.columns
        if col not in date_cols and (
            pd.api.types.is_object_dtype(df_processed[col])
            or isinstance(df_processed[col].dtype, pd.StringDtype)
        )
    ]

    for col in date_cols:
        if col not in df_processed.columns:
            continue
        parsed = pd.to_datetime(df_processed[col], errors='coerce')
        df_processed[f'{col}_day_of_week'] = parsed.dt.dayofweek.fillna(-1)
        df_processed[f'{col}_month'] = parsed.dt.month.fillna(-1)
        df_processed = df_processed.drop(columns=[col])

    current_label_encoders = {}
    if not for_prediction:
        for col in categorical_columns:
            le = LabelEncoder()
            # Fill BEFORE casting to str: astype(str) turns NaN into the text
            # 'nan', after which fillna() has nothing left to fill.
            labels = df_processed[col].fillna('missing').astype(str)
            df_processed[col] = le.fit_transform(labels)
            current_label_encoders[col] = le
    else:
        for col, le in (label_encoders or {}).items():
            if col not in df_processed.columns:
                continue
            # A fitted encoder maps each label to its position in classes_,
            # so one lookup table replaces a transform() call per cell.
            lookup = {label: code for code, label in enumerate(le.classes_)}
            labels = df_processed[col].fillna('missing').astype(str)
            df_processed[col] = [lookup.get(label, -1) for label in labels]

    numeric_cols = df_processed.select_dtypes(include=np.number).columns
    for col in numeric_cols:
        df_processed[col] = df_processed[col].fillna(0)

    return df_processed, current_label_encoders
| |
|
| |
|
@supply_failure_bp.route('/', methods=['GET'])
def show_supply_failure():
    """Serve the landing page of the supply failure prediction tool."""
    page_title = "Supply Failure Prediction"
    return render_template('supply_failure.html', title=page_title)
| |
|
| |
|
@supply_failure_bp.route('/upload_file_supply', methods=['POST'])
def upload_file_supply():
    """Load an uploaded CSV and render a preview of it.

    Reads the file sent in the 'supply_file' form field with pandas. On
    success the DataFrame and its column names are stored in module state and
    the page is rendered with the first rows and summary statistics. When no
    file is sent, or the file cannot be processed, a message is flashed and
    the client is redirected to the tool's main page.

    A successful upload also clears any previously trained model, scaler,
    encoders and feature names: they were fitted on the old dataset and must
    not be used for predictions against the new columns.
    """
    global _current_df_supply, _original_cols_supply
    global _model_supply, _scaler_supply, _encoders_supply, _feature_names_supply

    file = request.files.get('supply_file')
    if file is None or file.filename == '':
        flash('No file selected')
        return redirect(url_for('supply_failure.show_supply_failure'))

    try:
        uploaded_df = pd.read_csv(file)
        columns = uploaded_df.columns.tolist()
        preview_data = uploaded_df.head().to_dict('records')
        summary_stats = get_summary_stats_supply(uploaded_df)

        # Commit only after the file has been parsed and summarised, so a
        # bad upload never replaces a dataset that was already loaded.
        _current_df_supply = uploaded_df
        _original_cols_supply = columns
        _model_supply = None
        _scaler_supply = None
        _encoders_supply = None
        _feature_names_supply = None

        return render_template('supply_failure.html',
                               title="Supply Failure Prediction",
                               preview_data=preview_data,
                               columns=columns,
                               summary_stats=summary_stats)
    except Exception as e:
        # Route boundary: report any parsing or rendering problem to the user.
        flash(f'Error processing file: {str(e)}')
        return redirect(url_for('supply_failure.show_supply_failure'))
| |
|
| |
|
@supply_failure_bp.route('/run_prediction', methods=['POST'])
def run_prediction_supply():
    """Train a random forest on the uploaded data and report its metrics.

    The uploaded DataFrame is preprocessed, split 80/20, standardised and
    used to fit a RandomForestClassifier. The JSON response contains
    accuracy, weighted precision/recall/F1 on the held-out split, and the
    five most important features. Errors are returned as
    ``{'success': False, 'error': ...}``.

    The fitted model, scaler, encoders and feature names are stored in
    module state only after the whole run has succeeded, so a failed run
    cannot leave new encoders paired with an older model.
    """
    global _model_supply, _scaler_supply, _encoders_supply, _feature_names_supply
    if _current_df_supply is None:
        return jsonify({'success': False, 'error': 'No data available. Please upload a CSV file first.'})

    try:
        df_processed, label_encoders = preprocess_data_supply(_current_df_supply.copy())

        if _target_col_supply not in df_processed.columns:
            return jsonify({'success': False, 'error': f"Target column '{_target_col_supply}' not found after preprocessing."})

        # NOTE(review): every non-target column is used as a feature,
        # including columns the prediction form leaves out (for example
        # 'delivery_delay_days'); confirm this is intended.
        X = df_processed.drop(columns=[_target_col_supply])
        y = df_processed[_target_col_supply]
        feature_names = X.columns.tolist()

        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

        scaler = StandardScaler()
        X_train_scaled = scaler.fit_transform(X_train)
        X_test_scaled = scaler.transform(X_test)

        model = RandomForestClassifier(random_state=42)
        model.fit(X_train_scaled, y_train)
        y_pred = model.predict(X_test_scaled)

        ranked = sorted(zip(feature_names, model.feature_importances_), key=lambda x: x[1], reverse=True)[:5]
        top_features = [{'feature': f, 'importance': float(imp)} for f, imp in ranked]

        # Plain floats keep the payload JSON-serialisable.
        metrics = {
            'Accuracy': float(accuracy_score(y_test, y_pred)),
            'Precision': float(precision_score(y_test, y_pred, average='weighted', zero_division=0)),
            'Recall': float(recall_score(y_test, y_pred, average='weighted', zero_division=0)),
            'F1 Score': float(f1_score(y_test, y_pred, average='weighted', zero_division=0)),
        }

        # Everything succeeded: publish the new artefacts together.
        _encoders_supply = label_encoders
        _feature_names_supply = feature_names
        _scaler_supply = scaler
        _model_supply = model

        return jsonify({'success': True, 'metrics': metrics, 'top_features': top_features})
    except Exception as e:
        # Route boundary: surface any training failure as a JSON error.
        return jsonify({'success': False, 'error': f'An error occurred: {str(e)}'})
| |
|
@supply_failure_bp.route('/get_form_data', methods=['GET'])
def get_form_data_supply():
    """Describe the input fields of the single-prediction form as JSON.

    One field is produced per column of the uploaded DataFrame, except the
    columns named in ``exclude_cols`` (matched case-insensitively):

    * numeric columns  -> 'number' field, default = column mean rounded to 2
      decimals (0 when the mean is undefined);
    * 'order_date' / 'promised_delivery_date' -> 'text' field with a
      YYYY-MM-DD placeholder, default = most frequent date ('' when there is
      none or it cannot be parsed);
    * anything else    -> 'select' field whose options are the distinct
      non-null values as strings, default = most frequent value as a string.

    Returns ``{'success': False, 'error': ...}`` when no data is loaded.
    """
    if _current_df_supply is None:
        return jsonify({'success': False, 'error': 'No data available. Please upload a file first.'})

    df = _current_df_supply
    exclude_cols = [
        'delivery_delay_days', 'delivered_quantity', 'return_reason',
        'delivery_status', 'failure_type', _target_col_supply, 'order_id',
        'component_id', 'po_approval_delay_days', 'customs_clearance_days',
        'actual_delivery_date'
    ]
    # Build the lower-cased lookup once instead of once per column.
    excluded = {name.lower() for name in exclude_cols}
    form_fields = []

    for col in df.columns:
        if str(col).lower() in excluded:
            continue

        series = df[col]
        field_info = {'name': col}
        if pd.api.types.is_numeric_dtype(series):
            mean_value = series.mean()
            field_info['type'] = 'number'
            # The mean is NaN for an empty or all-null column; NaN is not
            # valid JSON, so fall back to 0.
            field_info['default_value'] = 0 if pd.isna(mean_value) else round(float(mean_value), 2)
        elif col in ['order_date', 'promised_delivery_date']:
            modes = series.mode()
            # errors='coerce' yields NaT for an unparseable value rather
            # than raising inside the request handler.
            most_common = pd.NaT if modes.empty else pd.to_datetime(modes.iloc[0], errors='coerce')
            field_info['type'] = 'text'
            field_info['placeholder'] = 'YYYY-MM-DD'
            field_info['default_value'] = '' if pd.isna(most_common) else most_common.strftime('%Y-%m-%d')
        else:
            modes = series.mode()
            field_info['type'] = 'select'
            field_info['options'] = [str(x) for x in series.dropna().unique().tolist()]
            # Stringified so the default always matches one of the options.
            field_info['default_value'] = '' if modes.empty else str(modes.iloc[0])
        form_fields.append(field_info)

    return jsonify({'success': True, 'form_fields': form_fields})
| |
|
| |
|
@supply_failure_bp.route('/predict_single', methods=['POST'])
def predict_single_supply():
    """Predict delivery success or failure for one record sent as JSON.

    The request body must be a JSON object mapping column names to values.
    It is laid out on the uploaded file's columns, preprocessed with the
    stored encoders, aligned to the training feature order (features that
    are absent become 0), scaled and passed to the trained model.

    Returns JSON with 'prediction' ("Delivery Failed" when the predicted
    class is 1, otherwise "Delivery Successful") and 'probability' (the
    per-class probabilities), or ``{'success': False, 'error': ...}``.
    """
    required_state = (_model_supply, _scaler_supply, _encoders_supply,
                      _feature_names_supply, _original_cols_supply)
    # Compare against None explicitly: a dataset without text columns yields
    # an empty encoder dict, which is falsy but still a valid trained state.
    if any(item is None for item in required_state):
        return jsonify({'success': False, 'error': 'Model or configuration not ready. Please run a prediction first.'})

    try:
        input_data = request.get_json(silent=True)
        if not isinstance(input_data, dict):
            return jsonify({'success': False, 'error': 'An error occurred during prediction: request body must be a JSON object.'})
        input_df = pd.DataFrame([input_data], columns=_original_cols_supply)

        preprocessed_df, _ = preprocess_data_supply(input_df.copy(), for_prediction=True, label_encoders=_encoders_supply)

        # Keep exactly the training features, in training order; features
        # missing from the request are filled with 0.
        final_features = preprocessed_df.reindex(columns=_feature_names_supply).fillna(0)

        input_scaled = _scaler_supply.transform(final_features)

        prediction = _model_supply.predict(input_scaled)[0]
        prediction_display = "Delivery Failed" if prediction == 1 else "Delivery Successful"

        probability = _model_supply.predict_proba(input_scaled)[0].tolist()

        return jsonify({'success': True, 'prediction': prediction_display, 'probability': probability})
    except Exception as e:
        # Route boundary: surface any failure as a JSON error.
        return jsonify({'success': False, 'error': f'An error occurred during prediction: {str(e)}'})