Spaces:
Sleeping
Sleeping
| # app.py - Enhanced Gradio Fraud Detection App with Visualizations | |
| import time | |
| import os | |
| import shutil | |
| import tempfile | |
| import uuid | |
| import threading | |
| import joblib | |
| import numpy as np | |
| import pandas as pd | |
| import gradio as gr | |
| import plotly.graph_objects as go | |
| from io import BytesIO | |
| from sklearn.metrics import ( | |
| roc_auc_score, precision_recall_fscore_support, confusion_matrix, | |
| precision_recall_curve, average_precision_score, f1_score, roc_curve | |
| ) | |
| # Import Hugging Face Spaces GPU decorator (optional, only works on Spaces) | |
| try: | |
| import spaces | |
| SPACES_AVAILABLE = True | |
| except ImportError: | |
| SPACES_AVAILABLE = False | |
| spaces = None | |
| # Try to import GPU-accelerated libraries for data processing | |
| try: | |
| import cupy as cp | |
| CUPY_AVAILABLE = True | |
| print("β CuPy available - GPU acceleration enabled for data processing") | |
| except ImportError: | |
| CUPY_AVAILABLE = False | |
| cp = None | |
| print("β οΈ CuPy not available - using CPU for data processing") | |
| # Import Power BI export function | |
| try: | |
| from powerbi_export import export_powerbi_csvs | |
| POWERBI_AVAILABLE = True | |
| except ImportError: | |
| POWERBI_AVAILABLE = False | |
| print("β οΈ powerbi_export module not found. Power BI export will be skipped.") | |
# Workaround for a Gradio bug: API-info generation crashes with a TypeError
# when a JSON-schema node is a bare bool (e.g. "additionalProperties": true)
# instead of a dict. Patch the two gradio_client helpers to tolerate that.
try:
    import gradio_client.utils as client_utils

    _orig_get_type = client_utils.get_type

    def _safe_get_type(schema):
        """Tolerant replacement for gradio_client.utils.get_type."""
        # Bool / non-dict schema nodes would crash the original helper.
        if isinstance(schema, bool):
            return "bool"
        if not isinstance(schema, dict):
            return "Any"
        if "const" in schema:
            return str(schema["const"])
        try:
            return _orig_get_type(schema)
        except TypeError:
            # Last-resort fallback so API-info generation never crashes here.
            return "Any"

    client_utils.get_type = _safe_get_type

    _orig_schema_to_type = client_utils._json_schema_to_python_type

    def _safe_schema_to_type(schema, defs=None):
        """Tolerant replacement for _json_schema_to_python_type."""
        if isinstance(schema, bool):
            return "Any"
        if isinstance(schema, dict):
            extra = schema.get("additionalProperties")
            if isinstance(extra, bool):
                # Rewrite the bool form into a dict-shaped schema node:
                # True -> any object allowed, False -> nothing allowed.
                schema = dict(schema)
                schema["additionalProperties"] = {"type": "object" if extra else "null"}
        try:
            return _orig_schema_to_type(schema, defs)
        except (TypeError, KeyError):
            # Fallback to avoid crashes on malformed schemas.
            return "Any"

    client_utils._json_schema_to_python_type = _safe_schema_to_type
    print("β Gradio API info bug patched")
except Exception as e:
    print(f"β οΈ Could not patch Gradio client utils: {e}")
# Resolve data/model locations relative to this script's own directory so the
# app behaves the same regardless of the current working directory.
try:
    SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
except NameError:
    # __file__ can be missing in embedded/exec contexts; fall back to CWD.
    SCRIPT_DIR = os.getcwd()
    print(f"β οΈ Warning: Could not determine script directory, using current working directory: {SCRIPT_DIR}")

MODEL_PATH = os.path.join(SCRIPT_DIR, "fraud_lgbm_calibrated.pkl")
SAMPLE_DATASET_PATH = os.path.join(SCRIPT_DIR, "sample_transactions.csv")
DATASET_PATH = os.path.join(SCRIPT_DIR, "dataset", "fraudTest.csv")

# Echo the resolved locations to the log for debugging.
print(f"π Script directory: {SCRIPT_DIR}")
print(f"π Sample dataset path: {SAMPLE_DATASET_PATH}")
print(f"π Dataset path: {DATASET_PATH}")
print(f"π Model path: {MODEL_PATH}")
# Lazily-loaded calibrated classifier; populated on first load_model() call.
model = None

def load_model():
    """Return the cached model, loading it from MODEL_PATH on first use.

    Raises:
        FileNotFoundError: if the pickled model file is missing on disk.
    """
    global model
    if model is not None:
        return model
    if not os.path.exists(MODEL_PATH):
        raise FileNotFoundError(
            f"Model file not found: {MODEL_PATH}\n"
            f"Please train the model first or download it from your repository.\n"
            f"The model file should be in the same directory as app.py"
        )
    print("Loading model...")
    model = joblib.load(MODEL_PATH)
    print(f"β Model loaded! Features: {model.n_features_in_}")
    print(f" Model type: {type(model)}")
    # Smoke-test the freshly loaded model on a single all-zero row so a
    # broken pickle is reported at load time rather than at first request.
    try:
        dummy = np.zeros((1, model.n_features_in_))
        proba = model.predict_proba(dummy)[:, 1]
        print(f" β Model test prediction: {proba[0]:.4f}")
    except Exception as e:
        print(f" β οΈ Model test failed: {e}")
    return model
# Try to load model at startup (but don't crash if it fails): a missing model
# file is tolerated so the Gradio UI can still come up; predictions will raise
# until the file appears and load_model() succeeds.
try:
    load_model()
except FileNotFoundError as e:
    print(f"β οΈ {e}")
    print("β οΈ App will start but predictions will fail until model is available.")
def score_batch(raw_df: pd.DataFrame, threshold: float = 0.5) -> pd.DataFrame:
    """Score a batch of raw transactions with the calibrated fraud model.

    Engineers exactly the 25 features the model was trained on — filling any
    missing ones with sensible defaults — then predicts calibrated fraud
    probabilities and annotates the frame with prediction and risk columns.
    Uses GPU acceleration (CuPy) for the cyclic encodings when available.

    Args:
        raw_df: Input transactions; unknown columns are passed through.
        threshold: Probability cut-off for flagging a row as fraud.

    Returns:
        A copy of ``raw_df`` with ``fraud_probability``, ``fraud_prediction``
        and ``risk_level`` columns added.

    Raises:
        ValueError: if ``raw_df`` is None or empty.
        FileNotFoundError: if the model file is not available.
    """
    if raw_df is None or raw_df.empty:
        raise ValueError("No data provided")
    df = raw_df.copy()

    # === EXACT 25 FEATURES THE MODEL WAS TRAINED ON ===
    exact_features = [
        'amt', 'city_pop', 'dayofweek', 'dayofyear', 'dist_category_bucket_idx',
        'dist_home_merch', 'dow_cos', 'dow_sin', 'hour', 'hour_cos', 'hour_sin',
        'is_business_hours', 'is_night', 'is_weekend', 'max_amt', 'mean_amt',
        'median_amt', 'month', 'std_amt', 'time_since_last_txn',
        'total_amt_last_1h', 'total_amt_last_24h', 'transaction_count',
        'txn_count_last_1h', 'txn_count_last_24h'
    ]

    # Extract calendar features from unix_time if present.
    if "unix_time" in df.columns:
        ts = pd.to_datetime(pd.to_numeric(df["unix_time"], errors="coerce"), unit="s", utc=True)
        df["hour"] = ts.dt.hour.fillna(0).astype(int)
        df["dayofweek"] = ts.dt.dayofweek.fillna(0).astype(int)
        df["month"] = ts.dt.month.fillna(1).astype(int)
        df["dayofyear"] = ts.dt.dayofyear.fillna(1).astype(int)
    else:
        # No timestamp column: fall back to neutral mid-range defaults.
        df["hour"] = 12  # Noon as default
        df["dayofweek"] = 1  # Monday as default
        df["month"] = 6  # June as default
        df["dayofyear"] = 150  # Mid-year as default

    # Cyclic (sin/cos) encodings for hour and day-of-week.
    for col, period, sin_col, cos_col in [
        ("hour", 24, "hour_sin", "hour_cos"),
        ("dayofweek", 7, "dow_sin", "dow_cos")
    ]:
        if col in df.columns:
            # Use the GPU when CuPy is present and either the batch is large
            # (>100 rows) or we are running inside a HF Space (SPACE_ID set,
            # e.g. ZeroGPU — then even small batches go through the GPU).
            use_gpu = CUPY_AVAILABLE and (len(df) > 100 or os.getenv("SPACE_ID") is not None)
            if use_gpu:
                try:
                    gpu_array = cp.asarray(df[col].values)
                    angle = 2 * cp.pi * gpu_array / period
                    df[sin_col] = cp.asnumpy(cp.sin(angle))
                    df[cos_col] = cp.asnumpy(cp.cos(angle))
                    if len(df) <= 100:
                        print(f" β GPU used for {col} encoding (ZeroGPU context)")
                except Exception as e:
                    # Fallback to CPU if GPU operation fails (e.g. no device, OOM).
                    print(f" β οΈ GPU operation failed for {col}, using CPU: {e}")
                    angle = 2 * np.pi * df[col] / period
                    df[sin_col] = np.sin(angle)
                    df[cos_col] = np.cos(angle)
            else:
                # CPU computation for small batches or when CuPy is unavailable.
                angle = 2 * np.pi * df[col] / period
                df[sin_col] = np.sin(angle)
                df[cos_col] = np.cos(angle)

    # Binary time-of-day / day-of-week flags.
    if "hour" in df.columns:
        df["is_weekend"] = (df["dayofweek"] >= 5).astype(int)
        df["is_night"] = ((df["hour"] >= 22) | (df["hour"] <= 5)).astype(int)
        df["is_business_hours"] = ((df["hour"] >= 9) & (df["hour"] <= 17)).astype(int)
    else:
        df["is_weekend"] = 0
        df["is_night"] = 0
        df["is_business_hours"] = 0

    # Ensure base monetary/geographic features exist with sensible defaults.
    if "amt" not in df.columns:
        # Accept common alternative amount column names.
        for col in ["amount", "Amount", "TransactionAmt"]:
            if col in df.columns:
                df["amt"] = df[col]
                break
        if "amt" not in df.columns:
            df["amt"] = 50.0  # Default amount
    if "city_pop" not in df.columns:
        df["city_pop"] = 100000.0  # Default city population
    if "dist_home_merch" not in df.columns:
        df["dist_home_merch"] = 5.0  # Default distance (5 km)

    # Derive dist_category_bucket_idx from a textual dist_category if available.
    if "dist_category_bucket_idx" not in df.columns:
        if "dist_category" in df.columns:
            # Map distance categories to bucket indices.
            dist_order = {"<1km": 0, "1-10km": 1, "10-50km": 2, "50-100km": 3, "100-500km": 4, "500km+": 5, ">=500km": 5}
            df["dist_category_bucket_idx"] = df["dist_category"].map(dist_order).fillna(3).astype(int)
        else:
            # Default to middle bucket (3 = 50-100km).
            df["dist_category_bucket_idx"] = 3

    # Velocity features: default to plausible non-zero values (real accounts
    # virtually never have all-zero history, so 0 would look anomalous).
    velocity_defaults = {
        'txn_count_last_1h': 1.0,  # At least this transaction
        'total_amt_last_1h': df["amt"].fillna(50.0),  # Use current amount as estimate
        'txn_count_last_24h': 3.0,  # Typical: 3 transactions per day
        'total_amt_last_24h': df["amt"].fillna(50.0) * 3,  # Estimate based on count
    }
    for col, default_val in velocity_defaults.items():
        if col not in df.columns:
            # Plain assignment handles both scalar and per-row Series defaults.
            df[col] = default_val

    # Aggregated per-card statistics: seed them from the current amount.
    amt_filled = df["amt"].fillna(50.0)
    aggregate_defaults = {
        'time_since_last_txn': 3600.0,  # 1 hour ago (in seconds)
        'mean_amt': amt_filled,  # Use current amount as mean
        'std_amt': amt_filled * 0.3,  # 30% of mean as std
        'median_amt': amt_filled,  # Use current amount as median
        'max_amt': amt_filled,  # Use current amount as max
        'transaction_count': 1.0,  # At least this transaction
    }
    for col, default_val in aggregate_defaults.items():
        if col not in df.columns:
            df[col] = default_val

    # Use ONLY the 25 features, in sorted order (LightGBM is picky about
    # feature order).
    feature_df = df[sorted(exact_features)].copy()

    # Final safety net: replace inf/NaN with each column's median (or 0).
    feature_df = feature_df.replace([np.inf, -np.inf], np.nan)
    for col in feature_df.columns:
        if feature_df[col].isna().any():
            fill_val = feature_df[col].median() if feature_df[col].notna().any() else 0.0
            if pd.isna(fill_val):
                fill_val = 0.0
            feature_df[col] = feature_df[col].fillna(fill_val)

    # Debug: print feature statistics.
    print(f"π Feature Engineering Summary:")
    print(f" - Features created: {len(exact_features)}")
    print(f" - Feature ranges:")
    for col in sorted(exact_features)[:5]:  # Show first 5
        if col in feature_df.columns:
            print(f" {col}: [{feature_df[col].min():.2f}, {feature_df[col].max():.2f}]")
    print(f" - Any NaN values: {feature_df.isna().sum().sum()}")

    # Load model lazily; surface a clear, chained error if it is missing.
    try:
        current_model = load_model()
    except FileNotFoundError as e:
        raise FileNotFoundError(
            f"Model file not found: {MODEL_PATH}\n"
            f"Cannot perform predictions without the model.\n"
            f"Please ensure fraud_lgbm_calibrated.pkl is in the same directory as app.py"
        ) from e

    # Predict calibrated probabilities.
    try:
        prob = current_model.predict_proba(feature_df)[:, 1]
        print(f" - Probability range: [{prob.min():.4f}, {prob.max():.4f}]")
        print(f" - Mean probability: {prob.mean():.4f}")
    except Exception as e:
        print(f"β Prediction error: {e}")
        print(f" Model expects {current_model.n_features_in_} features, got {feature_df.shape[1]}")
        print(f" Feature columns: {list(feature_df.columns)}")
        raise

    df["fraud_probability"] = prob
    df["fraud_prediction"] = (prob >= threshold).astype(int)

    # Bucket probabilities into human-readable risk levels.
    conditions = [
        df["fraud_probability"] < 0.1,
        df["fraud_probability"] < 0.5,
        df["fraud_probability"] < 0.9,
    ]
    choices = ["Low", "Medium", "High"]
    df["risk_level"] = np.select(conditions, choices, default="Critical")
    return df
# Label columns recognized as ground truth, in priority order.
_GROUND_TRUTH_COLS = ["is_fraud", "fraud", "target", "label", "isFraud"]

def _find_ground_truth_column(result_df):
    """Return the first known ground-truth label column in result_df, or None."""
    for col in _GROUND_TRUTH_COLS:
        if col in result_df.columns:
            return col
    return None

def _find_amount_column(result_df):
    """Return the first recognized transaction-amount column, or None."""
    for col in ["amt", "amount", "Amount", "TransactionAmt"]:
        if col in result_df.columns:
            return col
    return None

def _find_time_column(result_df):
    """Return the first recognized timestamp column, or None."""
    for col in ["Time", "time", "unix_time", "timestamp"]:
        if col in result_df.columns:
            return col
    return None

def create_visualizations(result_df: pd.DataFrame, threshold: float, model_metrics: dict = None):
    """Build the fixed-length list of 15 Plotly figures for a scored batch.

    Args:
        result_df: Output of score_batch (must contain fraud_probability,
            fraud_prediction, risk_level; other columns are optional).
        threshold: Decision threshold, drawn as a reference line.
        model_metrics: Optional dict of evaluation metrics (roc_auc, pr_auc,
            precision, recall, f1, confusion-matrix counts).

    Returns:
        A list of exactly 15 entries; an entry is a go.Figure, or None when
        that figure cannot be produced (missing columns, missing metrics or
        ground truth, or no data).
    """
    visualizations = []

    # 1. Fraud Probability Distribution (histogram of all vs flagged rows).
    fig1 = go.Figure()
    if len(result_df) > 0:
        fig1.add_trace(go.Histogram(
            x=result_df["fraud_probability"],
            nbinsx=min(50, len(result_df) // 2),
            name="All Transactions",
            marker_color="lightblue",
            opacity=0.7
        ))
        fraud_df = result_df[result_df["fraud_prediction"] == 1]
        if len(fraud_df) > 0:
            fig1.add_trace(go.Histogram(
                x=fraud_df["fraud_probability"],
                nbinsx=min(50, len(fraud_df) // 2),
                name="Flagged as Fraud",
                marker_color="red",
                opacity=0.8
            ))
        fig1.add_vline(
            x=threshold,
            line_dash="dash",
            line_color="orange",
            annotation_text=f"Threshold: {threshold:.2f}"
        )
    fig1.update_layout(
        title="Fraud Probability Distribution",
        xaxis_title="Fraud Probability",
        yaxis_title="Count",
        height=400,
        showlegend=True
    )
    visualizations.append(fig1)

    # 2. Risk Level Breakdown (donut chart).
    if len(result_df) > 0:
        risk_counts = result_df["risk_level"].value_counts()
        color_map = {"Low": "green", "Medium": "yellow", "High": "orange", "Critical": "red"}
        colors = [color_map.get(label, "gray") for label in risk_counts.index]
        fig2 = go.Figure(data=[go.Pie(
            labels=risk_counts.index,
            values=risk_counts.values,
            hole=0.4,
            marker_colors=colors
        )])
        fig2.update_layout(
            title="Risk Level Distribution",
            height=400
        )
    else:
        fig2 = go.Figure()
        fig2.update_layout(title="Risk Level Distribution (No Data)", height=400)
    visualizations.append(fig2)

    # 3. Amount vs Fraud Probability (scatter), if an amount column exists.
    amount_col = _find_amount_column(result_df)
    if amount_col and len(result_df) > 0:
        fig3 = go.Figure()
        fraud_df = result_df[result_df["fraud_prediction"] == 1]
        normal_df = result_df[result_df["fraud_prediction"] == 0]
        if len(normal_df) > 0:
            fig3.add_trace(go.Scatter(
                x=normal_df[amount_col],
                y=normal_df["fraud_probability"],
                mode="markers",
                name="Normal",
                marker=dict(color="blue", size=4, opacity=0.5)
            ))
        if len(fraud_df) > 0:
            fig3.add_trace(go.Scatter(
                x=fraud_df[amount_col],
                y=fraud_df["fraud_probability"],
                mode="markers",
                name="Fraud",
                marker=dict(color="red", size=6, opacity=0.8)
            ))
        fig3.add_hline(
            y=threshold,
            line_dash="dash",
            line_color="orange",
            annotation_text=f"Threshold: {threshold:.2f}"
        )
        fig3.update_layout(
            title="Transaction Amount vs Fraud Probability",
            xaxis_title=f"Amount ({amount_col})",
            yaxis_title="Fraud Probability",
            height=400,
            showlegend=True
        )
        visualizations.append(fig3)
    else:
        fig3 = go.Figure()
        fig3.update_layout(
            title="Transaction Amount vs Fraud Probability (Amount column not found)",
            height=400
        )
        visualizations.append(fig3)

    # 4. Fraud Statistics Bar Chart (total / flagged / normal counts).
    if len(result_df) > 0:
        total = len(result_df)
        fraud_count = int(result_df["fraud_prediction"].sum())
        fraud_rate = (fraud_count / total) * 100 if total > 0 else 0.0
        fig4 = go.Figure(data=[
            go.Bar(
                x=["Total Transactions", "Fraud Detected", "Normal"],
                y=[total, fraud_count, total - fraud_count],
                marker_color=["blue", "red", "green"],
                text=[f"{total:,}", f"{fraud_count:,}", f"{total - fraud_count:,}"],
                textposition="auto"
            )
        ])
        fig4.update_layout(
            title="Transaction Summary",
            yaxis_title="Count",
            height=400
        )
    else:
        fig4 = go.Figure()
        fig4.update_layout(title="Transaction Summary (No Data)", height=400)
    visualizations.append(fig4)

    # 5. Fraud Probability Over Time (scatter + rolling mean), if a time
    # column exists.
    time_col = _find_time_column(result_df)
    if time_col and len(result_df) > 0:
        # Sort by time and smooth with a centered rolling average; the window
        # adapts to the batch size (1..100).
        sorted_df = result_df.sort_values(time_col).copy()
        window_size = max(1, min(100, len(sorted_df) // 10))
        sorted_df["rolling_avg"] = sorted_df["fraud_probability"].rolling(
            window=window_size, center=True
        ).mean()
        fig5 = go.Figure()
        fig5.add_trace(go.Scatter(
            x=sorted_df[time_col],
            y=sorted_df["fraud_probability"],
            mode="markers",
            name="Individual",
            marker=dict(size=3, opacity=0.3, color="lightblue")
        ))
        if sorted_df["rolling_avg"].notna().any():
            fig5.add_trace(go.Scatter(
                x=sorted_df[time_col],
                y=sorted_df["rolling_avg"],
                mode="lines",
                name="Rolling Average",
                line=dict(color="blue", width=2)
            ))
        fig5.add_hline(
            y=threshold,
            line_dash="dash",
            line_color="orange",
            annotation_text=f"Threshold: {threshold:.2f}"
        )
        fig5.update_layout(
            title="Fraud Probability Over Time",
            xaxis_title=time_col,
            yaxis_title="Fraud Probability",
            height=400,
            showlegend=True
        )
        visualizations.append(fig5)
    else:
        fig5 = go.Figure()
        fig5.update_layout(
            title="Fraud Probability Over Time (Time column not found)",
            height=400
        )
        visualizations.append(fig5)

    # 6. Top 20 Most Suspicious Transactions (bar chart by probability).
    if len(result_df) > 0:
        top_suspicious = result_df.nlargest(min(20, len(result_df)), "fraud_probability")
        fig6 = go.Figure(data=[
            go.Bar(
                x=[str(i) for i in top_suspicious.index],
                y=top_suspicious["fraud_probability"],
                marker_color=top_suspicious["fraud_probability"],
                marker_colorscale="Reds",
                text=top_suspicious["fraud_probability"].round(3),
                textposition="auto"
            )
        ])
        fig6.update_layout(
            title="Top 20 Most Suspicious Transactions",
            xaxis_title="Transaction Index",
            yaxis_title="Fraud Probability",
            height=400
        )
        visualizations.append(fig6)
    else:
        fig6 = go.Figure()
        fig6.update_layout(title="Top 20 Most Suspicious Transactions (No Data)", height=400)
        visualizations.append(fig6)

    # 7. Model Performance Metrics + 8. Confusion Matrix (need metrics dict).
    if model_metrics and len(model_metrics) > 0:
        fig7 = go.Figure()
        # PR-AUC is included because it is the more informative AUC under
        # heavy class imbalance.
        metrics_names = ['ROC-AUC', 'PR-AUC', 'Precision', 'Recall', 'F1-Score']
        metrics_values = [
            model_metrics.get('roc_auc', 0),
            model_metrics.get('pr_auc', 0),
            model_metrics.get('precision', 0),
            model_metrics.get('recall', 0),
            model_metrics.get('f1', 0)
        ]
        colors = ['#1f77b4', '#9467bd', '#ff7f0e', '#2ca02c', '#d62728']
        fig7.add_trace(go.Bar(
            x=metrics_names,
            y=metrics_values,
            marker_color=colors,
            text=[f'{v:.4f}' for v in metrics_values],
            textposition='auto'
        ))
        fig7.update_layout(
            title="Model Performance Metrics (PR-AUC better for imbalanced data)",
            yaxis_title="Score",
            yaxis_range=[0, 1],
            height=400
        )
        visualizations.append(fig7)

        # 8. Confusion matrix heatmap, when all four counts are present.
        if all(k in model_metrics for k in ['true_negatives', 'false_positives', 'false_negatives', 'true_positives']):
            cm = [[model_metrics['true_negatives'], model_metrics['false_positives']],
                  [model_metrics['false_negatives'], model_metrics['true_positives']]]
            fig8 = go.Figure(data=go.Heatmap(
                z=cm,
                x=['Predicted Normal', 'Predicted Fraud'],
                y=['Actual Normal', 'Actual Fraud'],
                colorscale='Blues',
                text=cm,
                texttemplate='%{text}',
                textfont={"size": 16},
                colorbar=dict(title="Count")
            ))
            fig8.update_layout(
                title="Confusion Matrix",
                height=400
            )
            visualizations.append(fig8)
        else:
            visualizations.append(None)
    else:
        # No metrics: placeholders for figures 7 and 8.
        visualizations.append(None)
        visualizations.append(None)

    # ========================================================================
    # ADVANCED VISUALIZATIONS
    # ========================================================================

    # 9. ROC Curve (requires ground-truth labels and roc_auc in metrics).
    if model_metrics and 'roc_auc' in model_metrics:
        try:
            gt_col = _find_ground_truth_column(result_df)
            if gt_col is not None:
                y_true = result_df[gt_col]
                y_prob = result_df["fraud_probability"]
                fpr, tpr, roc_thresholds = roc_curve(y_true, y_prob)
                fig9 = go.Figure()
                fig9.add_trace(go.Scatter(
                    x=fpr,
                    y=tpr,
                    mode='lines',
                    name=f'ROC Curve (AUC = {model_metrics["roc_auc"]:.4f})',
                    line=dict(color='blue', width=2)
                ))
                # Diagonal = performance of a random classifier.
                fig9.add_trace(go.Scatter(
                    x=[0, 1],
                    y=[0, 1],
                    mode='lines',
                    name='Random Classifier',
                    line=dict(color='red', dash='dash', width=1)
                ))
                fig9.update_layout(
                    title="ROC Curve (Receiver Operating Characteristic)",
                    xaxis_title="False Positive Rate",
                    yaxis_title="True Positive Rate",
                    height=400,
                    showlegend=True
                )
                visualizations.append(fig9)
            else:
                visualizations.append(None)
        except Exception as e:
            print(f"Warning: Could not create ROC curve: {e}")
            visualizations.append(None)
    else:
        visualizations.append(None)

    # 10. Precision-Recall Curve (requires ground truth and pr_auc).
    if model_metrics and 'pr_auc' in model_metrics:
        try:
            gt_col = _find_ground_truth_column(result_df)
            if gt_col is not None:
                y_true = result_df[gt_col]
                y_prob = result_df["fraud_probability"]
                precisions, recalls, pr_thresholds = precision_recall_curve(y_true, y_prob)
                fig10 = go.Figure()
                fig10.add_trace(go.Scatter(
                    x=recalls,
                    y=precisions,
                    mode='lines',
                    name=f'PR Curve (AUC = {model_metrics["pr_auc"]:.4f})',
                    line=dict(color='green', width=2),
                    fill='tonexty'
                ))
                # Horizontal baseline = prevalence (fraud rate) of the data.
                fig10.add_hline(
                    y=y_true.mean(),
                    line_dash="dash",
                    line_color="orange",
                    annotation_text=f"Baseline (Fraud Rate: {y_true.mean():.2%})"
                )
                fig10.update_layout(
                    title="Precision-Recall Curve (Better for Imbalanced Data)",
                    xaxis_title="Recall",
                    yaxis_title="Precision",
                    height=400,
                    showlegend=True
                )
                visualizations.append(fig10)
            else:
                visualizations.append(None)
        except Exception as e:
            print(f"Warning: Could not create PR curve: {e}")
            visualizations.append(None)
    else:
        visualizations.append(None)

    # 11. Amount Distribution Comparison (box plots, normal vs flagged).
    amount_col = _find_amount_column(result_df)
    if amount_col and len(result_df) > 0:
        fraud_df = result_df[result_df["fraud_prediction"] == 1]
        normal_df = result_df[result_df["fraud_prediction"] == 0]
        fig11 = go.Figure()
        if len(normal_df) > 0:
            fig11.add_trace(go.Box(
                y=normal_df[amount_col],
                name="Normal Transactions",
                marker_color="blue",
                boxmean='sd'
            ))
        if len(fraud_df) > 0:
            fig11.add_trace(go.Box(
                y=fraud_df[amount_col],
                name="Flagged as Fraud",
                marker_color="red",
                boxmean='sd'
            ))
        fig11.update_layout(
            title="Transaction Amount Distribution Comparison",
            yaxis_title=f"Amount ({amount_col})",
            height=400,
            showlegend=True
        )
        visualizations.append(fig11)
    else:
        fig11 = go.Figure()
        fig11.update_layout(
            title="Transaction Amount Distribution (Amount column not found)",
            height=400
        )
        visualizations.append(fig11)

    # 12. Fraud Probability Distribution Comparison (violin plots). Splits by
    # actual labels when ground truth exists, otherwise by predicted labels.
    if len(result_df) > 0:
        gt_col = _find_ground_truth_column(result_df)
        if gt_col is not None:
            fraud_probs = result_df[result_df[gt_col] == 1]["fraud_probability"]
            normal_probs = result_df[result_df[gt_col] == 0]["fraud_probability"]
            fig12 = go.Figure()
            if len(normal_probs) > 0:
                fig12.add_trace(go.Violin(
                    y=normal_probs,
                    name="Actual Normal",
                    box_visible=True,
                    meanline_visible=True,
                    marker_color="blue"
                ))
            if len(fraud_probs) > 0:
                fig12.add_trace(go.Violin(
                    y=fraud_probs,
                    name="Actual Fraud",
                    box_visible=True,
                    meanline_visible=True,
                    marker_color="red"
                ))
            fig12.add_hline(
                y=threshold,
                line_dash="dash",
                line_color="orange",
                annotation_text=f"Threshold: {threshold:.2f}"
            )
            fig12.update_layout(
                title="Fraud Probability Distribution: Actual Fraud vs Normal",
                yaxis_title="Fraud Probability",
                height=400,
                showlegend=True
            )
            visualizations.append(fig12)
        else:
            # No ground truth: fall back to the model's own predictions.
            fraud_probs = result_df[result_df["fraud_prediction"] == 1]["fraud_probability"]
            normal_probs = result_df[result_df["fraud_prediction"] == 0]["fraud_probability"]
            fig12 = go.Figure()
            if len(normal_probs) > 0:
                fig12.add_trace(go.Violin(
                    y=normal_probs,
                    name="Predicted Normal",
                    box_visible=True,
                    meanline_visible=True,
                    marker_color="blue"
                ))
            if len(fraud_probs) > 0:
                fig12.add_trace(go.Violin(
                    y=fraud_probs,
                    name="Predicted Fraud",
                    box_visible=True,
                    meanline_visible=True,
                    marker_color="red"
                ))
            fig12.add_hline(
                y=threshold,
                line_dash="dash",
                line_color="orange",
                annotation_text=f"Threshold: {threshold:.2f}"
            )
            fig12.update_layout(
                title="Fraud Probability Distribution: Predicted Fraud vs Normal",
                yaxis_title="Fraud Probability",
                height=400,
                showlegend=True
            )
            visualizations.append(fig12)
    else:
        visualizations.append(None)

    # 13. Feature Correlation Heatmap (top-variance numeric features).
    if len(result_df) > 0:
        try:
            numeric_cols = result_df.select_dtypes(include=[np.number]).columns.tolist()
            # Exclude outputs and ALL recognized label columns ('isFraud'
            # included — previously it leaked into the feature heatmap).
            exclude_cols = ['fraud_probability', 'fraud_prediction'] + _GROUND_TRUTH_COLS
            feature_cols = [col for col in numeric_cols if col not in exclude_cols]
            # Limit to the 15 highest-variance features for readability.
            if len(feature_cols) > 15:
                variances = result_df[feature_cols].var().sort_values(ascending=False)
                feature_cols = variances.head(15).index.tolist()
            if len(feature_cols) > 1:
                corr_matrix = result_df[feature_cols + ['fraud_probability']].corr()
                fig13 = go.Figure(data=go.Heatmap(
                    z=corr_matrix.values,
                    x=corr_matrix.columns,
                    y=corr_matrix.index,
                    colorscale='RdBu',
                    zmid=0,
                    text=corr_matrix.round(2).values,
                    texttemplate='%{text}',
                    textfont={"size": 10},
                    colorbar=dict(title="Correlation")
                ))
                fig13.update_layout(
                    title="Feature Correlation Heatmap (Top Features)",
                    height=500,
                    width=600
                )
                visualizations.append(fig13)
            else:
                visualizations.append(None)
        except Exception as e:
            print(f"Warning: Could not create correlation heatmap: {e}")
            visualizations.append(None)
    else:
        visualizations.append(None)

    # 14. Cumulative Fraud Detection Over Time (dual-axis line chart).
    time_col = _find_time_column(result_df)
    if time_col and len(result_df) > 0:
        sorted_df = result_df.sort_values(time_col).copy()
        sorted_df['cumulative_fraud'] = sorted_df['fraud_prediction'].cumsum()
        sorted_df['cumulative_total'] = range(1, len(sorted_df) + 1)
        sorted_df['fraud_rate'] = sorted_df['cumulative_fraud'] / sorted_df['cumulative_total']
        fig14 = go.Figure()
        fig14.add_trace(go.Scatter(
            x=sorted_df[time_col],
            y=sorted_df['cumulative_fraud'],
            mode='lines',
            name='Cumulative Fraud Detected',
            line=dict(color='red', width=2)
        ))
        fig14.add_trace(go.Scatter(
            x=sorted_df[time_col],
            y=sorted_df['fraud_rate'] * 100,
            mode='lines',
            name='Fraud Rate (%)',
            yaxis='y2',
            line=dict(color='orange', width=2, dash='dash')
        ))
        fig14.update_layout(
            title="Cumulative Fraud Detection Over Time",
            xaxis_title=time_col,
            yaxis_title="Cumulative Fraud Count",
            yaxis2=dict(
                title="Fraud Rate (%)",
                overlaying='y',
                side='right'
            ),
            height=400,
            showlegend=True
        )
        visualizations.append(fig14)
    else:
        visualizations.append(None)

    # 15. Threshold Sensitivity Analysis (precision/recall/F1 vs threshold).
    if model_metrics:
        try:
            gt_col = _find_ground_truth_column(result_df)
            y_true = result_df[gt_col] if gt_col is not None else None
            if y_true is not None and len(y_true) > 0:
                y_prob = result_df["fraud_probability"]
                thresholds = np.linspace(0.01, 0.99, 50)
                precisions_list = []
                recalls_list = []
                f1_scores_list = []
                for t in thresholds:
                    y_pred = (y_prob >= t).astype(int)
                    if y_pred.sum() > 0:
                        prec, rec, f1, _ = precision_recall_fscore_support(
                            y_true, y_pred, average='binary', zero_division=0
                        )
                        precisions_list.append(prec)
                        recalls_list.append(rec)
                        f1_scores_list.append(f1)
                    else:
                        # No positives predicted at this threshold.
                        precisions_list.append(0)
                        recalls_list.append(0)
                        f1_scores_list.append(0)
                fig15 = go.Figure()
                fig15.add_trace(go.Scatter(
                    x=thresholds,
                    y=precisions_list,
                    mode='lines',
                    name='Precision',
                    line=dict(color='blue', width=2)
                ))
                fig15.add_trace(go.Scatter(
                    x=thresholds,
                    y=recalls_list,
                    mode='lines',
                    name='Recall',
                    line=dict(color='green', width=2)
                ))
                fig15.add_trace(go.Scatter(
                    x=thresholds,
                    y=f1_scores_list,
                    mode='lines',
                    name='F1-Score',
                    line=dict(color='red', width=2)
                ))
                fig15.add_vline(
                    x=threshold,
                    line_dash="dash",
                    line_color="orange",
                    annotation_text=f"Current: {threshold:.2f}"
                )
                fig15.update_layout(
                    title="Threshold Sensitivity Analysis",
                    xaxis_title="Threshold",
                    yaxis_title="Score",
                    height=400,
                    showlegend=True
                )
                visualizations.append(fig15)
            else:
                visualizations.append(None)
        except Exception as e:
            print(f"Warning: Could not create threshold sensitivity: {e}")
            visualizations.append(None)
    else:
        visualizations.append(None)

    return visualizations
| def predict_fraud_enhanced(file, threshold: float = 0.5): | |
| """ | |
| Enhanced fraud detection with comprehensive visualizations and GPU acceleration support. | |
| """ | |
| print("=" * 60) | |
| print("FUNCTION CALLED: predict_fraud_enhanced") | |
| print(f"File received: {file}") | |
| print(f"Threshold: {threshold}") | |
| print("=" * 60) | |
| if file is None: | |
| print("ERROR: File is None") | |
| return ( | |
| "Please upload a CSV file.", | |
| pd.DataFrame(), | |
| None, None, None, None, None, None, None, None, None, None, None, None, None, None, None | |
| ) | |
| try: | |
| start = time.time() | |
| # Read CSV (limit to 10000 rows for performance) | |
| # Handle different Gradio file input formats (Gradio 6.0 compatibility) | |
| # With type="filepath", Gradio returns a string path directly | |
| file_path = None | |
| # Method 1: Direct string path (most common in Gradio 6.0) | |
| # Also handle if user provides a relative path like "dataset/fraudTest.csv" | |
| if isinstance(file, str): | |
| file_path = file | |
| # If it's a relative path and exists, use it directly | |
| if not os.path.isabs(file_path) and os.path.exists(file_path): | |
| print(f"DEBUG: Using relative path: {file_path}") | |
| else: | |
| print(f"DEBUG: File is string: {file_path}") | |
| # Method 2: FileData object (check all possible attributes) | |
| elif file is not None: | |
| # Try all possible file path attributes | |
| for attr in ['path', 'name', 'orig_name', 'file_path', 'file_name']: | |
| if hasattr(file, attr): | |
| value = getattr(file, attr) | |
| if value and isinstance(value, str) and len(value) > 3: | |
| file_path = value | |
| print(f"DEBUG: Found file path via {attr}: {file_path}") | |
| break | |
| # If still no path, try tuple/list | |
| if not file_path and isinstance(file, (list, tuple)) and len(file) > 0: | |
| file_path = file[0] if isinstance(file[0], str) else str(file[0]) | |
| print(f"DEBUG: File is tuple/list: {file_path}") | |
| # Last resort: convert to string | |
| if not file_path: | |
| file_str = str(file) | |
| print(f"DEBUG: File as string: {file_str}") | |
| # Check if it's a valid path-like string | |
| if file_str and file_str != 'file' and file_str != 'None' and len(file_str) > 3: | |
| if os.path.sep in file_str or file_str.endswith('.csv'): | |
| file_path = file_str | |
| print(f"DEBUG: Extracted path from string: {file_path}") | |
| print(f"DEBUG: Final file_path: {file_path}") | |
| # Validate file path exists | |
| if not file_path: | |
| error_msg = f"Error: Could not determine file path. File type: {type(file)}, File value: {file}" | |
| print(error_msg) | |
| return ( | |
| error_msg, | |
| pd.DataFrame(), | |
| None, None, None, None, None, None, None, None, None, None, None, None, None, None | |
| ) | |
| if not os.path.exists(file_path): | |
| error_msg = f"Error: File not found at path: {file_path}. Please try uploading the file again." | |
| print(error_msg) | |
| return ( | |
| error_msg, | |
| pd.DataFrame(), | |
| None, None, None, None, None, None, None, None, None, None, None, None, None, None | |
| ) | |
| print(f"DEBUG: File exists, reading CSV from: {file_path}") | |
| # Read CSV with error handling for large files and permission issues | |
| # Windows/Gradio temp files often have permission issues, so we try multiple methods | |
| df = None | |
| read_methods_tried = [] | |
| # Method 1: Try reading in binary mode first (most reliable for locked files) | |
| try: | |
| print("Trying binary read method (most reliable for Windows temp files)...") | |
| with open(file_path, 'rb') as f: | |
| content = f.read() | |
| df = pd.read_csv(BytesIO(content), nrows=10000, low_memory=False) | |
| print("β File read successfully using binary method") | |
| except (PermissionError, IOError, OSError) as binary_error: | |
| read_methods_tried.append(f"Binary read: {str(binary_error)}") | |
| print(f"β Binary read failed, trying direct read...") | |
| # Method 2: Try to read directly | |
| try: | |
| df = pd.read_csv(file_path, nrows=10000, low_memory=False) | |
| print("β File read successfully using direct method") | |
| except (PermissionError, IOError, OSError) as perm_error: | |
| read_methods_tried.append(f"Direct read: {str(perm_error)}") | |
| print(f"β Permission denied, trying copy methods...") | |
| # Method 3: Try copying to project directory (more reliable) | |
| try: | |
| print("Trying copy to project directory...") | |
| project_temp = os.path.join(os.getcwd(), f"temp_upload_{uuid.uuid4().hex[:8]}.csv") | |
| # Use shutil.copy instead of copy2 to avoid metadata issues | |
| shutil.copy(file_path, project_temp) | |
| # Small delay to ensure file is fully copied | |
| time.sleep(0.1) | |
| df = pd.read_csv(project_temp, nrows=10000, low_memory=False) | |
| # Clean up | |
| try: | |
| os.remove(project_temp) | |
| print("β Temporary file cleaned up") | |
| except: | |
| pass | |
| print("β File read successfully after copying to project directory") | |
| except Exception as copy_error: | |
| read_methods_tried.append(f"Copy method: {str(copy_error)}") | |
| # Method 4: Try system temp directory with different approach | |
| try: | |
| print("Trying system temp directory with alternative method...") | |
| temp_dir = tempfile.gettempdir() | |
| temp_filename = f"gradio_upload_{uuid.uuid4().hex[:8]}.csv" | |
| temp_file = os.path.join(temp_dir, temp_filename) | |
| # Read and write in chunks to avoid locking | |
| with open(file_path, 'rb') as src, open(temp_file, 'wb') as dst: | |
| shutil.copyfileobj(src, dst) | |
| df = pd.read_csv(temp_file, nrows=10000, low_memory=False) | |
| # Clean up | |
| try: | |
| os.remove(temp_file) | |
| except: | |
| pass | |
| print("β File read successfully using system temp directory") | |
| except Exception as temp_error: | |
| read_methods_tried.append(f"System temp: {str(temp_error)}") | |
| # All methods failed - provide helpful error message | |
| error_details = "\n".join([f"- {m}" for m in read_methods_tried]) | |
| return ( | |
| f"β Error: Could not read file due to permission/access issues.\n\n" | |
| f"**Quick Fix:**\n" | |
| f"1. **Copy the file to the project folder first** (e.g., `dataset/fraudTest.csv`)\n" | |
| f"2. Then upload it from there - this avoids Windows temp file permission issues\n\n" | |
| f"**Alternative Solutions:**\n" | |
| f"- Close any programs that might be using the file (Excel, text editor, etc.)\n" | |
| f"- Try uploading the file again (sometimes works on retry)\n" | |
| f"- If using Windows, try running the app as Administrator\n" | |
| f"- Use the file directly: The app can process `dataset/fraudTest.csv` if it exists\n\n" | |
| f"**Why this happens:**\n" | |
| f"Gradio stores uploaded files in Windows temp directory which can have permission restrictions.\n" | |
| f"Copying the file to your project folder first avoids this issue.\n\n" | |
| f"**Technical details:**\n{error_details}", | |
| pd.DataFrame(), | |
| None, None, None, None, None, None, None, None, None, None, None, None, None, None | |
| ) | |
| except Exception as csv_error: | |
| # Handle CSV parsing errors separately | |
| if "Permission denied" in str(csv_error) or "Errno 13" in str(csv_error): | |
| return ( | |
| f"β Permission Error: Could not access the uploaded file.\n\n" | |
| f"**Solution:** Copy your CSV file to the project folder (e.g., `dataset/fraudTest.csv`) " | |
| f"and upload it from there. This avoids Windows temp file permission issues.\n\n" | |
| f"**Error:** {str(csv_error)}", | |
| pd.DataFrame(), | |
| None, None, None, None, None, None, None, None, None, None, None, None, None, None, None | |
| ) | |
| return ( | |
| f"Error reading CSV file: {str(csv_error)}\n\nPlease ensure the file is a valid CSV format.", | |
| pd.DataFrame(), | |
| None, None, None, None, None, None, None, None, None, None, None, None, None, None | |
| ) | |
| if df is None: | |
| return ( | |
| "Error: Could not read the CSV file. Please try uploading again.", | |
| pd.DataFrame(), | |
| None, None, None, None, None, None, None, None, None, None, None, None, None, None | |
| ) | |
| if df.empty: | |
| return ( | |
| "CSV is empty.", | |
| pd.DataFrame(), | |
| None, None, None, None, None, None, None, None, None, None, None, None, None, None | |
| ) | |
| print(f"Processing {len(df)} rows...") | |
| print(f"CSV columns: {list(df.columns)[:10]}...") # Print first 10 columns for debugging | |
| print(f"Total columns in CSV: {len(df.columns)}") | |
| # Score the batch | |
| try: | |
| print("Starting score_batch...") | |
| result_df = score_batch(df, threshold=threshold) | |
| print(f"Score_batch completed. Result shape: {result_df.shape}") | |
| print(f"Fraud probabilities range: {result_df['fraud_probability'].min():.4f} to {result_df['fraud_probability'].max():.4f}") | |
| except FileNotFoundError as model_error: | |
| # Handle missing model file with user-friendly message | |
| error_msg = ( | |
| f"β **Model File Not Found**\n\n" | |
| f"The fraud detection model (`fraud_lgbm_calibrated.pkl`) is required to make predictions.\n\n" | |
| f"**To fix this:**\n" | |
| f"1. Download the model file from your GitHub repository\n" | |
| f"2. Place it in the same directory as `app.py`\n" | |
| f"3. Or train a new model using the notebook: `notebooks/eda_and_feature_engineering.ipynb`\n\n" | |
| f"**Error details:** {str(model_error)}" | |
| ) | |
| print(f"MODEL ERROR: {error_msg}") | |
| return ( | |
| error_msg, | |
| pd.DataFrame(), | |
| None, None, None, None, None, None, None, None, None, None, None, None, None, None | |
| ) | |
| except Exception as score_error: | |
| import traceback | |
| error_details = f"Error during scoring: {str(score_error)}\n\n{traceback.format_exc()}" | |
| print(f"SCORING ERROR: {error_details}") | |
| return ( | |
| f"β {error_details}", | |
| pd.DataFrame(), | |
| None, None, None, None, None, None, None, None, None, None, None, None, None, None | |
| ) | |
| # Calculate statistics | |
| total = len(result_df) | |
| fraud_count = int(result_df["fraud_prediction"].sum()) | |
| fraud_rate = (fraud_count / total) * 100 if total > 0 else 0.0 | |
| avg_prob = result_df["fraud_probability"].mean() | |
| max_prob = result_df["fraud_probability"].max() | |
| min_prob = result_df["fraud_probability"].min() | |
| median_prob = result_df["fraud_probability"].median() | |
| elapsed = time.time() - start | |
| # Calculate model performance metrics if ground truth is available | |
| model_metrics = {} | |
| has_ground_truth = False | |
| # Check for ground truth labels (common column names) | |
| ground_truth_cols = ["is_fraud", "fraud", "target", "label", "isFraud"] | |
| y_true = None | |
| for col in ground_truth_cols: | |
| if col in result_df.columns: | |
| y_true = result_df[col] | |
| has_ground_truth = True | |
| print(f"Found ground truth column: {col}") | |
| break | |
| if has_ground_truth and y_true is not None: | |
| try: | |
| y_prob = result_df["fraud_probability"] | |
| y_pred = result_df["fraud_prediction"] | |
| # Calculate metrics at current threshold | |
| model_metrics["roc_auc"] = roc_auc_score(y_true, y_prob) | |
| precision, recall, f1, _ = precision_recall_fscore_support( | |
| y_true, y_pred, average="binary", zero_division=0 | |
| ) | |
| model_metrics["precision"] = float(precision) | |
| model_metrics["recall"] = float(recall) | |
| model_metrics["f1"] = float(f1) | |
| # PR-AUC (better for imbalanced datasets) | |
| model_metrics["pr_auc"] = average_precision_score(y_true, y_prob) | |
| # Confusion matrix | |
| cm = confusion_matrix(y_true, y_pred) | |
| model_metrics["true_negatives"] = int(cm[0, 0]) | |
| model_metrics["false_positives"] = int(cm[0, 1]) | |
| model_metrics["false_negatives"] = int(cm[1, 0]) | |
| model_metrics["true_positives"] = int(cm[1, 1]) | |
| print(f"Model metrics calculated: AUC={model_metrics['roc_auc']:.4f}, PR-AUC={model_metrics['pr_auc']:.4f}") | |
| except Exception as e: | |
| print(f"Warning: Could not calculate metrics: {str(e)}") | |
| import traceback | |
| print(traceback.format_exc()) | |
| has_ground_truth = False | |
| # Create summary text with model metrics | |
| summary = f""" | |
| ## π Fraud Detection Results | |
| **Processing Summary:** | |
| - β Processed **{total:,}** transactions in **{elapsed:.2f}s** | |
| - π― Detection threshold: **{threshold:.2f}** | |
| **Fraud Statistics:** | |
| - π¨ **Fraud flagged:** {fraud_count:,} transactions ({fraud_rate:.2f}%) | |
| - β **Normal transactions:** {total - fraud_count:,} ({(100 - fraud_rate):.2f}%) | |
| **Probability Statistics:** | |
| - π **Average probability:** {avg_prob:.4f} | |
| - π **Median probability:** {median_prob:.4f} | |
| - β¬οΈ **Maximum probability:** {max_prob:.4f} | |
| - β¬οΈ **Minimum probability:** {min_prob:.4f} | |
| **Risk Level Breakdown:** | |
| {result_df['risk_level'].value_counts().to_string()} | |
| """ | |
| # Add model performance metrics if available | |
| if has_ground_truth and model_metrics: | |
| # Calculate additional metrics | |
| tp = model_metrics.get('true_positives', 0) | |
| fp = model_metrics.get('false_positives', 0) | |
| fn = model_metrics.get('false_negatives', 0) | |
| tn = model_metrics.get('true_negatives', 0) | |
| summary += f""" | |
| --- | |
| ## π― Model Performance Metrics | |
| **Current Threshold ({threshold:.2f}) Performance:** | |
| - π **ROC-AUC:** {model_metrics['roc_auc']:.4f} {'β Good discrimination' if model_metrics['roc_auc'] > 0.8 else 'β οΈ Needs improvement'} | |
| - π **PR-AUC (Better for imbalanced data):** {model_metrics.get('pr_auc', 0):.4f} {'β Good for imbalanced data' if model_metrics.get('pr_auc', 0) > 0.1 else 'β οΈ Very low - model struggles with imbalance'} | |
| - π― **Precision:** {model_metrics['precision']:.4f} ({int((1-model_metrics['precision'])*100)}% false positives) | |
| - π **Recall:** {model_metrics['recall']:.4f} ({int((1-model_metrics['recall'])*100)}% fraud missed) | |
| - βοΈ **F1-Score:** {model_metrics['f1']:.4f} {'β Balanced' if model_metrics['f1'] > 0.2 else 'β οΈ Poor balance'} | |
| **Confusion Matrix:** | |
| - β **True Negatives (Correctly identified normal):** {tn:,} | |
| - β **False Positives (Normal flagged as fraud):** {fp:,} | |
| - β **False Negatives (Fraud missed):** {fn:,} | |
| - β **True Positives (Correctly identified fraud):** {tp:,} | |
| **Accuracy:** {(tp + tn) / total * 100:.2f}% | |
| """ | |
| # Export to Power BI (automatic) | |
| if POWERBI_AVAILABLE: | |
| try: | |
| export_powerbi_csvs(result_df) | |
| print("β Data exported to Power BI format") | |
| except Exception as e: | |
| print(f"β οΈ Power BI export failed: {e}") | |
| # Get top suspicious transactions | |
| top_df = result_df.nlargest(20, "fraud_probability") | |
| # Select columns to display | |
| display_cols = ["fraud_probability", "fraud_prediction", "risk_level"] | |
| # Add amount column if available | |
| for amt_col in ["amt", "amount", "Amount", "TransactionAmt"]: | |
| if amt_col in top_df.columns: | |
| display_cols.insert(1, amt_col) | |
| break | |
| # Add first few original columns | |
| for col in result_df.columns: | |
| if col not in ["fraud_probability", "fraud_prediction", "risk_level"] and col not in display_cols: | |
| if len(display_cols) < 8: # Limit display columns | |
| display_cols.append(col) | |
| else: | |
| break | |
| display_cols = [c for c in display_cols if c in top_df.columns] | |
| top_display = top_df[display_cols].copy() | |
| # Format probability column for display | |
| if "fraud_probability" in top_display.columns: | |
| top_display["fraud_probability"] = top_display["fraud_probability"].round(4) | |
| # Create visualizations (pass model_metrics if available) | |
| try: | |
| viz_list = create_visualizations(result_df, threshold, model_metrics if has_ground_truth else None) | |
| except Exception as viz_error: | |
| import traceback | |
| print(f"Warning: Error creating some visualizations: {str(viz_error)}") | |
| print(traceback.format_exc()) | |
| # Create empty visualizations list if there's an error | |
| viz_list = [] | |
| # Ensure we always return 15 visualizations | |
| while len(viz_list) < 15: | |
| viz_list.append(None) | |
| # Return all outputs (summary, table, and 15 visualizations) | |
| print(f"Returning results: summary length={len(summary)}, top_display shape={top_display.shape}, viz_count={len([v for v in viz_list if v is not None])}") | |
| return ( | |
| summary, | |
| top_display, | |
| viz_list[0], # Probability Distribution | |
| viz_list[1], # Risk Level Breakdown | |
| viz_list[2], # Amount Analysis | |
| viz_list[3], # Statistics | |
| viz_list[4], # Time Series | |
| viz_list[5], # Top 20 Suspicious | |
| viz_list[6] if len(viz_list) > 6 else None, # Model Performance Metrics | |
| viz_list[7] if len(viz_list) > 7 else None, # Confusion Matrix | |
| viz_list[8] if len(viz_list) > 8 else None, # ROC Curve | |
| viz_list[9] if len(viz_list) > 9 else None, # Precision-Recall Curve | |
| viz_list[10] if len(viz_list) > 10 else None, # Amount Distribution (Box Plot) | |
| viz_list[11] if len(viz_list) > 11 else None, # Probability Distribution (Violin) | |
| viz_list[12] if len(viz_list) > 12 else None, # Correlation Heatmap | |
| viz_list[13] if len(viz_list) > 13 else None, # Cumulative Fraud Detection | |
| viz_list[14] if len(viz_list) > 14 else None, # Threshold Sensitivity | |
| ) | |
| except Exception as e: | |
| import traceback | |
| error_msg = f"β Error: {str(e)}\n\n{traceback.format_exc()}" | |
| print(f"FATAL ERROR: {error_msg}") | |
| return ( | |
| error_msg, | |
| pd.DataFrame(), | |
| None, None, None, None, None, None, None, None, None, None, None, None, None, None, None | |
| ) | |
# Apply GPU decorator to prediction function for ZeroGPU
# This allows GPU to be allocated only during prediction calls, not for the entire server
# ZeroGPU requires at least one @spaces.GPU decorated function to detect and allocate GPU
if SPACES_AVAILABLE:
    print("Applying @spaces.GPU decorator to predict_fraud_enhanced for ZeroGPU support")
    # Rebind the module-level name so the Gradio event handlers (wired below)
    # call the wrapped function; equivalent to @spaces.GPU at definition time.
    predict_fraud_enhanced = spaces.GPU(predict_fraud_enhanced)
# Create Gradio Interface
with gr.Blocks(
    title="Credit Card Fraud Detection",
    analytics_enabled=False
) as demo:
    # Page header / usage instructions shown at the top of the app.
    gr.Markdown("""
    # π‘οΈ Credit Card Fraud Detection System
    Upload a CSV file with transaction data to detect potential fraud. The system uses a calibrated
    LightGBM model to predict fraud probabilities and provides comprehensive visualizations.
    **Supported formats:** CSV files with numeric features (V1-V28, Amount, Time, etc.)
    """)
    with gr.Row():
        with gr.Column(scale=1):
            # Standard upload widget; predict_fraud_enhanced copes with the
            # different path formats Gradio versions hand to callbacks.
            file_input = gr.File(
                label="π Upload Transaction CSV",
                file_count="single"
            )
            # Add button to use dataset file directly (avoids permission issues)
            use_dataset_btn = gr.Button(
                "π Use dataset/fraudTest.csv (Skip Upload)",
                variant="secondary",
                size="sm"
            )
            # Add Sample button to use sample dataset
            sample_btn = gr.Button(
                "π Use Sample Dataset",
                variant="secondary",
                size="sm"
            )
            gr.Markdown("""
            **π‘ Tip:** If you get permission errors, click the button above to use the dataset file directly!
            **π― Try Sample:** Click "Use Sample Dataset" to test the model with sample data!
            """)
        with gr.Column(scale=1):
            # Low default (0.05) is deliberate: the slider info text warns
            # that 0.5 is too high for heavily imbalanced fraud data.
            threshold = gr.Slider(
                minimum=0.01,
                maximum=0.99,
                value=0.05,
                step=0.01,
                label="π― Fraud Detection Threshold",
                info="β οΈ For imbalanced data, use 0.01-0.05 for better recall. Default 0.5 is too high!"
            )
| # Function to use dataset file directly | |
| def use_dataset_file(): | |
| if os.path.exists(DATASET_PATH): | |
| # Return the file path as a string - the predict function will handle it | |
| return DATASET_PATH | |
| else: | |
| return None | |
| # When button is clicked, trigger prediction with dataset file | |
| def use_dataset_and_predict(threshold_val): | |
| try: | |
| print(f"DEBUG: Looking for dataset at: {DATASET_PATH}") | |
| print(f"DEBUG: File exists: {os.path.exists(DATASET_PATH)}") | |
| if os.path.exists(DATASET_PATH): | |
| print(f"DEBUG: Found dataset, calling predict_fraud_enhanced...") | |
| # Call predict function directly with dataset path | |
| return predict_fraud_enhanced(DATASET_PATH, threshold_val) | |
| else: | |
| # Try relative path as fallback | |
| fallback_path = "dataset/fraudTest.csv" | |
| if os.path.exists(fallback_path): | |
| print(f"DEBUG: Found dataset at relative path: {fallback_path}") | |
| return predict_fraud_enhanced(fallback_path, threshold_val) | |
| # Try absolute path from current directory | |
| abs_fallback = os.path.abspath(fallback_path) | |
| if os.path.exists(abs_fallback): | |
| print(f"DEBUG: Found dataset at absolute fallback path: {abs_fallback}") | |
| return predict_fraud_enhanced(abs_fallback, threshold_val) | |
| return ( | |
| f"β Error: Dataset not found.\n\n" | |
| f"**Expected locations:**\n" | |
| f"- {DATASET_PATH}\n" | |
| f"- {fallback_path}\n" | |
| f"- {abs_fallback}\n\n" | |
| f"Please upload a file instead.", | |
| pd.DataFrame(), | |
| None, None, None, None, None, None, None, None, None, None, None, None, None, None, None | |
| ) | |
| except Exception as e: | |
| import traceback | |
| error_msg = f"β Error loading dataset: {str(e)}\n\n{traceback.format_exc()}" | |
| print(f"ERROR in use_dataset_and_predict: {error_msg}") | |
| return ( | |
| error_msg, | |
| pd.DataFrame(), | |
| None, None, None, None, None, None, None, None, None, None, None, None, None, None, None | |
| ) | |
| # When Sample button is clicked, use sample dataset | |
| def use_sample_and_predict(threshold_val): | |
| try: | |
| print(f"DEBUG: Looking for sample dataset at: {SAMPLE_DATASET_PATH}") | |
| print(f"DEBUG: File exists: {os.path.exists(SAMPLE_DATASET_PATH)}") | |
| print(f"DEBUG: Current working directory: {os.getcwd()}") | |
| print(f"DEBUG: Script directory: {SCRIPT_DIR}") | |
| if os.path.exists(SAMPLE_DATASET_PATH): | |
| print(f"DEBUG: Found sample dataset, calling predict_fraud_enhanced...") | |
| # Call predict function directly with sample path | |
| return predict_fraud_enhanced(SAMPLE_DATASET_PATH, threshold_val) | |
| else: | |
| # Try relative path as fallback | |
| fallback_path = "sample_transactions.csv" | |
| if os.path.exists(fallback_path): | |
| print(f"DEBUG: Found sample dataset at relative path: {fallback_path}") | |
| return predict_fraud_enhanced(fallback_path, threshold_val) | |
| # Try absolute path from current directory | |
| abs_fallback = os.path.abspath(fallback_path) | |
| if os.path.exists(abs_fallback): | |
| print(f"DEBUG: Found sample dataset at absolute fallback path: {abs_fallback}") | |
| return predict_fraud_enhanced(abs_fallback, threshold_val) | |
| return ( | |
| f"β Error: Sample dataset not found.\n\n" | |
| f"**Expected locations:**\n" | |
| f"- {SAMPLE_DATASET_PATH}\n" | |
| f"- {fallback_path}\n" | |
| f"- {abs_fallback}\n\n" | |
| f"**Current working directory:** {os.getcwd()}\n" | |
| f"**Script directory:** {SCRIPT_DIR}\n\n" | |
| f"Please ensure the sample_transactions.csv file exists in one of these locations.", | |
| pd.DataFrame(), | |
| None, None, None, None, None, None, None, None, None, None, None, None, None, None, None | |
| ) | |
| except Exception as e: | |
| import traceback | |
| error_msg = f"β Error loading sample dataset: {str(e)}\n\n{traceback.format_exc()}" | |
| print(f"ERROR in use_sample_and_predict: {error_msg}") | |
| return ( | |
| error_msg, | |
| pd.DataFrame(), | |
| None, None, None, None, None, None, None, None, None, None, None, None, None, None, None | |
| ) | |
    # Primary action button for user-uploaded files.
    btn = gr.Button("π Detect Fraud", variant="primary", size="lg")
    with gr.Row():
        summary_output = gr.Markdown(label="Summary")
    # One tab per output: results table plus the 15 plots produced by
    # create_visualizations. Tab/plot order must match the outputs= lists
    # below and the tuple order returned by predict_fraud_enhanced.
    with gr.Tabs():
        with gr.Tab("π Top Suspicious Transactions"):
            output_table = gr.Dataframe(
                label="Top 20 Most Suspicious Transactions",
                wrap=True
            )
        with gr.Tab("π Probability Distribution"):
            viz1 = gr.Plot(label="Fraud Probability Distribution")
        with gr.Tab("π₯§ Risk Level Breakdown"):
            viz2 = gr.Plot(label="Risk Level Distribution")
        with gr.Tab("π° Amount Analysis"):
            viz3 = gr.Plot(label="Amount vs Fraud Probability")
        with gr.Tab("π Statistics"):
            viz4 = gr.Plot(label="Transaction Summary")
        with gr.Tab("β° Time Series"):
            viz5 = gr.Plot(label="Fraud Probability Over Time")
        with gr.Tab("π Top 20 Suspicious"):
            viz6 = gr.Plot(label="Top 20 Most Suspicious Transactions")
        with gr.Tab("π― Model Performance"):
            viz7 = gr.Plot(label="Model Performance Metrics")
        with gr.Tab("π Confusion Matrix"):
            viz8 = gr.Plot(label="Confusion Matrix")
        with gr.Tab("π ROC Curve"):
            viz9 = gr.Plot(label="ROC Curve")
        with gr.Tab("π Precision-Recall Curve"):
            viz10 = gr.Plot(label="Precision-Recall Curve")
        with gr.Tab("π¦ Amount Distribution"):
            viz11 = gr.Plot(label="Amount Distribution Comparison")
        with gr.Tab("π» Probability Distribution"):
            viz12 = gr.Plot(label="Fraud Probability Distribution Comparison")
        with gr.Tab("π₯ Correlation Heatmap"):
            viz13 = gr.Plot(label="Feature Correlation Heatmap")
        with gr.Tab("π Cumulative Detection"):
            viz14 = gr.Plot(label="Cumulative Fraud Detection Over Time")
        with gr.Tab("βοΈ Threshold Analysis"):
            viz15 = gr.Plot(label="Threshold Sensitivity Analysis")
    # Connect button to use dataset file (after outputs are defined)
    use_dataset_btn.click(
        fn=use_dataset_and_predict,
        inputs=[threshold],
        outputs=[
            summary_output,
            output_table,
            viz1, viz2, viz3, viz4, viz5, viz6, viz7, viz8,
            viz9, viz10, viz11, viz12, viz13, viz14, viz15
        ]
    )
    # Connect Sample button (after outputs are defined)
    sample_btn.click(
        fn=use_sample_and_predict,
        inputs=[threshold],
        outputs=[
            summary_output,
            output_table,
            viz1, viz2, viz3, viz4, viz5, viz6, viz7, viz8,
            viz9, viz10, viz11, viz12, viz13, viz14, viz15
        ]
    )
    # Main upload flow: 2 inputs (file, threshold) -> 17 outputs.
    btn.click(
        fn=predict_fraud_enhanced,
        inputs=[file_input, threshold],
        outputs=[
            summary_output,
            output_table,
            viz1, viz2, viz3, viz4, viz5, viz6, viz7, viz8,
            viz9, viz10, viz11, viz12, viz13, viz14, viz15
        ]
        # Removed api_name to avoid Gradio API info generation bug
    )
    gr.Markdown("""
    ---
    ### π Notes:
    - The model expects numeric features (V1-V28, Amount, optionally Time)
    - If exact feature names are not found, the system will use available numeric columns
    - Processing is limited to 10,000 rows for optimal performance
    - Adjust the threshold to balance between false positives and false negatives
    """)
# Module-level function for launching the app
def launch_app():
    """Launch the Gradio app.

    Runs a tiny CuPy smoke test first (when CuPy imported successfully) so
    the console shows whether GPU-backed data processing is actually usable,
    then starts the Gradio server with default settings. GPU work itself
    happens inside score_batch() via CuPy; no @spaces.GPU decorator is
    applied here to avoid GPU-context timeout issues on ZeroGPU.
    """
    if CUPY_AVAILABLE:
        try:
            # Tiny array round-trip verifies the GPU is actually reachable.
            probe = cp.sum(cp.array([1.0, 2.0, 3.0]))
            gpu_result = float(probe)
            print(f"β GPU available for data processing (test result: {gpu_result})")
        except Exception as e:
            print(f"β οΈ GPU test failed (will use CPU): {e}")
    demo.launch()
if __name__ == "__main__":
    # FIX: removed the redundant local `import os` -- os is already imported
    # at the top of the module.
    print("Starting Gradio app...")
    # Hugging Face Spaces sets SPACE_ID; pick launch settings accordingly.
    is_spaces = os.getenv("SPACE_ID") is not None
    if is_spaces:
        print("Detected Hugging Face Spaces environment")
        # On Spaces, launch without GPU decorator on launch_app
        # GPU operations will happen in score_batch() using CuPy
        # ZeroGPU will allocate GPU automatically when CuPy operations are performed
        print("Note: GPU will be used automatically in prediction functions via CuPy")
        launch_app()
    else:
        print("Running locally - server will be available at http://127.0.0.1:7860")
        print("Watch this console for debug output when you upload files!")
        # Enable request queueing; keep the auto-generated API page closed.
        demo.queue(api_open=False)
        try:
            demo.launch(
                server_name="0.0.0.0",  # Use 0.0.0.0 instead of 127.0.0.1 for better compatibility
                server_port=7860,
                share=False,
                show_error=True,
                quiet=False,
                inbrowser=False,
            )
        except ValueError as e:
            # Fallback if 0.0.0.0 doesn't work, try with share enabled
            if "localhost is not accessible" in str(e):
                print("β οΈ Localhost not accessible, trying with share=True...")
                demo.launch(
                    server_name="0.0.0.0",
                    server_port=7860,
                    share=True,  # Enable share as fallback
                    show_error=True,
                    quiet=False,
                    inbrowser=False,
                )
            else:
                # Unrelated launch failure: surface it instead of hiding it.
                raise