Spaces:
Sleeping
Sleeping
import gradio as gr
import pandas as pd
import re
import os
import numpy as np
from datetime import datetime, timedelta
from huggingface_hub import InferenceClient, HfApi
import io
import base64
import unicodedata
import warnings
from requests.exceptions import HTTPError

# Optional: phone normalization (graceful fallback if missing)
try:
    import phonenumbers
except Exception:
    phonenumbers = None

# Read from Space secret - check both common environment variables
HF_TOKEN = (os.getenv("HF_TOKEN", "") or os.getenv("HF_SPACES", "")).strip()

# Initialize API clients
# `api` is used for Hub metadata lookups; `client` for serverless inference.
api = HfApi(token=HF_TOKEN if HF_TOKEN else None)
client = InferenceClient(token=HF_TOKEN) if HF_TOKEN else InferenceClient()

# Your target models
DEFAULT_MODEL_ID = "tiiuae/falcon-7b-instruct"
SECONDARY_MODEL_ID = "HuggingFaceH4/zephyr-7b-beta"
def hf_inference_supported(model_id: str, task: str = "text-generation") -> bool:
    """Report whether *model_id* can serve *task* via HF's Serverless Inference.

    Looks up the model's inference-provider mapping on the Hub and checks for
    an HF-hosted provider entry that advertises *task*. Any lookup failure is
    treated as "not supported" (logged, never raised).
    """
    hf_provider_names = {"hf-inference", "hf_serverless", "hf-inference-api"}
    try:
        info = api.model_info(model_id, expand=["inferenceProviderMapping"])
        mapping = getattr(info, "inference_provider_mapping", None) or getattr(info, "inferenceProviderMapping", None)
        if not mapping:
            return False
        # mapping is a dict: { provider_name: { tasks: [...] , ... } }
        for provider_name, details in mapping.items():
            supported_tasks = details.get("tasks") or []
            if provider_name.lower() in hf_provider_names and task in supported_tasks:
                return True
        return False
    except Exception as exc:
        print(f"[HF] Provider check failed for {model_id}: {exc}")
        return False
def _hf_sanity_check():
    """Enhanced sanity check with provider validation.

    Logs token presence, per-model provider availability, and a tiny live
    generation test. Returns True on the first model that both advertises
    serverless support and answers a test call; False otherwise.
    NOTE(review): returns after the FIRST successful model, so the secondary
    model is only probed when the primary fails — confirm this is intended.
    """
    print("[HF] Token present:", bool(HF_TOKEN), "len:", len(HF_TOKEN) if HF_TOKEN else 0)
    if not HF_TOKEN:
        print("[HF] No token provided - API calls will fail")
        return False
    # Test models and their provider support
    test_models = [DEFAULT_MODEL_ID, SECONDARY_MODEL_ID]
    for model in test_models:
        supported = hf_inference_supported(model)
        print(f"[HF] {model}: {'✓ Supported' if supported else '✗ Not available'} on HF Serverless")
        if supported:
            try:
                # Quick test call
                response = client.text_generation(
                    model=model,
                    prompt="Hello",
                    max_new_tokens=5
                )
                print(f"[HF] ✓ {model} API test successful")
                return True
            except HTTPError as e:
                # Distinguish the common failure modes by HTTP status code.
                code = getattr(e.response, "status_code", None)
                if code == 401:
                    print(f"[HF] ✗ {model}: Invalid token")
                elif code == 403:
                    print(f"[HF] ✗ {model}: License not accepted")
                else:
                    print(f"[HF] ✗ {model}: HTTP {code}")
            except Exception as e:
                print(f"[HF] ✗ {model}: {e}")
    return False

# Run sanity check (executes at import time; logs diagnostics to stdout)
_hf_sanity_check()
| # -------------------------- | |
| # Utility Functions | |
| # -------------------------- | |
def generate_summary(prompt: str, model_id: str = DEFAULT_MODEL_ID):
    """Generate AI-powered analysis using HuggingFace models with proper error handling.

    Returns either the model's generated text or a human-readable "⚠️ ..."
    warning string; handled failure modes never raise to the caller.
    """
    if not HF_TOKEN:
        return "⚠️ HF token missing. Set HF_TOKEN in Space Secrets and restart the Space."
    # Check if model is available on HF Serverless before spending a call
    if not hf_inference_supported(model_id):
        return (f"⚠️ Model not available on HF's Serverless Inference API: {model_id}\n\n"
                f"This model either:\n"
                f"• Requires a Dedicated Inference Endpoint\n"
                f"• Is not supported for text-generation tasks\n"
                f"• Has restricted access\n\n"
                f"Check the HuggingFace Models Support Matrix for alternatives, "
                f"or deploy a Dedicated Inference Endpoint.")
    try:
        # Use the correct parameter name: prompt (not inputs)
        response = client.text_generation(
            model=model_id,
            prompt=prompt,
            max_new_tokens=500,
            temperature=0.7,
            do_sample=True,
        )
        return response
    except HTTPError as e:
        # NOTE(review): assumes hub client errors subclass requests' HTTPError
        # (HfHubHTTPError does) — confirm against the installed hub version.
        code = getattr(e.response, "status_code", None)
        if code == 401:
            return "⚠️ HF token missing/invalid. Set HF_TOKEN in Space Secrets and restart the Space."
        elif code == 403:
            # Gated model: the token's account has not accepted the license.
            return (f"⚠️ Access denied to {model_id}.\n\n"
                    f"Required action:\n"
                    f"• Visit: https://huggingface.co/{model_id}\n"
                    f"• Accept the model's license with the same HF account as your token\n"
                    f"• Or use a model with open access")
        elif code == 503:
            # Model is cold-loading on serverless infra.
            return (f"⚠️ Model {model_id} is currently loading or unavailable.\n"
                    f"This is common with CPU-only inference. Try again in a few minutes,\n"
                    f"or consider GPU-accelerated inference for better reliability.")
        else:
            return f"⚠️ Inference error (HTTP {code}): {e}"
    except Exception as e:
        return f"⚠️ Inference error: {e}"
def create_download_link(df, filename):
    """Build an HTML anchor that downloads *df* as a CSV file.

    Args:
        df: DataFrame to serialize.
        filename: name suggested to the browser for the downloaded file.

    Returns:
        An ``<a>`` tag whose href embeds the CSV as a base64 data URI.
    """
    csv = df.to_csv(index=False)
    b64 = base64.b64encode(csv.encode()).decode()
    # Bug fix: the download attribute and label previously contained a
    # hard-coded placeholder instead of the `filename` argument.
    return f'<a href="data:file/csv;base64,{b64}" download="{filename}">📥 Download {filename}</a>'
# NOTE(review): a duplicate AI-consultant UI block previously lived here at
# module scope. It instantiated gr.Column()/gr.Chatbot() outside any
# gr.Blocks() context and wired msg.submit()/submit_btn.click() to
# `chatbot_respond`, which is not defined until later in the file — so
# importing the module raised NameError before the app could start.
# The real AI Consultant module is built inside the `gr.Blocks()` context
# further below; this dead duplicate has been removed.
| # ------------------------- | |
| # Data Validation & Normalization Helpers (Point 1) | |
| # ------------------------- | |
| def _standardize_columns(df: pd.DataFrame) -> pd.DataFrame: | |
| df = df.copy() | |
| df.columns = [re.sub(r"\s+", "_", str(c).strip().lower()) for c in df.columns] | |
| return df | |
| def _norm_str(s: str) -> str: | |
| if pd.isna(s): | |
| return "" | |
| s = unicodedata.normalize("NFKC", str(s)) | |
| s = re.sub(r"\s+", " ", s).strip() | |
| return s | |
def _norm_name(name: str) -> str:
    """Normalize a person name for matching.

    Words are title-cased, except short all-caps tokens (length <= 3, likely
    initials/acronyms) which are preserved as-is.
    """
    cleaned = _norm_str(name)
    tokens = []
    for token in cleaned.split(" "):
        if len(token) <= 3 and token.isupper():
            tokens.append(token)
        else:
            tokens.append(token.title())
    return " ".join(tokens)
def _norm_email(email: str):
    """Lower-case and validate an email address.

    Returns ``(normalized_email, issue)`` where *issue* is ``"missing_email"``,
    ``"invalid_email"``, or ``None`` when the address looks well-formed.
    """
    addr = _norm_str(email).lower()
    if not addr:
        return "", "missing_email"
    if re.match(r"^[a-z0-9._%+\-]+@[a-z0-9.\-]+\.[a-z]{2,}$", addr):
        return addr, None
    return addr, "invalid_email"
def _norm_phone(phone: str, default_region: str = "IN"):
    """Normalize a phone number, preferring E.164 format.

    Uses the optional ``phonenumbers`` library when available; otherwise falls
    back to a crude digits-only length check. Returns ``(normalized, issue)``
    where *issue* is ``"missing_phone"``, ``"invalid_phone"``, or ``None``.
    """
    raw = _norm_str(phone)
    if not raw:
        return "", "missing_phone"
    if phonenumbers:
        try:
            parsed = phonenumbers.parse(raw, default_region)
            is_ok = phonenumbers.is_possible_number(parsed) and phonenumbers.is_valid_number(parsed)
            if is_ok:
                return phonenumbers.format_number(parsed, phonenumbers.PhoneNumberFormat.E164), None
            return raw, "invalid_phone"
        except Exception:
            return raw, "invalid_phone"
    # Fallback path: strip non-digits and require a plausible length.
    digits = re.sub(r"\D", "", raw)
    if len(digits) < 8:
        return raw, "invalid_phone"
    return "+" + digits, None
| def _to_datetime(series, errors="coerce"): | |
| return pd.to_datetime(series, errors=errors, utc=False, infer_datetime_format=True) | |
| def _to_numeric(series): | |
| return pd.to_numeric(series, errors="coerce") | |
| def _new_issues_list(): | |
| return [] | |
| def _add_issue(issues, row_idx, field, issue, value): | |
| issues.append({ | |
| "row": int(row_idx) if pd.notna(row_idx) else None, | |
| "field": field, | |
| "issue": issue, | |
| "value": None if pd.isna(value) else str(value) | |
| }) | |
| def _issues_df(issues): | |
| return pd.DataFrame(issues, columns=["row", "field", "issue", "value"]) if issues else pd.DataFrame(columns=["row", "field", "issue", "value"]) | |
| # ---------- Per-module preparation ---------- | |
def _prepare_transactions_df(df_raw: pd.DataFrame):
    """Validate and normalize raw transaction rows.

    Returns ``(clean_df, issues_df, quality_msg)``. ``clean_df`` is None when
    any required column is missing; otherwise it contains only rows with a
    usable customer_id, numeric amount, and parseable timestamp.
    """
    issues = _new_issues_list()
    df = _standardize_columns(df_raw)
    required = {"customer_id", "amount", "timestamp"}
    missing = required - set(df.columns)
    if missing:
        # Abort early: the fraud rules downstream need all required columns.
        return None, _issues_df([{"row": None, "field": "/".join(sorted(missing)), "issue": "missing_required_columns", "value": ""}]), f"Missing required columns: {sorted(missing)}"
    df["customer_id"] = df["customer_id"].astype(str).apply(_norm_str)
    df["amount"] = _to_numeric(df["amount"])
    df["timestamp"] = _to_datetime(df["timestamp"])
    # Normalize optional descriptive columns when present.
    for c in ["merchant_category", "merchant", "country", "device_id", "ip_address"]:
        if c in df.columns:
            df[c] = df[c].astype(str).apply(_norm_str)
    # Record per-row coercion failures against the ORIGINAL (raw) values,
    # relying on df sharing df_raw's index.
    for idx, v in df["amount"].items():
        if pd.isna(v):
            _add_issue(issues, idx, "amount", "non_numeric_amount", df_raw.loc[idx, "amount"])
    for idx, v in df["timestamp"].items():
        if pd.isna(v):
            _add_issue(issues, idx, "timestamp", "unparseable_timestamp", df_raw.loc[idx, "timestamp"])
    clean = df.dropna(subset=["customer_id", "amount", "timestamp"]).copy()
    kept = len(clean); total = len(df_raw)
    quality = f"Validated {total} rows → kept {kept}, dropped {total - kept} due to invalid amount/timestamp/customer_id."
    return clean, _issues_df(issues), quality
def _prepare_kyc_df(df_raw: pd.DataFrame):
    """Validate and normalize raw KYC (identity) rows.

    Returns ``(clean_df, issues_df, quality_msg)``; ``clean_df`` is None when
    required columns are missing. Adds a parsed ``dob_parsed`` column and
    records email/phone/DOB problems per row.
    """
    issues = _new_issues_list()
    df = _standardize_columns(df_raw)
    required = {"customer_id", "name", "email", "dob"}
    missing = required - set(df.columns)
    if missing:
        return None, _issues_df([{"row": None, "field": "/".join(sorted(missing)), "issue": "missing_required_columns", "value": ""}]), f"Missing required columns: {sorted(missing)}"
    df["customer_id"] = df["customer_id"].astype(str).apply(_norm_str)
    df["name"] = df["name"].astype(str).apply(_norm_name)
    # Normalize emails, recording validation failures row-by-row.
    emails = []
    for idx, e in df["email"].items():
        ne, err = _norm_email(e)
        emails.append(ne)
        if err:
            _add_issue(issues, idx, "email", err, e)
    df["email"] = emails
    # Phone is optional; normalize toward E.164 (default region IN) when present.
    if "phone" in df.columns:
        phones = []
        for idx, p in df["phone"].items():
            np_, err = _norm_phone(p, default_region="IN")
            phones.append(np_)
            if err:
                _add_issue(issues, idx, "phone", err, p)
        df["phone"] = phones
    df["dob_parsed"] = _to_datetime(df["dob"])
    now = pd.Timestamp.now(tz=None)
    # Ages above 120 years are treated as implausible.
    too_old_cutoff = now - pd.DateOffset(years=120)
    for idx, d in df["dob_parsed"].items():
        if pd.isna(d):
            _add_issue(issues, idx, "dob", "unparseable_dob", df_raw.loc[idx, "dob"])
        elif d > now:
            _add_issue(issues, idx, "dob", "future_dob", df_raw.loc[idx, "dob"])
        elif d < too_old_cutoff:
            _add_issue(issues, idx, "dob", "age_over_120", df_raw.loc[idx, "dob"])
    clean = df.dropna(subset=["customer_id", "name", "email", "dob_parsed"]).copy()
    kept = len(clean); total = len(df_raw)
    quality = f"KYC validated {total} rows → kept {kept}, dropped {total - kept} due to email/phone/DOB issues."
    return clean, _issues_df(issues), quality
def _prepare_sanctions_customer_df(df_raw: pd.DataFrame):
    """Validate and normalize customer rows for sanctions screening.

    Returns ``(clean_df, issues_df, quality_msg)``; ``clean_df`` is None when
    required columns are missing. Optional dob/country columns are normalized
    when present.
    """
    issues = _new_issues_list()
    df = _standardize_columns(df_raw)
    required = {"customer_id", "name"}
    missing = required - set(df.columns)
    if missing:
        return None, _issues_df([{"row": None, "field": "/".join(sorted(missing)), "issue": "missing_required_columns", "value": ""}]), f"Missing required columns: {sorted(missing)}"
    df["customer_id"] = df["customer_id"].astype(str).apply(_norm_str)
    df["name"] = df["name"].astype(str).apply(_norm_name)
    # DOB is optional here; only record parse failures, never drop for them.
    if "dob" in df.columns:
        df["dob_parsed"] = _to_datetime(df["dob"])
        for idx, d in df["dob_parsed"].items():
            if pd.isna(d):
                _add_issue(issues, idx, "dob", "unparseable_dob", df_raw.loc[idx, "dob"])
    if "country" in df.columns:
        df["country"] = df["country"].astype(str).apply(_norm_str)
    clean = df.dropna(subset=["customer_id", "name"]).copy()
    quality = f"Sanctions input validated {len(df_raw)} rows → kept {len(clean)}, dropped {len(df_raw)-len(clean)}."
    return clean, _issues_df(issues), quality
| def _prepare_sanctions_list_df(sanctions_file): | |
| if sanctions_file is None: | |
| return None, pd.DataFrame(), "Using built-in demo sanctions list." | |
| try: | |
| raw = pd.read_csv(sanctions_file.name) | |
| df = _standardize_columns(raw) | |
| if "name" not in df.columns: | |
| msg_df = _issues_df([{"row": None, "field": "name", "issue": "missing_required_columns", "value": ""}]) | |
| return None, msg_df, "Uploaded sanctions list missing required 'name' column. Using demo list." | |
| df["name"] = df["name"].astype(str).apply(_norm_name) | |
| return df, pd.DataFrame(), "Using uploaded sanctions list." | |
| except Exception as e: | |
| warn = _issues_df([{"row": None, "field": "file", "issue": "read_error", "value": str(e)}]) | |
| return None, warn, "Failed to read uploaded sanctions list. Using demo list." | |
def _prepare_credit_df(df_raw: pd.DataFrame):
    """Validate and normalize raw credit-profile rows.

    Returns ``(clean_df, issues_df, quality_msg)``; ``clean_df`` is None when
    ``customer_id`` is missing. Out-of-range numeric values are recorded as
    issues but NOT dropped — only rows lacking a customer_id are removed.

    Refactor: the five previously copy-pasted per-column validation loops are
    folded into one parameterized helper; issue labels are unchanged.
    """
    issues = _new_issues_list()
    df = _standardize_columns(df_raw)
    required = {"customer_id"}
    missing = required - set(df.columns)
    if missing:
        return None, _issues_df([{"row": None, "field": "/".join(sorted(missing)), "issue": "missing_required_columns", "value": ""}]), f"Missing required columns: {sorted(missing)}"
    df["customer_id"] = df["customer_id"].astype(str).apply(_norm_str)
    numeric_cols = ["credit_score", "utilization_rate", "debt_to_income", "income", "recent_defaults"]
    for c in numeric_cols:
        if c in df.columns:
            df[c] = _to_numeric(df[c])

    def _check(col, in_range, range_issue):
        # Shared validation loop: non-numeric values are reported against the
        # RAW input; out-of-range values against the coerced number.
        if col not in df.columns:
            return
        for idx, v in df[col].items():
            if pd.isna(v):
                _add_issue(issues, idx, col, "non_numeric", df_raw.loc[idx, col])
            elif not in_range(v):
                _add_issue(issues, idx, col, range_issue, v)

    _check("credit_score", lambda v: 250 <= v <= 950, "out_of_range_250_950")
    _check("utilization_rate", lambda v: 0 <= v <= 1.5, "out_of_range_0_1.5")
    _check("debt_to_income", lambda v: 0 <= v <= 2, "out_of_range_0_2")
    _check("income", lambda v: v >= 0, "negative_income")
    _check("recent_defaults", lambda v: v >= 0, "negative_count")
    clean = df.dropna(subset=["customer_id"]).copy()
    kept = len(clean); total = len(df_raw)
    quality = f"Credit input validated {total} rows → kept {kept}, dropped {total - kept} (non-critical fields coerced with issues recorded)."
    return clean, _issues_df(issues), quality
| # ------------------------- | |
| # 1. Transaction Fraud (Enhanced + Validation) | |
| # ------------------------- | |
def process_transaction_file(file):
    """Run rule-based fraud detection on an uploaded transaction CSV.

    Returns ``(suspicious_df, ai_summary, stats_text, dq_issues_df)``; on any
    error the summary slot carries the error message instead.
    """
    try:
        # Fix: gr.File(type="filepath") passes a plain path string with no
        # ``.name`` attribute; accept both str paths and file-like objects.
        path = file if isinstance(file, str) else file.name
        df_raw = pd.read_csv(path)
        df, dq_issues, quality = _prepare_transactions_df(df_raw)
        if df is None:
            return pd.DataFrame(), quality, "", dq_issues
        # Fraud rules: large/negative amounts, high-risk merchant category,
        # and per-customer aggregate spend exceeding a daily limit.
        high_risk_mc = (df["merchant_category"] == "HIGH_RISK") if "merchant_category" in df.columns else False
        suspicious_conditions = (
            (df['amount'] > 10000) |
            (df['amount'] < 0) |
            (high_risk_mc) |
            (df.groupby('customer_id')['amount'].transform('sum') > 50000)
        )
        suspicious = df[suspicious_conditions].copy()

        def _reason(x):
            # First matching rule wins, mirroring the order of the mask above.
            if x['amount'] > 10000: return 'Large Amount'
            if x['amount'] < 0: return 'Negative Amount'
            if ('merchant_category' in df.columns and x.get('merchant_category') == 'HIGH_RISK'): return 'High Risk Merchant'
            return 'Daily Limit Exceeded'

        if not suspicious.empty:
            suspicious['risk_reason'] = suspicious.apply(_reason, axis=1)
        prompt = f"""You are a financial fraud analyst. Analyze these suspicious transactions:
Sample:
{df.head(10).to_string()}
Suspicious Found: {len(suspicious)}
Data Quality: {quality}
Provide a risk assessment and recommended actions."""
        summary = generate_summary(prompt)
        stats = f"{quality}\nFound {len(suspicious)} suspicious transactions out of {len(df)} usable rows."
        return suspicious, summary, stats, dq_issues
    except Exception as e:
        return pd.DataFrame(), f"Error processing file: {str(e)}", "", pd.DataFrame()
| # ------------------------- | |
| # 2. KYC Fraud Analysis (Enhanced + Validation) | |
| # ------------------------- | |
def process_kyc_file(file):
    """Flag potentially fraudulent KYC records (duplicate contacts, odd names).

    Returns ``(flagged_df, ai_summary, stats_text, dq_issues_df)``; on any
    error the summary slot carries the error message instead.
    """
    try:
        # Fix: gr.File(type="filepath") passes a plain path string with no
        # ``.name`` attribute; accept both str paths and file-like objects.
        path = file if isinstance(file, str) else file.name
        df_raw = pd.read_csv(path)
        df, dq_issues, quality = _prepare_kyc_df(df_raw)
        if df is None:
            return pd.DataFrame(), quality, "", dq_issues
        flagged_records = []
        # Duplicate contact details shared across customers.
        dup_email = df[df.duplicated('email', keep=False)]
        if not dup_email.empty:
            dup_email = dup_email.copy(); dup_email['flag_reason'] = 'Duplicate Email'
            flagged_records.append(dup_email)
        if 'phone' in df.columns:
            dup_phone = df[df.duplicated('phone', keep=False)]
            if not dup_phone.empty:
                dup_phone = dup_phone.copy(); dup_phone['flag_reason'] = 'Duplicate Phone'
                flagged_records.append(dup_phone)
        # Name heuristics: all-caps, contains digits, or implausibly short.
        if 'name' in df.columns:
            suspicious_names = df[
                df['name'].str.contains(r'^[A-Z]+$', na=False) |
                df['name'].str.contains(r'\d', na=False) |
                (df['name'].str.len() < 3)
            ].copy()
            if not suspicious_names.empty:
                suspicious_names['flag_reason'] = 'Suspicious Name Pattern'
                flagged_records.append(suspicious_names)
        flagged_df = pd.concat(flagged_records, ignore_index=True).drop_duplicates() if flagged_records else pd.DataFrame()
        prompt = f"""You are a KYC fraud analyst. Review identity records for potential fraud.
Total: {len(df)}
Flagged: {len(flagged_df)}
Data Quality: {quality}
Flag reasons distribution:
{flagged_df['flag_reason'].value_counts().to_string() if not flagged_df.empty else 'None'}
Recommend verification steps."""
        summary = generate_summary(prompt)
        stats = f"{quality}\nFlagged {len(flagged_df)} suspicious KYC records out of {len(df)} usable rows."
        return flagged_df, summary, stats, dq_issues
    except Exception as e:
        return pd.DataFrame(), f"Error processing KYC file: {str(e)}", "", pd.DataFrame()
| # ------------------------- | |
| # 3. Sanctions Check (Enhanced + Validation) | |
| # ------------------------- | |
def process_sanctions_file(file, sanctions_file=None):
    """Screen customers against a sanctions list (exact + token-overlap fuzzy).

    Returns ``(flagged_df, ai_summary, stats_text, dq_issues_df)``; on any
    error the summary slot carries the error message instead.
    """
    try:
        # Fix: gr.File(type="filepath") passes a plain path string with no
        # ``.name`` attribute; accept both str paths and file-like objects.
        path = file if isinstance(file, str) else file.name
        customer_raw = pd.read_csv(path)
        customers, cust_issues, cust_quality = _prepare_sanctions_customer_df(customer_raw)
        sanc_df, sanc_issues, sanc_quality = _prepare_sanctions_list_df(sanctions_file)
        if sanc_df is None:
            # Fall back to the built-in demo list when no usable upload exists.
            default_sanctions = [
                "John Doe", "Jane Smith", "Muhammad Ali", "Vladimir Putin",
                "Kim Jong Un", "Alexander Petrov", "Maria Gonzalez"
            ]
            sanc_df = pd.DataFrame({"name": [_norm_name(x) for x in default_sanctions]})
        dq_issues = pd.concat([cust_issues, sanc_issues], ignore_index=True) if not cust_issues.empty or not sanc_issues.empty else pd.DataFrame()
        if customers is None:
            msg = f"{cust_quality} | {sanc_quality}"
            return pd.DataFrame(), msg, "", dq_issues
        # Exact matches on the normalized full name.
        exact = customers[customers["name"].isin(set(sanc_df["name"]))].copy()
        if not exact.empty:
            exact["match_type"] = "Exact Match"
        # Simple fuzzy matching: at least two shared lower-cased name tokens.
        sanc_tokens = [set(n.lower().split()) for n in sanc_df["name"]]
        fuzzy_idx = []
        for idx, row in customers.iterrows():
            tokens = set(str(row["name"]).lower().split())
            for st in sanc_tokens:
                if len(tokens & st) >= 2:
                    fuzzy_idx.append(idx); break
        fuzzy = customers.loc[sorted(set(fuzzy_idx))].copy() if fuzzy_idx else pd.DataFrame()
        if not fuzzy.empty:
            fuzzy["match_type"] = "Fuzzy Match"
        flagged = pd.concat([exact, fuzzy]).drop_duplicates()
        prompt = f"""You are a compliance officer conducting sanctions screening.
Customers screened: {len(customers)}
Potential matches: {len(flagged)}
Customer sample:
{customers.head(5).to_string()}
Data Quality: {cust_quality} | {sanc_quality}
Assess risk and recommend EDD steps for any matches."""
        summary = generate_summary(prompt)
        stats = f"{cust_quality} | {sanc_quality}\nFound {len(flagged)} potential matches out of {len(customers)} customers."
        return flagged, summary, stats, dq_issues
    except Exception as e:
        return pd.DataFrame(), f"Error processing sanctions check: {str(e)}", "", pd.DataFrame()
| # ------------------------- | |
| # 4. Credit Risk Analysis (Enhanced + Validation) | |
| # ------------------------- | |
def process_credit_file(file):
    """Score uploaded credit profiles against simple risk rules.

    Returns ``(risky_customers_df, ai_summary, stats_text, dq_issues_df)``; on
    any error the summary slot carries the error message instead.
    """
    try:
        # Fix: gr.File(type="filepath") passes a plain path string with no
        # ``.name`` attribute; accept both str paths and file-like objects.
        path = file if isinstance(file, str) else file.name
        df_raw = pd.read_csv(path)
        df, dq_issues, quality = _prepare_credit_df(df_raw)
        if df is None:
            return pd.DataFrame(), quality, "", dq_issues
        # One boolean mask per risk rule, only for columns actually present.
        masks = []
        if 'credit_score' in df.columns:
            masks.append(df['credit_score'] < 600)
        if 'utilization_rate' in df.columns:
            masks.append(df['utilization_rate'] > 0.8)
        if 'debt_to_income' in df.columns:
            masks.append(df['debt_to_income'] > 0.4)
        if 'recent_defaults' in df.columns:
            masks.append(df['recent_defaults'] > 0)
        if 'income' in df.columns:
            masks.append(df['income'] < 30000)
        if masks:
            risk_mask = pd.concat(masks, axis=1).any(axis=1)
            risky_customers = df[risk_mask].copy()
            # Fix: the masks are indexed on the FULL df; using them directly as
            # `risky_customers.loc[m, ...]` indexers raised an unalignable
            # boolean-indexer error. Sum rule hits on the full frame first,
            # then take the risky subset.
            rule_hits = sum(m.astype(int) for m in masks)
            risky_customers['risk_score'] = rule_hits[risk_mask]
            risky_customers['risk_level'] = risky_customers['risk_score'].apply(
                lambda x: 'High' if x >= 3 else 'Medium' if x >= 2 else 'Low'
            )
        else:
            risky_customers = pd.DataFrame()
        prompt = f"""You are a credit risk analyst. Assess these customer credit profiles:
Total Customers: {len(df)}
High-Risk Customers: {len(risky_customers)}
Risk Distribution:
{risky_customers['risk_level'].value_counts().to_string() if not risky_customers.empty else 'No high-risk customers identified'}
Data Quality: {quality}
Provide risk assessment insights and recommend credit policies or monitoring actions."""
        summary = generate_summary(prompt)
        stats = f"{quality}\nIdentified {len(risky_customers)} high-risk customers out of {len(df)} usable rows."
        return risky_customers, summary, stats, dq_issues
    except Exception as e:
        return pd.DataFrame(), f"Error processing credit risk file: {str(e)}", "", pd.DataFrame()
| # ------------------------- | |
| # 5. Chatbot (Enhanced) | |
| # ------------------------- | |
def chatbot_respond(message, history, model_choice):
    """Answer one user turn in the risk-consultant chat.

    Builds a prompt from the prior (user, assistant) pairs in *history*, asks
    the selected model, appends the new pair, and returns ``(history, "")`` so
    the input textbox is cleared.
    """
    transcript_parts = []
    for user_turn, bot_turn in history:
        transcript_parts.append(f"User: {user_turn}\nAssistant: {bot_turn}\n\n")
    conversation = "".join(transcript_parts)
    prompt = f"""You are an expert fraud analyst and risk management consultant. Help users with:
- Transaction fraud detection
- KYC/Identity verification
- Sanctions screening
- Credit risk assessment
- Regulatory compliance
- Financial crime prevention
Previous conversation:
{conversation}
User: {message}
Assistant:"""
    try:
        reply = generate_summary(prompt, model_id=model_choice)
        history.append((message, reply))
    except Exception as e:
        history.append((message, f"I apologize, but I encountered an error: {str(e)}"))
    return history, ""
| # ------------------------- | |
| # Navigation Functions (Updated) | |
| # ------------------------- | |
def update_section(section):
    """Show only the requested UI section and toggle the back button.

    Returns seven ``gr.update`` objects: one visibility update per section in
    fixed order (overview, transaction, kyc, sanctions, credit, ai), plus the
    back button, which is hidden only on the overview page.
    """
    section_order = ["overview", "transaction", "kyc", "sanctions", "credit", "ai"]
    section_updates = tuple(gr.update(visible=(name == section)) for name in section_order)
    back_button_update = gr.update(visible=(section != "overview"))
    return section_updates + (back_button_update,)
| # ------------------------- | |
| # Gradio Interface | |
| # ------------------------- | |
| with gr.Blocks(theme=gr.themes.Soft(), title="🛡️ Fraud Detector Analyst") as demo: | |
| # Overview/Front Page | |
| with gr.Column(visible=True) as overview_section: | |
| gr.Markdown( | |
| """ | |
| # 🛡️ Fraud Detector Analyst | |
| ## Multi-Module Risk Intelligence Platform | |
| Welcome to the comprehensive fraud detection and risk management platform powered by AI. | |
| Choose a module below to get started with your risk analysis. | |
| """ | |
| ) | |
| with gr.Row(): | |
| with gr.Column(): | |
| gr.Markdown("### 🔍 Detection Modules") | |
| transaction_btn = gr.Button( | |
| "📊 Transaction Fraud Detection", | |
| size="lg", | |
| variant="primary" | |
| ) | |
| gr.Markdown("*Detect suspicious transaction patterns and anomalies*") | |
| kyc_btn = gr.Button( | |
| "🆔 KYC Identity Fraud Analysis", | |
| size="lg", | |
| variant="primary" | |
| ) | |
| gr.Markdown("*Identify duplicate accounts and synthetic identities*") | |
| sanctions_btn = gr.Button( | |
| "🌍 Sanctions & PEP Screening", | |
| size="lg", | |
| variant="primary" | |
| ) | |
| gr.Markdown("*Screen customers against global sanctions lists*") | |
| with gr.Column(): | |
| gr.Markdown("### 🎯 Assessment Tools") | |
| credit_btn = gr.Button( | |
| "💳 Credit Risk Assessment", | |
| size="lg", | |
| variant="primary" | |
| ) | |
| gr.Markdown("*Evaluate credit risk and default probability*") | |
| ai_btn = gr.Button( | |
| "💬 AI-Powered Risk Consultant", | |
| size="lg", | |
| variant="secondary" | |
| ) | |
| gr.Markdown("*Get expert guidance on fraud and risk management*") | |
| gr.Markdown( | |
| """ | |
| --- | |
| ### 🚀 Quick Start Guide | |
| 1. **Choose a module** from the buttons above based on your analysis needs | |
| 2. **Upload your CSV data** following the format requirements | |
| 3. **Review AI-powered insights** and flagged records | |
| 4. **Download results** for further investigation | |
| ### 📋 Supported Data Formats | |
| - **Transactions**: `customer_id, amount, merchant_category, timestamp` | |
| - **KYC Records**: `customer_id, name, email, phone, dob, address` | |
| - **Customer Lists**: `customer_id, name, dob, country` | |
| - **Credit Profiles**: `customer_id, credit_score, utilization_rate, income` | |
| """ | |
| ) | |
| # Navigation Back Button (for all modules) | |
| back_btn = gr.Button("← Back to Main Menu", visible=False, variant="secondary") | |
| # Transaction Fraud Module | |
| with gr.Column(visible=False) as transaction_section: | |
| gr.Markdown("## 📊 Transaction Fraud Detection") | |
| gr.Markdown("### Upload transaction data to detect fraudulent patterns") | |
| with gr.Row(): | |
| trans_file = gr.File( | |
| label="Upload Transaction CSV", | |
| file_types=[".csv"], | |
| type="filepath" | |
| ) | |
| with gr.Row(): | |
| with gr.Column(): | |
| trans_summary = gr.Textbox( | |
| label="AI Analysis Summary", | |
| lines=8, | |
| interactive=False | |
| ) | |
| with gr.Column(): | |
| trans_stats = gr.Textbox( | |
| label="Detection Statistics", | |
| lines=3, | |
| interactive=False | |
| ) | |
| trans_results = gr.Dataframe( | |
| label="Suspicious Transactions", | |
| interactive=False | |
| ) | |
| trans_issues = gr.Dataframe( | |
| label="Data Quality Issues", | |
| interactive=False | |
| ) | |
| trans_file.upload( | |
| process_transaction_file, | |
| inputs=[trans_file], | |
| outputs=[trans_results, trans_summary, trans_stats, trans_issues] | |
| ) | |
| # KYC Fraud Module | |
| with gr.Column(visible=False) as kyc_section: | |
| gr.Markdown("## 🆔 KYC Identity Fraud Analysis") | |
| gr.Markdown("### Detect identity fraud and synthetic accounts in customer onboarding data") | |
| with gr.Row(): | |
| kyc_file = gr.File( | |
| label="Upload KYC Customer Data CSV", | |
| file_types=[".csv"], | |
| type="filepath" | |
| ) | |
| with gr.Row(): | |
| with gr.Column(): | |
| kyc_summary = gr.Textbox( | |
| label="KYC Fraud Analysis", | |
| lines=8, | |
| interactive=False | |
| ) | |
| with gr.Column(): | |
| kyc_stats = gr.Textbox( | |
| label="KYC Statistics", | |
| lines=3, | |
| interactive=False | |
| ) | |
| kyc_results = gr.Dataframe( | |
| label="Flagged KYC Records", | |
| interactive=False | |
| ) | |
| kyc_issues = gr.Dataframe( | |
| label="Data Quality Issues", | |
| interactive=False | |
| ) | |
| kyc_file.upload( | |
| process_kyc_file, | |
| inputs=[kyc_file], | |
| outputs=[kyc_results, kyc_summary, kyc_stats, kyc_issues] | |
| ) | |
    # Sanctions Check Module
    # Hidden by default; shown via the navigation handlers below.
    with gr.Column(visible=False) as sanctions_section:
        gr.Markdown("## 🌍 Sanctions & PEP Screening")
        gr.Markdown("### Screen customers against sanctions lists and PEP databases")
        # Two inputs side by side: the customer list is required, the
        # sanctions list is optional (the callback presumably falls back to a
        # built-in list when it is None — confirm in process_sanctions_file).
        with gr.Row():
            sanctions_customer_file = gr.File(
                label="Upload Customer List CSV",
                file_types=[".csv"],
                type="filepath"
            )
            sanctions_list_file = gr.File(
                label="Upload Sanctions List CSV (Optional)",
                file_types=[".csv"],
                type="filepath"
            )
        # Narrative results next to compact screening statistics.
        with gr.Row():
            with gr.Column():
                sanctions_summary = gr.Textbox(
                    label="Sanctions Screening Results",
                    lines=8,
                    interactive=False
                )
            with gr.Column():
                sanctions_stats = gr.Textbox(
                    label="Screening Statistics",
                    lines=3,
                    interactive=False
                )
        # Read-only output tables: matched/flagged customers and data issues.
        sanctions_results = gr.Dataframe(
            label="Flagged Customers",
            interactive=False
        )
        sanctions_issues = gr.Dataframe(
            label="Data Quality Issues",
            interactive=False
        )
| sanctions_customer_file.upload( | |
| lambda f1, f2: process_sanctions_file(f1, f2), | |
| inputs=[sanctions_customer_file, sanctions_list_file], | |
| outputs=[sanctions_results, sanctions_summary, sanctions_stats, sanctions_issues] | |
| ) | |
    # Credit Risk Module
    # Hidden by default; shown via the navigation handlers below.
    with gr.Column(visible=False) as credit_section:
        gr.Markdown("## 💳 Credit Risk Assessment")
        gr.Markdown("### Assess credit risk and default probability for loan applicants")
        # Single CSV input; type="filepath" passes a path string to the callback.
        with gr.Row():
            credit_file = gr.File(
                label="Upload Credit Profile CSV",
                file_types=[".csv"],
                type="filepath"
            )
        # Long-form analysis next to a compact statistics panel.
        with gr.Row():
            with gr.Column():
                credit_summary = gr.Textbox(
                    label="Credit Risk Analysis",
                    lines=8,
                    interactive=False
                )
            with gr.Column():
                credit_stats = gr.Textbox(
                    label="Risk Statistics",
                    lines=3,
                    interactive=False
                )
        # Read-only output tables: high-risk customers and data-quality issues.
        credit_results = gr.Dataframe(
            label="High-Risk Customers",
            interactive=False
        )
        credit_issues = gr.Dataframe(
            label="Data Quality Issues",
            interactive=False
        )
        # Analysis fires on upload; output order must match the callback's
        # return tuple (results, summary, stats, issues).
        credit_file.upload(
            process_credit_file,
            inputs=[credit_file],
            outputs=[credit_results, credit_summary, credit_stats, credit_issues]
        )
    # AI Consultant Module
    # Hidden by default; shown via the navigation handlers below.
    with gr.Column(visible=False) as ai_section:
        gr.Markdown("## 💬 AI-Powered Risk Consultant")
        gr.Markdown("### Chat with our AI expert about fraud detection and risk management")
        # Model picker limited to the two inference models configured at the
        # top of the file (Falcon-7B-Instruct default, Zephyr-7B-beta alt).
        model_choice = gr.Dropdown(
            choices=[DEFAULT_MODEL_ID, SECONDARY_MODEL_ID],
            label="Choose AI Model",
            value=DEFAULT_MODEL_ID,
            info="Select the language model for analysis"
        )
        # NOTE(review): no type= argument, so this uses Gradio's legacy
        # tuple-based chat history format — confirm chatbot_respond expects
        # [(user, bot), ...] pairs rather than openai-style message dicts.
        chatbot = gr.Chatbot(
            label="Risk Management Consultant",
            height=500
        )
        with gr.Row():
            msg = gr.Textbox(
                label="Ask about fraud detection, risk assessment, compliance...",
                placeholder="e.g., How can I improve my transaction fraud detection?",
                scale=4
            )
            submit_btn = gr.Button("Send", scale=1, variant="primary")
        # Two triggers for the same handler: pressing Enter in the textbox or
        # clicking Send. The handler returns (updated history, cleared textbox).
        msg.submit(
            chatbot_respond,
            inputs=[msg, chatbot, model_choice],
            outputs=[chatbot, msg]
        )
        submit_btn.click(
            chatbot_respond,
            inputs=[msg, chatbot, model_choice],
            outputs=[chatbot, msg]
        )
| # Button Event Handlers - Fixed to properly navigate between sections | |
| transaction_btn.click( | |
| fn=lambda: update_section("transaction"), | |
| inputs=[], | |
| outputs=[overview_section, transaction_section, kyc_section, sanctions_section, credit_section, ai_section, back_btn] | |
| ) | |
| kyc_btn.click( | |
| fn=lambda: update_section("kyc"), | |
| inputs=[], | |
| outputs=[overview_section, transaction_section, kyc_section, sanctions_section, credit_section, ai_section, back_btn] | |
| ) | |
| sanctions_btn.click( | |
| fn=lambda: update_section("sanctions"), | |
| inputs=[], | |
| outputs=[overview_section, transaction_section, kyc_section, sanctions_section, credit_section, ai_section, back_btn] | |
| ) | |
| credit_btn.click( | |
| fn=lambda: update_section("credit"), | |
| inputs=[], | |
| outputs=[overview_section, transaction_section, kyc_section, sanctions_section, credit_section, ai_section, back_btn] | |
| ) | |
| ai_btn.click( | |
| fn=lambda: update_section("ai"), | |
| inputs=[], | |
| outputs=[overview_section, transaction_section, kyc_section, sanctions_section, credit_section, ai_section, back_btn] | |
| ) | |
| # Back button handler - Returns to overview page | |
| back_btn.click( | |
| fn=lambda: update_section("overview"), | |
| inputs=[], | |
| outputs=[overview_section, transaction_section, kyc_section, sanctions_section, credit_section, ai_section, back_btn] | |
| ) | |
    # Footer (visible on all pages) — created outside the per-module hidden
    # Columns, so section navigation never hides it.
    with gr.Row():
        gr.Markdown(
            """
            ---
            **⚠️ Disclaimer:** This tool is for demonstration purposes. Always validate results with domain experts and comply with relevant regulations.
            """
        )
# Script entry point: serve the app on all interfaces at the port
# Hugging Face Spaces expects (7860).
# NOTE(review): share=True is ignored (with a warning) when running on
# Spaces and only matters for local runs, where it creates a public
# gradio.live tunnel — confirm that is intended before shipping.
if __name__ == "__main__":
    demo.launch(
        share=True,
        server_name="0.0.0.0",
        server_port=7860
    )