Beepeen24's picture
Update app.py
9599815 verified
# app.py - Enhanced Gradio Fraud Detection App with Visualizations
import time
import os
import shutil
import tempfile
import uuid
import threading
import joblib
import numpy as np
import pandas as pd
import gradio as gr
import plotly.graph_objects as go
from io import BytesIO
from sklearn.metrics import (
roc_auc_score, precision_recall_fscore_support, confusion_matrix,
precision_recall_curve, average_precision_score, f1_score, roc_curve
)
# Import Hugging Face Spaces GPU decorator (optional, only works on Spaces)
try:
import spaces
SPACES_AVAILABLE = True
except ImportError:
SPACES_AVAILABLE = False
spaces = None
# Try to import GPU-accelerated libraries for data processing
try:
import cupy as cp
CUPY_AVAILABLE = True
print("βœ… CuPy available - GPU acceleration enabled for data processing")
except ImportError:
CUPY_AVAILABLE = False
cp = None
print("⚠️ CuPy not available - using CPU for data processing")
# Import Power BI export function
try:
from powerbi_export import export_powerbi_csvs
POWERBI_AVAILABLE = True
except ImportError:
POWERBI_AVAILABLE = False
print("⚠️ powerbi_export module not found. Power BI export will be skipped.")
# Fix for Gradio API info generation bug
# Monkey-patch to handle the TypeError in gradio_client.utils when schema is a bool
try:
import gradio_client.utils as client_utils
# Patch get_type to handle bool values
original_get_type = client_utils.get_type
def patched_get_type(schema):
"""Patched version that handles bool schema values"""
# Handle case where schema is a bool (True/False) instead of a dict
if isinstance(schema, bool):
return "bool"
if not isinstance(schema, dict):
return "Any"
# Check if "const" key exists before using "in" operator
if isinstance(schema, dict) and "const" in schema:
return str(schema["const"])
try:
return original_get_type(schema)
except TypeError:
# Fallback if original function fails
return "Any"
client_utils.get_type = patched_get_type
# Also patch _json_schema_to_python_type to handle additionalProperties being a bool
original_json_schema_to_python_type = client_utils._json_schema_to_python_type
def patched_json_schema_to_python_type(schema, defs=None):
"""Patched version that handles bool additionalProperties"""
if isinstance(schema, bool):
return "Any"
if isinstance(schema, dict) and "additionalProperties" in schema:
additional_props = schema["additionalProperties"]
if isinstance(additional_props, bool):
# If additionalProperties is True, allow any properties
# If False, no additional properties allowed
schema = schema.copy()
schema["additionalProperties"] = {"type": "object" if additional_props else "null"}
try:
return original_json_schema_to_python_type(schema, defs)
except (TypeError, KeyError) as e:
# Fallback to avoid crashes
return "Any"
client_utils._json_schema_to_python_type = patched_json_schema_to_python_type
print("βœ… Gradio API info bug patched")
except Exception as e:
print(f"⚠️ Could not patch Gradio client utils: {e}")
# Get the directory where this script is located
try:
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
except NameError:
# Fallback if __file__ is not available (shouldn't happen in normal execution)
SCRIPT_DIR = os.getcwd()
print(f"⚠️ Warning: Could not determine script directory, using current working directory: {SCRIPT_DIR}")
# Print paths for debugging
print(f"πŸ“ Script directory: {SCRIPT_DIR}")
print(f"πŸ“ Sample dataset path: {os.path.join(SCRIPT_DIR, 'sample_transactions.csv')}")
print(f"πŸ“ Dataset path: {os.path.join(SCRIPT_DIR, 'dataset', 'fraudTest.csv')}")
print(f"πŸ“ Model path: {os.path.join(SCRIPT_DIR, 'fraud_lgbm_calibrated.pkl')}")
MODEL_PATH = os.path.join(SCRIPT_DIR, "fraud_lgbm_calibrated.pkl")
SAMPLE_DATASET_PATH = os.path.join(SCRIPT_DIR, "sample_transactions.csv")
DATASET_PATH = os.path.join(SCRIPT_DIR, "dataset", "fraudTest.csv")
model = None
def load_model():
"""Load the model file if it exists."""
global model
if model is not None:
return model
if not os.path.exists(MODEL_PATH):
raise FileNotFoundError(
f"Model file not found: {MODEL_PATH}\n"
f"Please train the model first or download it from your repository.\n"
f"The model file should be in the same directory as app.py"
)
print("Loading model...")
model = joblib.load(MODEL_PATH)
print(f"βœ… Model loaded! Features: {model.n_features_in_}")
print(f" Model type: {type(model)}")
# Test model with dummy data to verify it works
try:
test_features = np.zeros((1, model.n_features_in_))
test_pred = model.predict_proba(test_features)[:, 1]
print(f" βœ… Model test prediction: {test_pred[0]:.4f}")
except Exception as e:
print(f" ⚠️ Model test failed: {e}")
return model
# Try to load model at startup (but don't crash if it fails)
try:
load_model()
except FileNotFoundError as e:
print(f"⚠️ {e}")
print("⚠️ App will start but predictions will fail until model is available.")
def score_batch(raw_df: pd.DataFrame, threshold: float = 0.5) -> pd.DataFrame:
"""
Batch scoring with the exact 25 features the calibrated model expects.
Uses GPU acceleration for feature engineering when available.
"""
if raw_df is None or raw_df.empty:
raise ValueError("No data provided")
df = raw_df.copy()
# === EXACT 25 FEATURES THE MODEL WAS TRAINED ON ===
exact_features = [
'amt', 'city_pop', 'dayofweek', 'dayofyear', 'dist_category_bucket_idx',
'dist_home_merch', 'dow_cos', 'dow_sin', 'hour', 'hour_cos', 'hour_sin',
'is_business_hours', 'is_night', 'is_weekend', 'max_amt', 'mean_amt',
'median_amt', 'month', 'std_amt', 'time_since_last_txn',
'total_amt_last_1h', 'total_amt_last_24h', 'transaction_count',
'txn_count_last_1h', 'txn_count_last_24h'
]
# Extract time features from unix_time if present
if "unix_time" in df.columns:
ts = pd.to_datetime(pd.to_numeric(df["unix_time"], errors="coerce"), unit="s", utc=True)
df["hour"] = ts.dt.hour.fillna(0).astype(int)
df["dayofweek"] = ts.dt.dayofweek.fillna(0).astype(int)
df["month"] = ts.dt.month.fillna(1).astype(int)
df["dayofyear"] = ts.dt.dayofyear.fillna(1).astype(int)
else:
# Default time features if not available
df["hour"] = 12 # Noon as default
df["dayofweek"] = 1 # Monday as default
df["month"] = 6 # June as default
df["dayofyear"] = 150 # Mid-year as default
# Cyclic encodings for hour and dayofweek
# Use GPU acceleration if available for large datasets
for col, period, sin_col, cos_col in [
("hour", 24, "hour_sin", "hour_cos"),
("dayofweek", 7, "dow_sin", "dow_cos")
]:
if col in df.columns:
# Use GPU for data processing if CuPy is available
# For ZeroGPU: Always try GPU if available (even for small datasets)
# For regular GPU: Use GPU for large datasets (>1000 rows)
use_gpu = CUPY_AVAILABLE and (len(df) > 100 or os.getenv("SPACE_ID") is not None)
if use_gpu:
try:
gpu_array = cp.asarray(df[col].values)
angle = 2 * cp.pi * gpu_array / period
df[sin_col] = cp.asnumpy(cp.sin(angle))
df[cos_col] = cp.asnumpy(cp.cos(angle))
if len(df) <= 100:
print(f" βœ… GPU used for {col} encoding (ZeroGPU context)")
except Exception as e:
# Fallback to CPU if GPU operation fails
print(f" ⚠️ GPU operation failed for {col}, using CPU: {e}")
angle = 2 * np.pi * df[col] / period
df[sin_col] = np.sin(angle)
df[cos_col] = np.cos(angle)
else:
# CPU computation for small datasets or when CuPy unavailable
angle = 2 * np.pi * df[col] / period
df[sin_col] = np.sin(angle)
df[cos_col] = np.cos(angle)
# Time-based flags
if "hour" in df.columns:
df["is_weekend"] = (df["dayofweek"] >= 5).astype(int)
df["is_night"] = ((df["hour"] >= 22) | (df["hour"] <= 5)).astype(int)
df["is_business_hours"] = ((df["hour"] >= 9) & (df["hour"] <= 17)).astype(int)
else:
df["is_weekend"] = 0
df["is_night"] = 0
df["is_business_hours"] = 0
# Ensure base features exist with sensible defaults
if "amt" not in df.columns:
# Try to find amount column
for col in ["amount", "Amount", "TransactionAmt"]:
if col in df.columns:
df["amt"] = df[col]
break
if "amt" not in df.columns:
df["amt"] = 50.0 # Default amount
if "city_pop" not in df.columns:
df["city_pop"] = 100000.0 # Default city population
if "dist_home_merch" not in df.columns:
df["dist_home_merch"] = 5.0 # Default distance (5 km)
# Create dist_category_bucket_idx from dist_category if available
if "dist_category_bucket_idx" not in df.columns:
if "dist_category" in df.columns:
# Map distance categories to bucket indices
dist_order = {"<1km": 0, "1-10km": 1, "10-50km": 2, "50-100km": 3, "100-500km": 4, "500km+": 5, ">=500km": 5}
df["dist_category_bucket_idx"] = df["dist_category"].map(dist_order).fillna(3).astype(int)
else:
# Default to middle bucket (3 = 50-100km)
df["dist_category_bucket_idx"] = 3
# Fill historical/velocity features with sensible defaults (not 0!)
# These features typically have non-zero values in real data
velocity_defaults = {
'txn_count_last_1h': 1.0, # At least this transaction
'total_amt_last_1h': df["amt"].fillna(50.0), # Use current amount as estimate
'txn_count_last_24h': 3.0, # Typical: 3 transactions per day
'total_amt_last_24h': df["amt"].fillna(50.0) * 3, # Estimate based on count
}
for col, default_val in velocity_defaults.items():
if col not in df.columns:
if isinstance(default_val, pd.Series):
df[col] = default_val
else:
df[col] = default_val
# Fill aggregated features with sensible defaults
amt_filled = df["amt"].fillna(50.0)
aggregate_defaults = {
'time_since_last_txn': 3600.0, # 1 hour ago (in seconds)
'mean_amt': amt_filled, # Use current amount as mean
'std_amt': amt_filled * 0.3, # 30% of mean as std
'median_amt': amt_filled, # Use current amount as median
'max_amt': amt_filled, # Use current amount as max
'transaction_count': 1.0, # At least this transaction
}
for col, default_val in aggregate_defaults.items():
if col not in df.columns:
if isinstance(default_val, pd.Series):
df[col] = default_val
else:
df[col] = default_val
# Use ONLY the 25 features, in sorted order (LightGBM is picky about feature order)
feature_df = df[sorted(exact_features)].copy()
# Final safety check: replace any NaN/inf with defaults
feature_df = feature_df.replace([np.inf, -np.inf], np.nan)
for col in feature_df.columns:
if feature_df[col].isna().any():
# Use median or mean for that column
fill_val = feature_df[col].median() if feature_df[col].notna().any() else 0.0
if pd.isna(fill_val):
fill_val = 0.0
feature_df[col] = feature_df[col].fillna(fill_val)
# Debug: Print feature statistics
print(f"πŸ“Š Feature Engineering Summary:")
print(f" - Features created: {len(exact_features)}")
print(f" - Feature ranges:")
for col in sorted(exact_features)[:5]: # Show first 5
if col in feature_df.columns:
print(f" {col}: [{feature_df[col].min():.2f}, {feature_df[col].max():.2f}]")
print(f" - Any NaN values: {feature_df.isna().sum().sum()}")
# Load model if not already loaded
try:
current_model = load_model()
except FileNotFoundError as e:
raise FileNotFoundError(
f"Model file not found: {MODEL_PATH}\n"
f"Cannot perform predictions without the model.\n"
f"Please ensure fraud_lgbm_calibrated.pkl is in the same directory as app.py"
)
# Predict calibrated probabilities
try:
prob = current_model.predict_proba(feature_df)[:, 1]
print(f" - Probability range: [{prob.min():.4f}, {prob.max():.4f}]")
print(f" - Mean probability: {prob.mean():.4f}")
except Exception as e:
print(f"❌ Prediction error: {e}")
print(f" Model expects {current_model.n_features_in_} features, got {feature_df.shape[1]}")
print(f" Feature columns: {list(feature_df.columns)}")
raise
df["fraud_probability"] = prob
df["fraud_prediction"] = (prob >= threshold).astype(int)
# Risk levels
conditions = [
df["fraud_probability"] < 0.1,
df["fraud_probability"] < 0.5,
df["fraud_probability"] < 0.9,
]
choices = ["Low", "Medium", "High"]
df["risk_level"] = np.select(conditions, choices, default="Critical")
return df
def create_visualizations(result_df: pd.DataFrame, threshold: float, model_metrics: dict = None):
"""Create comprehensive visualizations for fraud detection results."""
visualizations = []
# 1. Fraud Probability Distribution
fig1 = go.Figure()
if len(result_df) > 0:
fig1.add_trace(go.Histogram(
x=result_df["fraud_probability"],
nbinsx=min(50, len(result_df) // 2),
name="All Transactions",
marker_color="lightblue",
opacity=0.7
))
fraud_df = result_df[result_df["fraud_prediction"] == 1]
if len(fraud_df) > 0:
fig1.add_trace(go.Histogram(
x=fraud_df["fraud_probability"],
nbinsx=min(50, len(fraud_df) // 2),
name="Flagged as Fraud",
marker_color="red",
opacity=0.8
))
fig1.add_vline(
x=threshold,
line_dash="dash",
line_color="orange",
annotation_text=f"Threshold: {threshold:.2f}"
)
fig1.update_layout(
title="Fraud Probability Distribution",
xaxis_title="Fraud Probability",
yaxis_title="Count",
height=400,
showlegend=True
)
visualizations.append(fig1)
# 2. Risk Level Breakdown (Pie Chart)
if len(result_df) > 0:
risk_counts = result_df["risk_level"].value_counts()
color_map = {"Low": "green", "Medium": "yellow", "High": "orange", "Critical": "red"}
colors = [color_map.get(label, "gray") for label in risk_counts.index]
fig2 = go.Figure(data=[go.Pie(
labels=risk_counts.index,
values=risk_counts.values,
hole=0.4,
marker_colors=colors
)])
fig2.update_layout(
title="Risk Level Distribution",
height=400
)
else:
fig2 = go.Figure()
fig2.update_layout(title="Risk Level Distribution (No Data)", height=400)
visualizations.append(fig2)
# 3. Amount vs Fraud Probability (Scatter)
amount_col = None
for col in ["amt", "amount", "Amount", "TransactionAmt"]:
if col in result_df.columns:
amount_col = col
break
if amount_col and len(result_df) > 0:
fig3 = go.Figure()
fraud_df = result_df[result_df["fraud_prediction"] == 1]
normal_df = result_df[result_df["fraud_prediction"] == 0]
if len(normal_df) > 0:
fig3.add_trace(go.Scatter(
x=normal_df[amount_col],
y=normal_df["fraud_probability"],
mode="markers",
name="Normal",
marker=dict(color="blue", size=4, opacity=0.5)
))
if len(fraud_df) > 0:
fig3.add_trace(go.Scatter(
x=fraud_df[amount_col],
y=fraud_df["fraud_probability"],
mode="markers",
name="Fraud",
marker=dict(color="red", size=6, opacity=0.8)
))
fig3.add_hline(
y=threshold,
line_dash="dash",
line_color="orange",
annotation_text=f"Threshold: {threshold:.2f}"
)
fig3.update_layout(
title="Transaction Amount vs Fraud Probability",
xaxis_title=f"Amount ({amount_col})",
yaxis_title="Fraud Probability",
height=400,
showlegend=True
)
visualizations.append(fig3)
else:
fig3 = go.Figure()
fig3.update_layout(
title="Transaction Amount vs Fraud Probability (Amount column not found)",
height=400
)
visualizations.append(fig3)
# 4. Fraud Statistics Bar Chart
if len(result_df) > 0:
total = len(result_df)
fraud_count = int(result_df["fraud_prediction"].sum())
fraud_rate = (fraud_count / total) * 100 if total > 0 else 0.0
fig4 = go.Figure(data=[
go.Bar(
x=["Total Transactions", "Fraud Detected", "Normal"],
y=[total, fraud_count, total - fraud_count],
marker_color=["blue", "red", "green"],
text=[f"{total:,}", f"{fraud_count:,}", f"{total - fraud_count:,}"],
textposition="auto"
)
])
fig4.update_layout(
title="Transaction Summary",
yaxis_title="Count",
height=400
)
else:
fig4 = go.Figure()
fig4.update_layout(title="Transaction Summary (No Data)", height=400)
visualizations.append(fig4)
# 5. Fraud Probability Over Time (if time column exists)
time_col = None
for col in ["Time", "time", "unix_time", "timestamp"]:
if col in result_df.columns:
time_col = col
break
if time_col and len(result_df) > 0:
# Sort by time and create rolling average
sorted_df = result_df.sort_values(time_col).copy()
window_size = max(1, min(100, len(sorted_df) // 10))
sorted_df["rolling_avg"] = sorted_df["fraud_probability"].rolling(
window=window_size, center=True
).mean()
fig5 = go.Figure()
fig5.add_trace(go.Scatter(
x=sorted_df[time_col],
y=sorted_df["fraud_probability"],
mode="markers",
name="Individual",
marker=dict(size=3, opacity=0.3, color="lightblue")
))
if sorted_df["rolling_avg"].notna().any():
fig5.add_trace(go.Scatter(
x=sorted_df[time_col],
y=sorted_df["rolling_avg"],
mode="lines",
name="Rolling Average",
line=dict(color="blue", width=2)
))
fig5.add_hline(
y=threshold,
line_dash="dash",
line_color="orange",
annotation_text=f"Threshold: {threshold:.2f}"
)
fig5.update_layout(
title="Fraud Probability Over Time",
xaxis_title=time_col,
yaxis_title="Fraud Probability",
height=400,
showlegend=True
)
visualizations.append(fig5)
else:
fig5 = go.Figure()
fig5.update_layout(
title="Fraud Probability Over Time (Time column not found)",
height=400
)
visualizations.append(fig5)
# 6. Top 20 Most Suspicious Transactions
if len(result_df) > 0:
top_suspicious = result_df.nlargest(min(20, len(result_df)), "fraud_probability")
fig6 = go.Figure(data=[
go.Bar(
x=[str(i) for i in top_suspicious.index],
y=top_suspicious["fraud_probability"],
marker_color=top_suspicious["fraud_probability"],
marker_colorscale="Reds",
text=top_suspicious["fraud_probability"].round(3),
textposition="auto"
)
])
fig6.update_layout(
title="Top 20 Most Suspicious Transactions",
xaxis_title="Transaction Index",
yaxis_title="Fraud Probability",
height=400
)
visualizations.append(fig6)
else:
fig6 = go.Figure()
fig6.update_layout(title="Top 20 Most Suspicious Transactions (No Data)", height=400)
visualizations.append(fig6)
# 7. Model Performance Metrics (if available)
if model_metrics and len(model_metrics) > 0:
fig7 = go.Figure()
# Create metrics bar chart - include PR-AUC for imbalanced data
metrics_names = ['ROC-AUC', 'PR-AUC', 'Precision', 'Recall', 'F1-Score']
metrics_values = [
model_metrics.get('roc_auc', 0),
model_metrics.get('pr_auc', 0),
model_metrics.get('precision', 0),
model_metrics.get('recall', 0),
model_metrics.get('f1', 0)
]
colors = ['#1f77b4', '#9467bd', '#ff7f0e', '#2ca02c', '#d62728']
fig7.add_trace(go.Bar(
x=metrics_names,
y=metrics_values,
marker_color=colors,
text=[f'{v:.4f}' for v in metrics_values],
textposition='auto'
))
fig7.update_layout(
title="Model Performance Metrics (PR-AUC better for imbalanced data)",
yaxis_title="Score",
yaxis_range=[0, 1],
height=400
)
visualizations.append(fig7)
# 8. Confusion Matrix Heatmap
if all(k in model_metrics for k in ['true_negatives', 'false_positives', 'false_negatives', 'true_positives']):
fig8 = go.Figure(data=go.Heatmap(
z=[[model_metrics['true_negatives'], model_metrics['false_positives']],
[model_metrics['false_negatives'], model_metrics['true_positives']]],
x=['Predicted Normal', 'Predicted Fraud'],
y=['Actual Normal', 'Actual Fraud'],
colorscale='Blues',
text=[[model_metrics['true_negatives'], model_metrics['false_positives']],
[model_metrics['false_negatives'], model_metrics['true_positives']]],
texttemplate='%{text}',
textfont={"size": 16},
colorbar=dict(title="Count")
))
fig8.update_layout(
title="Confusion Matrix",
height=400
)
visualizations.append(fig8)
else:
visualizations.append(None)
else:
# Add None placeholders if no metrics
visualizations.append(None)
visualizations.append(None)
# ============================================================================
# ADVANCED VISUALIZATIONS
# ============================================================================
# 9. ROC Curve (if ground truth available)
if model_metrics and 'roc_auc' in model_metrics:
try:
# Get ground truth and predictions
ground_truth_cols = ["is_fraud", "fraud", "target", "label", "isFraud"]
y_true = None
for col in ground_truth_cols:
if col in result_df.columns:
y_true = result_df[col]
break
if y_true is not None:
y_prob = result_df["fraud_probability"]
fpr, tpr, roc_thresholds = roc_curve(y_true, y_prob)
fig9 = go.Figure()
fig9.add_trace(go.Scatter(
x=fpr,
y=tpr,
mode='lines',
name=f'ROC Curve (AUC = {model_metrics["roc_auc"]:.4f})',
line=dict(color='blue', width=2)
))
fig9.add_trace(go.Scatter(
x=[0, 1],
y=[0, 1],
mode='lines',
name='Random Classifier',
line=dict(color='red', dash='dash', width=1)
))
fig9.update_layout(
title="ROC Curve (Receiver Operating Characteristic)",
xaxis_title="False Positive Rate",
yaxis_title="True Positive Rate",
height=400,
showlegend=True
)
visualizations.append(fig9)
else:
visualizations.append(None)
except Exception as e:
print(f"Warning: Could not create ROC curve: {e}")
visualizations.append(None)
else:
visualizations.append(None)
# 10. Precision-Recall Curve (if ground truth available)
if model_metrics and 'pr_auc' in model_metrics:
try:
ground_truth_cols = ["is_fraud", "fraud", "target", "label", "isFraud"]
y_true = None
for col in ground_truth_cols:
if col in result_df.columns:
y_true = result_df[col]
break
if y_true is not None:
y_prob = result_df["fraud_probability"]
precisions, recalls, pr_thresholds = precision_recall_curve(y_true, y_prob)
fig10 = go.Figure()
fig10.add_trace(go.Scatter(
x=recalls,
y=precisions,
mode='lines',
name=f'PR Curve (AUC = {model_metrics["pr_auc"]:.4f})',
line=dict(color='green', width=2),
fill='tonexty'
))
fig10.add_hline(
y=y_true.mean(),
line_dash="dash",
line_color="orange",
annotation_text=f"Baseline (Fraud Rate: {y_true.mean():.2%})"
)
fig10.update_layout(
title="Precision-Recall Curve (Better for Imbalanced Data)",
xaxis_title="Recall",
yaxis_title="Precision",
height=400,
showlegend=True
)
visualizations.append(fig10)
else:
visualizations.append(None)
except Exception as e:
print(f"Warning: Could not create PR curve: {e}")
visualizations.append(None)
else:
visualizations.append(None)
# 11. Amount Distribution Comparison (Box Plot)
amount_col = None
for col in ["amt", "amount", "Amount", "TransactionAmt"]:
if col in result_df.columns:
amount_col = col
break
if amount_col and len(result_df) > 0:
fraud_df = result_df[result_df["fraud_prediction"] == 1]
normal_df = result_df[result_df["fraud_prediction"] == 0]
fig11 = go.Figure()
if len(normal_df) > 0:
fig11.add_trace(go.Box(
y=normal_df[amount_col],
name="Normal Transactions",
marker_color="blue",
boxmean='sd'
))
if len(fraud_df) > 0:
fig11.add_trace(go.Box(
y=fraud_df[amount_col],
name="Flagged as Fraud",
marker_color="red",
boxmean='sd'
))
fig11.update_layout(
title="Transaction Amount Distribution Comparison",
yaxis_title=f"Amount ({amount_col})",
height=400,
showlegend=True
)
visualizations.append(fig11)
else:
fig11 = go.Figure()
fig11.update_layout(
title="Transaction Amount Distribution (Amount column not found)",
height=400
)
visualizations.append(fig11)
# 12. Fraud Probability Distribution Comparison (Violin Plot)
if len(result_df) > 0:
ground_truth_cols = ["is_fraud", "fraud", "target", "label", "isFraud"]
has_gt = any(col in result_df.columns for col in ground_truth_cols)
if has_gt:
gt_col = next((col for col in ground_truth_cols if col in result_df.columns), None)
if gt_col:
fraud_probs = result_df[result_df[gt_col] == 1]["fraud_probability"]
normal_probs = result_df[result_df[gt_col] == 0]["fraud_probability"]
fig12 = go.Figure()
if len(normal_probs) > 0:
fig12.add_trace(go.Violin(
y=normal_probs,
name="Actual Normal",
box_visible=True,
meanline_visible=True,
marker_color="blue"
))
if len(fraud_probs) > 0:
fig12.add_trace(go.Violin(
y=fraud_probs,
name="Actual Fraud",
box_visible=True,
meanline_visible=True,
marker_color="red"
))
fig12.add_hline(
y=threshold,
line_dash="dash",
line_color="orange",
annotation_text=f"Threshold: {threshold:.2f}"
)
fig12.update_layout(
title="Fraud Probability Distribution: Actual Fraud vs Normal",
yaxis_title="Fraud Probability",
height=400,
showlegend=True
)
visualizations.append(fig12)
else:
visualizations.append(None)
else:
# Use predicted labels if no ground truth
fraud_probs = result_df[result_df["fraud_prediction"] == 1]["fraud_probability"]
normal_probs = result_df[result_df["fraud_prediction"] == 0]["fraud_probability"]
fig12 = go.Figure()
if len(normal_probs) > 0:
fig12.add_trace(go.Violin(
y=normal_probs,
name="Predicted Normal",
box_visible=True,
meanline_visible=True,
marker_color="blue"
))
if len(fraud_probs) > 0:
fig12.add_trace(go.Violin(
y=fraud_probs,
name="Predicted Fraud",
box_visible=True,
meanline_visible=True,
marker_color="red"
))
fig12.add_hline(
y=threshold,
line_dash="dash",
line_color="orange",
annotation_text=f"Threshold: {threshold:.2f}"
)
fig12.update_layout(
title="Fraud Probability Distribution: Predicted Fraud vs Normal",
yaxis_title="Fraud Probability",
height=400,
showlegend=True
)
visualizations.append(fig12)
else:
visualizations.append(None)
# 13. Feature Correlation Heatmap (if multiple features available)
if len(result_df) > 0:
try:
# Select numeric features for correlation
numeric_cols = result_df.select_dtypes(include=[np.number]).columns.tolist()
# Exclude probability and prediction columns for cleaner visualization
exclude_cols = ['fraud_probability', 'fraud_prediction', 'is_fraud', 'fraud', 'target', 'label']
feature_cols = [col for col in numeric_cols if col not in exclude_cols]
# Limit to top 15 features for readability
if len(feature_cols) > 15:
# Select features with highest variance
variances = result_df[feature_cols].var().sort_values(ascending=False)
feature_cols = variances.head(15).index.tolist()
if len(feature_cols) > 1:
corr_matrix = result_df[feature_cols + ['fraud_probability']].corr()
fig13 = go.Figure(data=go.Heatmap(
z=corr_matrix.values,
x=corr_matrix.columns,
y=corr_matrix.index,
colorscale='RdBu',
zmid=0,
text=corr_matrix.round(2).values,
texttemplate='%{text}',
textfont={"size": 10},
colorbar=dict(title="Correlation")
))
fig13.update_layout(
title="Feature Correlation Heatmap (Top Features)",
height=500,
width=600
)
visualizations.append(fig13)
else:
visualizations.append(None)
except Exception as e:
print(f"Warning: Could not create correlation heatmap: {e}")
visualizations.append(None)
else:
visualizations.append(None)
# 14. Cumulative Fraud Detection Over Time
time_col = None
for col in ["Time", "time", "unix_time", "timestamp"]:
if col in result_df.columns:
time_col = col
break
if time_col and len(result_df) > 0:
sorted_df = result_df.sort_values(time_col).copy()
sorted_df['cumulative_fraud'] = sorted_df['fraud_prediction'].cumsum()
sorted_df['cumulative_total'] = range(1, len(sorted_df) + 1)
sorted_df['fraud_rate'] = sorted_df['cumulative_fraud'] / sorted_df['cumulative_total']
fig14 = go.Figure()
fig14.add_trace(go.Scatter(
x=sorted_df[time_col],
y=sorted_df['cumulative_fraud'],
mode='lines',
name='Cumulative Fraud Detected',
line=dict(color='red', width=2)
))
fig14.add_trace(go.Scatter(
x=sorted_df[time_col],
y=sorted_df['fraud_rate'] * 100,
mode='lines',
name='Fraud Rate (%)',
yaxis='y2',
line=dict(color='orange', width=2, dash='dash')
))
fig14.update_layout(
title="Cumulative Fraud Detection Over Time",
xaxis_title=time_col,
yaxis_title="Cumulative Fraud Count",
yaxis2=dict(
title="Fraud Rate (%)",
overlaying='y',
side='right'
),
height=400,
showlegend=True
)
visualizations.append(fig14)
else:
visualizations.append(None)
# 15. Threshold Sensitivity Analysis (if ground truth available)
if model_metrics:
try:
ground_truth_cols = ["is_fraud", "fraud", "target", "label", "isFraud"]
y_true = None
for col in ground_truth_cols:
if col in result_df.columns:
y_true = result_df[col]
break
if y_true is not None and len(y_true) > 0:
y_prob = result_df["fraud_probability"]
thresholds = np.linspace(0.01, 0.99, 50)
precisions_list = []
recalls_list = []
f1_scores_list = []
for t in thresholds:
y_pred = (y_prob >= t).astype(int)
if y_pred.sum() > 0:
prec, rec, f1, _ = precision_recall_fscore_support(
y_true, y_pred, average='binary', zero_division=0
)
precisions_list.append(prec)
recalls_list.append(rec)
f1_scores_list.append(f1)
else:
precisions_list.append(0)
recalls_list.append(0)
f1_scores_list.append(0)
fig15 = go.Figure()
fig15.add_trace(go.Scatter(
x=thresholds,
y=precisions_list,
mode='lines',
name='Precision',
line=dict(color='blue', width=2)
))
fig15.add_trace(go.Scatter(
x=thresholds,
y=recalls_list,
mode='lines',
name='Recall',
line=dict(color='green', width=2)
))
fig15.add_trace(go.Scatter(
x=thresholds,
y=f1_scores_list,
mode='lines',
name='F1-Score',
line=dict(color='red', width=2)
))
fig15.add_vline(
x=threshold,
line_dash="dash",
line_color="orange",
annotation_text=f"Current: {threshold:.2f}"
)
fig15.update_layout(
title="Threshold Sensitivity Analysis",
xaxis_title="Threshold",
yaxis_title="Score",
height=400,
showlegend=True
)
visualizations.append(fig15)
else:
visualizations.append(None)
except Exception as e:
print(f"Warning: Could not create threshold sensitivity: {e}")
visualizations.append(None)
else:
visualizations.append(None)
return visualizations
def predict_fraud_enhanced(file, threshold: float = 0.5):
"""
Enhanced fraud detection with comprehensive visualizations and GPU acceleration support.
"""
print("=" * 60)
print("FUNCTION CALLED: predict_fraud_enhanced")
print(f"File received: {file}")
print(f"Threshold: {threshold}")
print("=" * 60)
if file is None:
print("ERROR: File is None")
return (
"Please upload a CSV file.",
pd.DataFrame(),
None, None, None, None, None, None, None, None, None, None, None, None, None, None, None
)
try:
start = time.time()
# Read CSV (limit to 10000 rows for performance)
# Handle different Gradio file input formats (Gradio 6.0 compatibility)
# With type="filepath", Gradio returns a string path directly
file_path = None
# Method 1: Direct string path (most common in Gradio 6.0)
# Also handle if user provides a relative path like "dataset/fraudTest.csv"
if isinstance(file, str):
file_path = file
# If it's a relative path and exists, use it directly
if not os.path.isabs(file_path) and os.path.exists(file_path):
print(f"DEBUG: Using relative path: {file_path}")
else:
print(f"DEBUG: File is string: {file_path}")
# Method 2: FileData object (check all possible attributes)
elif file is not None:
# Try all possible file path attributes
for attr in ['path', 'name', 'orig_name', 'file_path', 'file_name']:
if hasattr(file, attr):
value = getattr(file, attr)
if value and isinstance(value, str) and len(value) > 3:
file_path = value
print(f"DEBUG: Found file path via {attr}: {file_path}")
break
# If still no path, try tuple/list
if not file_path and isinstance(file, (list, tuple)) and len(file) > 0:
file_path = file[0] if isinstance(file[0], str) else str(file[0])
print(f"DEBUG: File is tuple/list: {file_path}")
# Last resort: convert to string
if not file_path:
file_str = str(file)
print(f"DEBUG: File as string: {file_str}")
# Check if it's a valid path-like string
if file_str and file_str != 'file' and file_str != 'None' and len(file_str) > 3:
if os.path.sep in file_str or file_str.endswith('.csv'):
file_path = file_str
print(f"DEBUG: Extracted path from string: {file_path}")
print(f"DEBUG: Final file_path: {file_path}")
# Validate file path exists
if not file_path:
error_msg = f"Error: Could not determine file path. File type: {type(file)}, File value: {file}"
print(error_msg)
return (
error_msg,
pd.DataFrame(),
None, None, None, None, None, None, None, None, None, None, None, None, None, None
)
if not os.path.exists(file_path):
error_msg = f"Error: File not found at path: {file_path}. Please try uploading the file again."
print(error_msg)
return (
error_msg,
pd.DataFrame(),
None, None, None, None, None, None, None, None, None, None, None, None, None, None
)
print(f"DEBUG: File exists, reading CSV from: {file_path}")
# Read CSV with error handling for large files and permission issues
# Windows/Gradio temp files often have permission issues, so we try multiple methods
df = None
read_methods_tried = []
# Method 1: Try reading in binary mode first (most reliable for locked files)
try:
print("Trying binary read method (most reliable for Windows temp files)...")
with open(file_path, 'rb') as f:
content = f.read()
df = pd.read_csv(BytesIO(content), nrows=10000, low_memory=False)
print("βœ“ File read successfully using binary method")
except (PermissionError, IOError, OSError) as binary_error:
read_methods_tried.append(f"Binary read: {str(binary_error)}")
print(f"⚠ Binary read failed, trying direct read...")
# Method 2: Try to read directly
try:
df = pd.read_csv(file_path, nrows=10000, low_memory=False)
print("βœ“ File read successfully using direct method")
except (PermissionError, IOError, OSError) as perm_error:
read_methods_tried.append(f"Direct read: {str(perm_error)}")
print(f"⚠ Permission denied, trying copy methods...")
# Method 3: Try copying to project directory (more reliable)
try:
print("Trying copy to project directory...")
project_temp = os.path.join(os.getcwd(), f"temp_upload_{uuid.uuid4().hex[:8]}.csv")
# Use shutil.copy instead of copy2 to avoid metadata issues
shutil.copy(file_path, project_temp)
# Small delay to ensure file is fully copied
time.sleep(0.1)
df = pd.read_csv(project_temp, nrows=10000, low_memory=False)
# Clean up
try:
os.remove(project_temp)
print("βœ“ Temporary file cleaned up")
except:
pass
print("βœ“ File read successfully after copying to project directory")
except Exception as copy_error:
read_methods_tried.append(f"Copy method: {str(copy_error)}")
# Method 4: Try system temp directory with different approach
try:
print("Trying system temp directory with alternative method...")
temp_dir = tempfile.gettempdir()
temp_filename = f"gradio_upload_{uuid.uuid4().hex[:8]}.csv"
temp_file = os.path.join(temp_dir, temp_filename)
# Read and write in chunks to avoid locking
with open(file_path, 'rb') as src, open(temp_file, 'wb') as dst:
shutil.copyfileobj(src, dst)
df = pd.read_csv(temp_file, nrows=10000, low_memory=False)
# Clean up
try:
os.remove(temp_file)
except:
pass
print("βœ“ File read successfully using system temp directory")
except Exception as temp_error:
read_methods_tried.append(f"System temp: {str(temp_error)}")
# All methods failed - provide helpful error message
error_details = "\n".join([f"- {m}" for m in read_methods_tried])
return (
f"❌ Error: Could not read file due to permission/access issues.\n\n"
f"**Quick Fix:**\n"
f"1. **Copy the file to the project folder first** (e.g., `dataset/fraudTest.csv`)\n"
f"2. Then upload it from there - this avoids Windows temp file permission issues\n\n"
f"**Alternative Solutions:**\n"
f"- Close any programs that might be using the file (Excel, text editor, etc.)\n"
f"- Try uploading the file again (sometimes works on retry)\n"
f"- If using Windows, try running the app as Administrator\n"
f"- Use the file directly: The app can process `dataset/fraudTest.csv` if it exists\n\n"
f"**Why this happens:**\n"
f"Gradio stores uploaded files in Windows temp directory which can have permission restrictions.\n"
f"Copying the file to your project folder first avoids this issue.\n\n"
f"**Technical details:**\n{error_details}",
pd.DataFrame(),
None, None, None, None, None, None, None, None, None, None, None, None, None, None
)
except Exception as csv_error:
# Handle CSV parsing errors separately
if "Permission denied" in str(csv_error) or "Errno 13" in str(csv_error):
return (
f"❌ Permission Error: Could not access the uploaded file.\n\n"
f"**Solution:** Copy your CSV file to the project folder (e.g., `dataset/fraudTest.csv`) "
f"and upload it from there. This avoids Windows temp file permission issues.\n\n"
f"**Error:** {str(csv_error)}",
pd.DataFrame(),
None, None, None, None, None, None, None, None, None, None, None, None, None, None, None
)
return (
f"Error reading CSV file: {str(csv_error)}\n\nPlease ensure the file is a valid CSV format.",
pd.DataFrame(),
None, None, None, None, None, None, None, None, None, None, None, None, None, None
)
if df is None:
return (
"Error: Could not read the CSV file. Please try uploading again.",
pd.DataFrame(),
None, None, None, None, None, None, None, None, None, None, None, None, None, None
)
if df.empty:
return (
"CSV is empty.",
pd.DataFrame(),
None, None, None, None, None, None, None, None, None, None, None, None, None, None
)
print(f"Processing {len(df)} rows...")
print(f"CSV columns: {list(df.columns)[:10]}...") # Print first 10 columns for debugging
print(f"Total columns in CSV: {len(df.columns)}")
# Score the batch
try:
print("Starting score_batch...")
result_df = score_batch(df, threshold=threshold)
print(f"Score_batch completed. Result shape: {result_df.shape}")
print(f"Fraud probabilities range: {result_df['fraud_probability'].min():.4f} to {result_df['fraud_probability'].max():.4f}")
except FileNotFoundError as model_error:
# Handle missing model file with user-friendly message
error_msg = (
f"❌ **Model File Not Found**\n\n"
f"The fraud detection model (`fraud_lgbm_calibrated.pkl`) is required to make predictions.\n\n"
f"**To fix this:**\n"
f"1. Download the model file from your GitHub repository\n"
f"2. Place it in the same directory as `app.py`\n"
f"3. Or train a new model using the notebook: `notebooks/eda_and_feature_engineering.ipynb`\n\n"
f"**Error details:** {str(model_error)}"
)
print(f"MODEL ERROR: {error_msg}")
return (
error_msg,
pd.DataFrame(),
None, None, None, None, None, None, None, None, None, None, None, None, None, None
)
except Exception as score_error:
import traceback
error_details = f"Error during scoring: {str(score_error)}\n\n{traceback.format_exc()}"
print(f"SCORING ERROR: {error_details}")
return (
f"❌ {error_details}",
pd.DataFrame(),
None, None, None, None, None, None, None, None, None, None, None, None, None, None
)
# Calculate statistics
total = len(result_df)
fraud_count = int(result_df["fraud_prediction"].sum())
fraud_rate = (fraud_count / total) * 100 if total > 0 else 0.0
avg_prob = result_df["fraud_probability"].mean()
max_prob = result_df["fraud_probability"].max()
min_prob = result_df["fraud_probability"].min()
median_prob = result_df["fraud_probability"].median()
elapsed = time.time() - start
# Calculate model performance metrics if ground truth is available
model_metrics = {}
has_ground_truth = False
# Check for ground truth labels (common column names)
ground_truth_cols = ["is_fraud", "fraud", "target", "label", "isFraud"]
y_true = None
for col in ground_truth_cols:
if col in result_df.columns:
y_true = result_df[col]
has_ground_truth = True
print(f"Found ground truth column: {col}")
break
if has_ground_truth and y_true is not None:
try:
y_prob = result_df["fraud_probability"]
y_pred = result_df["fraud_prediction"]
# Calculate metrics at current threshold
model_metrics["roc_auc"] = roc_auc_score(y_true, y_prob)
precision, recall, f1, _ = precision_recall_fscore_support(
y_true, y_pred, average="binary", zero_division=0
)
model_metrics["precision"] = float(precision)
model_metrics["recall"] = float(recall)
model_metrics["f1"] = float(f1)
# PR-AUC (better for imbalanced datasets)
model_metrics["pr_auc"] = average_precision_score(y_true, y_prob)
# Confusion matrix
cm = confusion_matrix(y_true, y_pred)
model_metrics["true_negatives"] = int(cm[0, 0])
model_metrics["false_positives"] = int(cm[0, 1])
model_metrics["false_negatives"] = int(cm[1, 0])
model_metrics["true_positives"] = int(cm[1, 1])
print(f"Model metrics calculated: AUC={model_metrics['roc_auc']:.4f}, PR-AUC={model_metrics['pr_auc']:.4f}")
except Exception as e:
print(f"Warning: Could not calculate metrics: {str(e)}")
import traceback
print(traceback.format_exc())
has_ground_truth = False
# Create summary text with model metrics
summary = f"""
## πŸ“Š Fraud Detection Results
**Processing Summary:**
- βœ… Processed **{total:,}** transactions in **{elapsed:.2f}s**
- 🎯 Detection threshold: **{threshold:.2f}**
**Fraud Statistics:**
- 🚨 **Fraud flagged:** {fraud_count:,} transactions ({fraud_rate:.2f}%)
- βœ… **Normal transactions:** {total - fraud_count:,} ({(100 - fraud_rate):.2f}%)
**Probability Statistics:**
- πŸ“ˆ **Average probability:** {avg_prob:.4f}
- πŸ“Š **Median probability:** {median_prob:.4f}
- ⬆️ **Maximum probability:** {max_prob:.4f}
- ⬇️ **Minimum probability:** {min_prob:.4f}
**Risk Level Breakdown:**
{result_df['risk_level'].value_counts().to_string()}
"""
# Add model performance metrics if available
if has_ground_truth and model_metrics:
# Calculate additional metrics
tp = model_metrics.get('true_positives', 0)
fp = model_metrics.get('false_positives', 0)
fn = model_metrics.get('false_negatives', 0)
tn = model_metrics.get('true_negatives', 0)
summary += f"""
---
## 🎯 Model Performance Metrics
**Current Threshold ({threshold:.2f}) Performance:**
- πŸ“Š **ROC-AUC:** {model_metrics['roc_auc']:.4f} {'βœ… Good discrimination' if model_metrics['roc_auc'] > 0.8 else '⚠️ Needs improvement'}
- πŸ“ˆ **PR-AUC (Better for imbalanced data):** {model_metrics.get('pr_auc', 0):.4f} {'βœ… Good for imbalanced data' if model_metrics.get('pr_auc', 0) > 0.1 else '⚠️ Very low - model struggles with imbalance'}
- 🎯 **Precision:** {model_metrics['precision']:.4f} ({int((1-model_metrics['precision'])*100)}% false positives)
- πŸ” **Recall:** {model_metrics['recall']:.4f} ({int((1-model_metrics['recall'])*100)}% fraud missed)
- βš–οΈ **F1-Score:** {model_metrics['f1']:.4f} {'βœ… Balanced' if model_metrics['f1'] > 0.2 else '⚠️ Poor balance'}
**Confusion Matrix:**
- βœ… **True Negatives (Correctly identified normal):** {tn:,}
- ❌ **False Positives (Normal flagged as fraud):** {fp:,}
- ❌ **False Negatives (Fraud missed):** {fn:,}
- βœ… **True Positives (Correctly identified fraud):** {tp:,}
**Accuracy:** {(tp + tn) / total * 100:.2f}%
"""
# Export to Power BI (automatic)
if POWERBI_AVAILABLE:
try:
export_powerbi_csvs(result_df)
print("βœ… Data exported to Power BI format")
except Exception as e:
print(f"⚠️ Power BI export failed: {e}")
# Get top suspicious transactions
top_df = result_df.nlargest(20, "fraud_probability")
# Select columns to display
display_cols = ["fraud_probability", "fraud_prediction", "risk_level"]
# Add amount column if available
for amt_col in ["amt", "amount", "Amount", "TransactionAmt"]:
if amt_col in top_df.columns:
display_cols.insert(1, amt_col)
break
# Add first few original columns
for col in result_df.columns:
if col not in ["fraud_probability", "fraud_prediction", "risk_level"] and col not in display_cols:
if len(display_cols) < 8: # Limit display columns
display_cols.append(col)
else:
break
display_cols = [c for c in display_cols if c in top_df.columns]
top_display = top_df[display_cols].copy()
# Format probability column for display
if "fraud_probability" in top_display.columns:
top_display["fraud_probability"] = top_display["fraud_probability"].round(4)
# Create visualizations (pass model_metrics if available)
try:
viz_list = create_visualizations(result_df, threshold, model_metrics if has_ground_truth else None)
except Exception as viz_error:
import traceback
print(f"Warning: Error creating some visualizations: {str(viz_error)}")
print(traceback.format_exc())
# Create empty visualizations list if there's an error
viz_list = []
# Ensure we always return 15 visualizations
while len(viz_list) < 15:
viz_list.append(None)
# Return all outputs (summary, table, and 15 visualizations)
print(f"Returning results: summary length={len(summary)}, top_display shape={top_display.shape}, viz_count={len([v for v in viz_list if v is not None])}")
return (
summary,
top_display,
viz_list[0], # Probability Distribution
viz_list[1], # Risk Level Breakdown
viz_list[2], # Amount Analysis
viz_list[3], # Statistics
viz_list[4], # Time Series
viz_list[5], # Top 20 Suspicious
viz_list[6] if len(viz_list) > 6 else None, # Model Performance Metrics
viz_list[7] if len(viz_list) > 7 else None, # Confusion Matrix
viz_list[8] if len(viz_list) > 8 else None, # ROC Curve
viz_list[9] if len(viz_list) > 9 else None, # Precision-Recall Curve
viz_list[10] if len(viz_list) > 10 else None, # Amount Distribution (Box Plot)
viz_list[11] if len(viz_list) > 11 else None, # Probability Distribution (Violin)
viz_list[12] if len(viz_list) > 12 else None, # Correlation Heatmap
viz_list[13] if len(viz_list) > 13 else None, # Cumulative Fraud Detection
viz_list[14] if len(viz_list) > 14 else None, # Threshold Sensitivity
)
except Exception as e:
import traceback
error_msg = f"❌ Error: {str(e)}\n\n{traceback.format_exc()}"
print(f"FATAL ERROR: {error_msg}")
return (
error_msg,
pd.DataFrame(),
None, None, None, None, None, None, None, None, None, None, None, None, None, None, None
)
# Apply GPU decorator to prediction function for ZeroGPU
# This allows GPU to be allocated only during prediction calls, not for the entire server
# ZeroGPU requires at least one @spaces.GPU decorated function to detect and allocate GPU
if SPACES_AVAILABLE:
print("Applying @spaces.GPU decorator to predict_fraud_enhanced for ZeroGPU support")
predict_fraud_enhanced = spaces.GPU(predict_fraud_enhanced)
# Create Gradio Interface
with gr.Blocks(
title="Credit Card Fraud Detection",
analytics_enabled=False
) as demo:
gr.Markdown("""
# πŸ›‘οΈ Credit Card Fraud Detection System
Upload a CSV file with transaction data to detect potential fraud. The system uses a calibrated
LightGBM model to predict fraud probabilities and provides comprehensive visualizations.
**Supported formats:** CSV files with numeric features (V1-V28, Amount, Time, etc.)
""")
with gr.Row():
with gr.Column(scale=1):
file_input = gr.File(
label="πŸ“ Upload Transaction CSV",
file_count="single"
)
# Add button to use dataset file directly (avoids permission issues)
use_dataset_btn = gr.Button(
"πŸ“‚ Use dataset/fraudTest.csv (Skip Upload)",
variant="secondary",
size="sm"
)
# Add Sample button to use sample dataset
sample_btn = gr.Button(
"πŸ“Š Use Sample Dataset",
variant="secondary",
size="sm"
)
gr.Markdown("""
**πŸ’‘ Tip:** If you get permission errors, click the button above to use the dataset file directly!
**🎯 Try Sample:** Click "Use Sample Dataset" to test the model with sample data!
""")
with gr.Column(scale=1):
threshold = gr.Slider(
minimum=0.01,
maximum=0.99,
value=0.05,
step=0.01,
label="🎯 Fraud Detection Threshold",
info="⚠️ For imbalanced data, use 0.01-0.05 for better recall. Default 0.5 is too high!"
)
# Function to use dataset file directly
def use_dataset_file():
if os.path.exists(DATASET_PATH):
# Return the file path as a string - the predict function will handle it
return DATASET_PATH
else:
return None
# When button is clicked, trigger prediction with dataset file
def use_dataset_and_predict(threshold_val):
try:
print(f"DEBUG: Looking for dataset at: {DATASET_PATH}")
print(f"DEBUG: File exists: {os.path.exists(DATASET_PATH)}")
if os.path.exists(DATASET_PATH):
print(f"DEBUG: Found dataset, calling predict_fraud_enhanced...")
# Call predict function directly with dataset path
return predict_fraud_enhanced(DATASET_PATH, threshold_val)
else:
# Try relative path as fallback
fallback_path = "dataset/fraudTest.csv"
if os.path.exists(fallback_path):
print(f"DEBUG: Found dataset at relative path: {fallback_path}")
return predict_fraud_enhanced(fallback_path, threshold_val)
# Try absolute path from current directory
abs_fallback = os.path.abspath(fallback_path)
if os.path.exists(abs_fallback):
print(f"DEBUG: Found dataset at absolute fallback path: {abs_fallback}")
return predict_fraud_enhanced(abs_fallback, threshold_val)
return (
f"❌ Error: Dataset not found.\n\n"
f"**Expected locations:**\n"
f"- {DATASET_PATH}\n"
f"- {fallback_path}\n"
f"- {abs_fallback}\n\n"
f"Please upload a file instead.",
pd.DataFrame(),
None, None, None, None, None, None, None, None, None, None, None, None, None, None, None
)
except Exception as e:
import traceback
error_msg = f"❌ Error loading dataset: {str(e)}\n\n{traceback.format_exc()}"
print(f"ERROR in use_dataset_and_predict: {error_msg}")
return (
error_msg,
pd.DataFrame(),
None, None, None, None, None, None, None, None, None, None, None, None, None, None, None
)
# When Sample button is clicked, use sample dataset
def use_sample_and_predict(threshold_val):
try:
print(f"DEBUG: Looking for sample dataset at: {SAMPLE_DATASET_PATH}")
print(f"DEBUG: File exists: {os.path.exists(SAMPLE_DATASET_PATH)}")
print(f"DEBUG: Current working directory: {os.getcwd()}")
print(f"DEBUG: Script directory: {SCRIPT_DIR}")
if os.path.exists(SAMPLE_DATASET_PATH):
print(f"DEBUG: Found sample dataset, calling predict_fraud_enhanced...")
# Call predict function directly with sample path
return predict_fraud_enhanced(SAMPLE_DATASET_PATH, threshold_val)
else:
# Try relative path as fallback
fallback_path = "sample_transactions.csv"
if os.path.exists(fallback_path):
print(f"DEBUG: Found sample dataset at relative path: {fallback_path}")
return predict_fraud_enhanced(fallback_path, threshold_val)
# Try absolute path from current directory
abs_fallback = os.path.abspath(fallback_path)
if os.path.exists(abs_fallback):
print(f"DEBUG: Found sample dataset at absolute fallback path: {abs_fallback}")
return predict_fraud_enhanced(abs_fallback, threshold_val)
return (
f"❌ Error: Sample dataset not found.\n\n"
f"**Expected locations:**\n"
f"- {SAMPLE_DATASET_PATH}\n"
f"- {fallback_path}\n"
f"- {abs_fallback}\n\n"
f"**Current working directory:** {os.getcwd()}\n"
f"**Script directory:** {SCRIPT_DIR}\n\n"
f"Please ensure the sample_transactions.csv file exists in one of these locations.",
pd.DataFrame(),
None, None, None, None, None, None, None, None, None, None, None, None, None, None, None
)
except Exception as e:
import traceback
error_msg = f"❌ Error loading sample dataset: {str(e)}\n\n{traceback.format_exc()}"
print(f"ERROR in use_sample_and_predict: {error_msg}")
return (
error_msg,
pd.DataFrame(),
None, None, None, None, None, None, None, None, None, None, None, None, None, None, None
)
btn = gr.Button("πŸ” Detect Fraud", variant="primary", size="lg")
with gr.Row():
summary_output = gr.Markdown(label="Summary")
with gr.Tabs():
with gr.Tab("πŸ“‹ Top Suspicious Transactions"):
output_table = gr.Dataframe(
label="Top 20 Most Suspicious Transactions",
wrap=True
)
with gr.Tab("πŸ“Š Probability Distribution"):
viz1 = gr.Plot(label="Fraud Probability Distribution")
with gr.Tab("πŸ₯§ Risk Level Breakdown"):
viz2 = gr.Plot(label="Risk Level Distribution")
with gr.Tab("πŸ’° Amount Analysis"):
viz3 = gr.Plot(label="Amount vs Fraud Probability")
with gr.Tab("πŸ“ˆ Statistics"):
viz4 = gr.Plot(label="Transaction Summary")
with gr.Tab("⏰ Time Series"):
viz5 = gr.Plot(label="Fraud Probability Over Time")
with gr.Tab("πŸ” Top 20 Suspicious"):
viz6 = gr.Plot(label="Top 20 Most Suspicious Transactions")
with gr.Tab("🎯 Model Performance"):
viz7 = gr.Plot(label="Model Performance Metrics")
with gr.Tab("πŸ“Š Confusion Matrix"):
viz8 = gr.Plot(label="Confusion Matrix")
with gr.Tab("πŸ“ˆ ROC Curve"):
viz9 = gr.Plot(label="ROC Curve")
with gr.Tab("πŸ“‰ Precision-Recall Curve"):
viz10 = gr.Plot(label="Precision-Recall Curve")
with gr.Tab("πŸ“¦ Amount Distribution"):
viz11 = gr.Plot(label="Amount Distribution Comparison")
with gr.Tab("🎻 Probability Distribution"):
viz12 = gr.Plot(label="Fraud Probability Distribution Comparison")
with gr.Tab("πŸ”₯ Correlation Heatmap"):
viz13 = gr.Plot(label="Feature Correlation Heatmap")
with gr.Tab("πŸ“Š Cumulative Detection"):
viz14 = gr.Plot(label="Cumulative Fraud Detection Over Time")
with gr.Tab("βš™οΈ Threshold Analysis"):
viz15 = gr.Plot(label="Threshold Sensitivity Analysis")
# Connect button to use dataset file (after outputs are defined)
use_dataset_btn.click(
fn=use_dataset_and_predict,
inputs=[threshold],
outputs=[
summary_output,
output_table,
viz1, viz2, viz3, viz4, viz5, viz6, viz7, viz8,
viz9, viz10, viz11, viz12, viz13, viz14, viz15
]
)
# Connect Sample button (after outputs are defined)
sample_btn.click(
fn=use_sample_and_predict,
inputs=[threshold],
outputs=[
summary_output,
output_table,
viz1, viz2, viz3, viz4, viz5, viz6, viz7, viz8,
viz9, viz10, viz11, viz12, viz13, viz14, viz15
]
)
btn.click(
fn=predict_fraud_enhanced,
inputs=[file_input, threshold],
outputs=[
summary_output,
output_table,
viz1, viz2, viz3, viz4, viz5, viz6, viz7, viz8,
viz9, viz10, viz11, viz12, viz13, viz14, viz15
]
# Removed api_name to avoid Gradio API info generation bug
)
gr.Markdown("""
---
### πŸ“ Notes:
- The model expects numeric features (V1-V28, Amount, optionally Time)
- If exact feature names are not found, the system will use available numeric columns
- Processing is limited to 10,000 rows for optimal performance
- Adjust the threshold to balance between false positives and false negatives
""")
# Module-level function for launching the app
def launch_app():
"""Launch the Gradio app."""
# Test GPU availability (optional - CuPy will use GPU automatically if available)
if CUPY_AVAILABLE:
try:
# Quick test to verify GPU is accessible
test_gpu = cp.array([1.0, 2.0, 3.0])
result = cp.sum(test_gpu)
gpu_result = float(result)
print(f"βœ… GPU available for data processing (test result: {gpu_result})")
except Exception as e:
print(f"⚠️ GPU test failed (will use CPU): {e}")
# Launch Gradio app
# Note: GPU operations happen in score_batch() using CuPy
# For ZeroGPU: GPU will be allocated automatically when CuPy operations are performed
# We don't use @spaces.GPU decorator on launch_app() to avoid GPU context timeout issues
demo.launch()
if __name__ == "__main__":
import os
print("Starting Gradio app...")
# Check if running on Hugging Face Spaces
is_spaces = os.getenv("SPACE_ID") is not None
if is_spaces:
print("Detected Hugging Face Spaces environment")
# On Spaces, launch without GPU decorator on launch_app
# GPU operations will happen in score_batch() using CuPy
# ZeroGPU will allocate GPU automatically when CuPy operations are performed
print("Note: GPU will be used automatically in prediction functions via CuPy")
launch_app()
else:
print("Running locally - server will be available at http://127.0.0.1:7860")
print("Watch this console for debug output when you upload files!")
demo.queue(api_open=False)
try:
demo.launch(
server_name="0.0.0.0", # Use 0.0.0.0 instead of 127.0.0.1 for better compatibility
server_port=7860,
share=False,
show_error=True,
quiet=False,
inbrowser=False,
)
except ValueError as e:
# Fallback if 0.0.0.0 doesn't work, try with share enabled
if "localhost is not accessible" in str(e):
print("⚠️ Localhost not accessible, trying with share=True...")
demo.launch(
server_name="0.0.0.0",
server_port=7860,
share=True, # Enable share as fallback
show_error=True,
quiet=False,
inbrowser=False,
)
else:
raise