"""Generate the synthetic CSV datasets used by the risk agents.

Running this module writes train/test splits for transaction fraud, credit
risk and KYC identity checks, plus a sanctions/PEP screening list, into
``DATA_DIR``.  Every dataset is driven by a NumPy ``Generator`` seeded from
``SEED`` so repeated runs produce the same files.
"""

from __future__ import annotations

from collections.abc import Callable
from dataclasses import dataclass
from pathlib import Path
import sys

import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split

# Make the repository root importable so ``agents.common`` resolves when this
# file is executed directly as a script.
ROOT_DIR = Path(__file__).resolve().parents[1]
if str(ROOT_DIR) not in sys.path:
    sys.path.insert(0, str(ROOT_DIR))

from agents.common import DATA_DIR, SEED, ensure_runtime_dirs


TRANSACTION_CATEGORIES = ["electronics", "grocery", "wire_transfer", "restaurant", "travel"]
LOAN_PURPOSES = ["mortgage", "personal", "auto", "consolidation", "business"]
COUNTRIES = [
    "United States",
    "United Kingdom",
    "Canada",
    "United Arab Emirates",
    "Singapore",
    "Nigeria",
    "India",
    "Brazil",
    "Germany",
    "France",
]
RISK_TYPES = ["SANCTIONS", "PEP", "WATCHLIST"]

# Number of entries written to the sanctions/PEP screening list.
_SANCTIONS_LIST_SIZE = 500

# Relative likelihood of each hour (0-23) for legitimate transactions.  The raw
# weights do not sum to exactly 1, so they are normalised once here instead of
# once per generated row.
_SAFE_HOURS = np.arange(24)
_SAFE_HOUR_WEIGHTS = np.array(
    [
        0.02, 0.02, 0.015, 0.015, 0.015, 0.02, 0.04, 0.06,
        0.08, 0.09, 0.08, 0.07, 0.06, 0.05, 0.05, 0.05,
        0.055, 0.06, 0.06, 0.055, 0.05, 0.04, 0.03, 0.02,
    ],
    dtype=float,
)
_SAFE_HOUR_WEIGHTS = _SAFE_HOUR_WEIGHTS / _SAFE_HOUR_WEIGHTS.sum()

# Signature shared by the per-row generators: (rng, label, 1-based index) -> row.
RowFactory = Callable[[np.random.Generator, int, int], "dict[str, object]"]


@dataclass(frozen=True)
class DatasetSummary:
    """Row count and positive-label rate of one generated dataset."""

    name: str
    rows: int
    positive_rate: float


def _clip(value: float, lower: float, upper: float) -> float:
    """Return *value* limited to ``[lower, upper]`` as a built-in ``float``."""
    return float(np.clip(value, lower, upper))


def _clip_int(value: float, lower: int, upper: int) -> int:
    """Round *value* and limit it to ``[lower, upper]`` as a built-in ``int``."""
    return int(np.clip(round(value), lower, upper))


def _weighted_choice(rng: np.random.Generator, values: list[str], weights: list[float]) -> str:
    """Draw one element of *values* with probability proportional to *weights*.

    The weights do not need to sum to 1; they are normalised here.

    Raises:
        ValueError: if *values* and *weights* differ in length, or the weights
            do not sum to a positive, finite number.
    """
    probabilities = np.asarray(weights, dtype=float)
    if probabilities.shape != (len(values),):
        raise ValueError("values and weights must have the same length")
    total = probabilities.sum()
    # Reject a degenerate sum up front; dividing by it would yield NaN/inf
    # probabilities and a much less obvious error from NumPy.
    if not np.isfinite(total) or total <= 0.0:
        raise ValueError("weights must sum to a positive, finite number")
    return str(rng.choice(values, p=probabilities / total))


def _generate_transaction_row(rng: np.random.Generator, is_fraud: int, idx: int) -> dict[str, object]:
    """Build one synthetic card-transaction record.

    Args:
        rng: Random generator; the order of draws below is part of the
            reproducible output, so do not reorder them.
        is_fraud: Truthy for a fraudulent transaction, falsy for a legitimate one.
        idx: Sequence number used to build ``transaction_id``.

    Returns:
        A mapping of column name to value, including the ``is_fraud`` label.
    """
    if is_fraud:
        if rng.random() < 0.55:
            # Amount clustered tightly around a round figure.
            base = float(rng.choice([500, 1000, 5000, 10000, 15000, 20000]))
            amount = _clip(base + rng.normal(0.0, max(20.0, base * 0.015)), 120.0, 50000.0)
        else:
            amount = _clip(rng.lognormal(mean=9.1, sigma=0.55), 150.0, 50000.0)
        # Fraud rows only use hours 0-7.
        hour_of_day = int(
            rng.choice(
                [0, 1, 2, 3, 4, 5, 6, 7],
                p=[0.05, 0.07, 0.19, 0.22, 0.20, 0.14, 0.08, 0.05],
            )
        )
        is_international = bool(rng.random() < 0.60)
        merchant_category = _weighted_choice(
            rng,
            TRANSACTION_CATEGORIES,
            [0.36, 0.06, 0.32, 0.08, 0.18],
        )
        transaction_velocity_1h = _clip_int(rng.poisson(6.5) + 1, 1, 25)
        amount_vs_avg_ratio = _clip(rng.normal(4.6, 1.2), 1.4, 12.0)
        is_new_device = bool(rng.random() < 0.70)
        distance_from_home_km = _clip(rng.lognormal(mean=6.45, sigma=0.48), 60.0, 6000.0)
        failed_attempts_before = int(rng.integers(1, 4))
        account_age_days = int(rng.integers(1, 45))
    else:
        amount = _clip(rng.lognormal(mean=4.55, sigma=0.75), 5.0, 6500.0)
        if rng.random() < 0.08:
            # A small share of legitimate rows also sit near round amounts.
            amount = _clip(float(rng.choice([50, 100, 200, 500])) + rng.normal(0.0, 5.0), 5.0, 6500.0)
        hour_of_day = int(rng.choice(_SAFE_HOURS, p=_SAFE_HOUR_WEIGHTS))
        is_international = bool(rng.random() < 0.11)
        merchant_category = _weighted_choice(
            rng,
            TRANSACTION_CATEGORIES,
            [0.12, 0.40, 0.05, 0.26, 0.17],
        )
        transaction_velocity_1h = _clip_int(rng.poisson(1.4), 0, 8)
        amount_vs_avg_ratio = _clip(rng.normal(1.15, 0.42), 0.2, 4.0)
        is_new_device = bool(rng.random() < 0.18)
        distance_from_home_km = _clip(rng.lognormal(mean=3.9, sigma=0.8), 0.5, 900.0)
        failed_attempts_before = 1 if rng.random() < 0.06 else 0
        account_age_days = int(rng.integers(30, 3651))

    # Occasional jitter applied to either class so the two features are not
    # perfectly separable by their class-specific ranges.
    if rng.random() < 0.04:
        amount_vs_avg_ratio = _clip(amount_vs_avg_ratio + rng.normal(0.0, 0.35), 0.2, 12.0)
    if rng.random() < 0.03:
        transaction_velocity_1h = _clip_int(transaction_velocity_1h + rng.integers(-1, 2), 0, 25)

    return {
        "transaction_id": f"TXN-{idx:05d}",
        "amount": round(amount, 2),
        "hour_of_day": hour_of_day,
        "is_international": is_international,
        "merchant_category": merchant_category,
        "transaction_velocity_1h": transaction_velocity_1h,
        "amount_vs_avg_ratio": round(amount_vs_avg_ratio, 3),
        "is_new_device": is_new_device,
        "distance_from_home_km": round(distance_from_home_km, 2),
        "failed_attempts_before": failed_attempts_before,
        "account_age_days": account_age_days,
        "is_fraud": int(is_fraud),
    }


def _generate_credit_row(rng: np.random.Generator, is_default: int, idx: int) -> dict[str, object]:
    """Build one synthetic loan-application record.

    Args:
        rng: Random generator; draw order is part of the reproducible output.
        is_default: Truthy for an applicant who defaults, falsy otherwise.
        idx: Sequence number used to build ``applicant_id``.

    Returns:
        A mapping of column name to value, including the ``is_default`` label.
    """
    if is_default:
        credit_score = _clip_int(rng.normal(545, 42), 300, 720)
        debt_to_income_ratio = _clip(rng.beta(6.8, 3.6), 0.18, 0.98)
        employment_months = _clip_int(rng.gamma(1.6, 4.0), 0, 84)
        num_open_accounts = _clip_int(rng.normal(11.5, 3.0), 1, 22)
        payment_history_missed = _clip_int(rng.poisson(3.4), 0, 9)
        loan_amount = _clip(rng.lognormal(mean=10.9, sigma=0.42), 6000.0, 125000.0)
        revolving_utilization = _clip(rng.beta(8.4, 2.2), 0.25, 0.99)
        recent_hard_inquiries = _clip_int(rng.poisson(4.0), 0, 9)
        collateral_value = _clip(rng.normal(12000.0, 9000.0), 0.0, 120000.0)
        loan_purpose = _weighted_choice(
            rng,
            LOAN_PURPOSES,
            [0.08, 0.32, 0.10, 0.34, 0.16],
        )
    else:
        credit_score = _clip_int(rng.normal(712, 58), 360, 850)
        debt_to_income_ratio = _clip(rng.beta(3.1, 7.2), 0.01, 0.82)
        employment_months = _clip_int(rng.gamma(5.8, 18.0), 1, 360)
        num_open_accounts = _clip_int(rng.normal(6.4, 2.8), 1, 18)
        payment_history_missed = _clip_int(rng.poisson(0.45), 0, 4)
        loan_amount = _clip(rng.lognormal(mean=10.55, sigma=0.52), 3000.0, 150000.0)
        revolving_utilization = _clip(rng.beta(2.2, 4.8), 0.01, 0.92)
        recent_hard_inquiries = _clip_int(rng.poisson(1.1), 0, 6)
        collateral_value = _clip(rng.normal(54000.0, 28000.0), 0.0, 300000.0)
        loan_purpose = _weighted_choice(
            rng,
            LOAN_PURPOSES,
            [0.38, 0.15, 0.18, 0.11, 0.18],
        )

    if loan_purpose == "mortgage":
        # Mortgage collateral is raised to at least 0.8-1.4x the loan amount.
        collateral_value = max(collateral_value, loan_amount * rng.uniform(0.8, 1.4))
    if loan_purpose in {"personal", "consolidation"} and not is_default:
        # Slight upward shift in DTI for non-defaulting unsecured loans.
        debt_to_income_ratio = _clip(debt_to_income_ratio + rng.normal(0.02, 0.03), 0.01, 0.82)

    return {
        "applicant_id": f"APP-{idx:05d}",
        "credit_score": credit_score,
        "debt_to_income_ratio": round(debt_to_income_ratio, 4),
        "employment_months": employment_months,
        "num_open_accounts": num_open_accounts,
        "payment_history_missed": payment_history_missed,
        "loan_amount": round(loan_amount, 2),
        "revolving_utilization": round(revolving_utilization, 4),
        "recent_hard_inquiries": recent_hard_inquiries,
        "collateral_value": round(collateral_value, 2),
        "loan_purpose": loan_purpose,
        "is_default": int(is_default),
    }


def _generate_kyc_row(rng: np.random.Generator, is_anomaly: int, idx: int) -> dict[str, object]:
    """Build one synthetic KYC identity-check record.

    Args:
        rng: Random generator; draw order is part of the reproducible output.
        is_anomaly: Truthy for an anomalous application, falsy for a normal one.
        idx: Sequence number used to build ``application_id``.

    Returns:
        A mapping of column name to value, including the ``is_anomaly`` label.
    """
    if is_anomaly:
        # Both candidate ages (very new and very old document) are drawn, then
        # one of the two is picked uniformly.
        id_document_age_days = int(rng.choice([rng.integers(1, 7), rng.integers(7300, 9500)]))
        address_match_score = _clip(rng.beta(1.2, 6.0), 0.01, 0.45)
        name_vs_id_match_score = _clip(rng.beta(1.8, 4.8), 0.05, 0.65)
        selfie_liveness_score = _clip(rng.beta(1.5, 5.2), 0.02, 0.55)
        num_accounts_same_address = _clip_int(rng.normal(5.2, 1.4), 3, 10)
        phone_age_days = _clip_int(rng.gamma(1.8, 4.0), 1, 60)
        email_domain_risk = int(rng.choice([1, 2, 3], p=[0.10, 0.70, 0.20]))
        ip_country_vs_id_country_match = bool(rng.random() < 0.18)
        velocity_applications_7d = _clip_int(rng.normal(6.8, 2.0), 2, 16)
    else:
        id_document_age_days = _clip_int(rng.gamma(4.8, 290.0), 20, 5400)
        address_match_score = _clip(rng.beta(8.5, 1.8), 0.45, 1.0)
        name_vs_id_match_score = _clip(rng.beta(8.2, 1.6), 0.55, 1.0)
        selfie_liveness_score = _clip(rng.beta(9.0, 1.6), 0.50, 1.0)
        num_accounts_same_address = _clip_int(rng.poisson(1.2), 0, 4)
        phone_age_days = _clip_int(rng.gamma(5.5, 120.0), 15, 4000)
        email_domain_risk = int(rng.choice([1, 2, 3], p=[0.62, 0.05, 0.33]))
        ip_country_vs_id_country_match = bool(rng.random() < 0.96)
        velocity_applications_7d = _clip_int(rng.poisson(1.0), 0, 5)

    return {
        "application_id": f"KYC-{idx:05d}",
        "id_document_age_days": id_document_age_days,
        "address_match_score": round(address_match_score, 4),
        "name_vs_id_match_score": round(name_vs_id_match_score, 4),
        "selfie_liveness_score": round(selfie_liveness_score, 4),
        "num_accounts_same_address": num_accounts_same_address,
        "phone_age_days": phone_age_days,
        "email_domain_risk": email_domain_risk,
        "ip_country_vs_id_country_match": ip_country_vs_id_country_match,
        "velocity_applications_7d": velocity_applications_7d,
        "is_anomaly": int(is_anomaly),
    }


def _build_labelled_frame(
    rng: np.random.Generator,
    total_rows: int,
    positive_rate: float,
    row_factory: RowFactory,
) -> pd.DataFrame:
    """Generate *total_rows* rows with a fixed share of positive labels.

    The label vector is built first and shuffled with *rng*, then one row is
    generated per label with ids starting at 1.
    """
    positive_count = int(total_rows * positive_rate)
    labels = np.array([1] * positive_count + [0] * (total_rows - positive_count))
    rng.shuffle(labels)
    rows = [row_factory(rng, int(label), idx) for idx, label in enumerate(labels, start=1)]
    return pd.DataFrame(rows)


def _split_and_sort(
    frame: pd.DataFrame,
    label_column: str,
    id_column: str,
    test_size: int,
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Stratified train/test split of *frame*, each part sorted by *id_column*.

    Returns:
        ``(train, test)`` with fresh 0-based indexes.
    """
    train_df, test_df = train_test_split(
        frame,
        test_size=test_size,
        random_state=SEED,
        stratify=frame[label_column],
    )
    return (
        train_df.sort_values(id_column).reset_index(drop=True),
        test_df.sort_values(id_column).reset_index(drop=True),
    )


def _build_transaction_dataset() -> tuple[pd.DataFrame, pd.DataFrame]:
    """Return ``(train, test)`` transaction-fraud frames (12000 rows, 8% fraud, 2000 test rows)."""
    rng = np.random.default_rng(SEED)
    frame = _build_labelled_frame(rng, 12000, 0.08, _generate_transaction_row)
    return _split_and_sort(frame, "is_fraud", "transaction_id", test_size=2000)


def _build_credit_dataset() -> tuple[pd.DataFrame, pd.DataFrame]:
    """Return ``(train, test)`` credit-risk frames (10000 rows, 12% defaults, 2000 test rows)."""
    rng = np.random.default_rng(SEED + 1)
    frame = _build_labelled_frame(rng, 10000, 0.12, _generate_credit_row)
    return _split_and_sort(frame, "is_default", "applicant_id", test_size=2000)


def _build_kyc_dataset() -> tuple[pd.DataFrame, pd.DataFrame]:
    """Return ``(train, test)`` KYC frames (6000 rows, 5% anomalies, 1000 test rows).

    The training frame is returned without the ``is_anomaly`` column; only the
    test frame keeps the label.
    """
    rng = np.random.default_rng(SEED + 2)
    frame = _build_labelled_frame(rng, 6000, 0.05, _generate_kyc_row)
    train_df, test_df = _split_and_sort(frame, "is_anomaly", "application_id", test_size=1000)
    return train_df.drop(columns=["is_anomaly"]), test_df


def _alias_one(full_name: str) -> str:
    """Return ``"First L."`` for a multi-word name, else the name unchanged."""
    parts = full_name.split()
    if len(parts) < 2:
        return full_name
    return f"{parts[0]} {parts[-1][0]}."


def _alias_two(full_name: str) -> str:
    """Return ``"Last, First"`` for a multi-word name, else the name unchanged."""
    parts = full_name.split()
    if len(parts) < 2:
        return full_name
    return f"{parts[-1]}, {parts[0]}"


def _build_sanctions_dataset() -> pd.DataFrame:
    """Return the sanctions/PEP screening list as a frame of unique names.

    The list starts with two fixed name sets and is topped up with random
    first/last name combinations until it holds ``_SANCTIONS_LIST_SIZE``
    distinct names.  Each name gets two aliases, a country, a risk type, a
    risk score and a ``date_added`` in ``YYYY-MM-DD`` form.
    """
    rng = np.random.default_rng(SEED + 3)
    ambiguous_names = [
        "John Smith", "Mohammed Ali", "Chen Wei", "Maria Garcia", "David Kim",
        "Wei Chen", "Ahmed Hassan", "Michael Brown", "James Johnson", "Aisha Khan",
        "Juan Perez", "Fatima Noor", "Priya Sharma", "Mohamed Hassan", "Carlos Silva",
        "Sarah Ahmed", "Yusuf Khan", "Omar Ali", "Li Wei", "Ana Martinez",
    ]
    common_legitimate_names = [
        "Emily Carter", "Olivia Turner", "Noah Bennett", "Liam Parker", "Mia Collins",
        "Ethan Brooks", "Sophia Reed", "Ava Morgan", "Lucas Hayes", "Charlotte Brooks",
        "Amelia Jenkins", "Benjamin Cooper", "Harper Diaz", "Elijah Ross", "Ella Murphy",
        "Grace Hughes", "Jack Foster", "Henry Price", "Lily Ward", "Mason Perry",
    ]
    first_names = [
        "Abdul", "Amina", "Carlos", "Chen", "Dmitri", "Elena", "Farah", "Grace",
        "Hassan", "Ivan", "Jamal", "Karim", "Lina", "Marta", "Nadia", "Omar",
        "Pavel", "Qasim", "Rania", "Sergei", "Tariq", "Umar", "Viktor", "Wang",
        "Xiu", "Yara", "Zain", "Anya", "Boris", "Celine", "Diego", "Ebrahim",
        "Fiona", "Giorgio", "Helena", "Ismail", "Jelena", "Khalid", "Leila", "Nikolai",
    ]
    last_names = [
        "Petrov", "Ivanov", "Haddad", "Rahman", "Mendoza", "Volkov", "Costa", "Akhtar",
        "Hussein", "Kim", "Zhang", "Garcia", "Morris", "Singh", "Kovacs", "Novak",
        "Rossi", "Dubois", "Silva", "Ibrahim", "Fischer", "Santos", "Ortega", "Khan",
        "Aliyev", "Pereira", "Muller", "Bennani", "Yilmaz", "Hassan", "Tan", "Lopes",
        "Sato", "Meyer", "Diallo", "Mensah", "Kassim", "Rahimi", "Saeed", "Ndlovu",
    ]

    # Order-preserving de-duplication of the fixed seed names.
    names: list[str] = list(dict.fromkeys(ambiguous_names + common_legitimate_names))
    seen: set[str] = set(names)
    # Top up with random combinations; duplicates are discarded and redrawn.
    while len(names) < _SANCTIONS_LIST_SIZE:
        full_name = f"{rng.choice(first_names)} {rng.choice(last_names)}"
        if full_name in seen:
            continue
        seen.add(full_name)
        names.append(full_name)

    list_start = pd.Timestamp("2018-01-01")
    rows = []
    for full_name in names:
        risk_type = _weighted_choice(rng, RISK_TYPES, [0.50, 0.28, 0.22])
        if risk_type == "SANCTIONS":
            risk_score = _clip(rng.normal(0.92, 0.05), 0.75, 1.0)
        elif risk_type == "PEP":
            risk_score = _clip(rng.normal(0.76, 0.08), 0.55, 0.95)
        else:
            risk_score = _clip(rng.normal(0.62, 0.10), 0.35, 0.85)
        # Country is drawn before the date offset to keep the RNG sequence stable.
        country = str(rng.choice(COUNTRIES))
        added_on = list_start + pd.to_timedelta(int(rng.integers(0, 3000)), unit="D")
        rows.append(
            {
                "full_name": full_name,
                "alias_1": _alias_one(full_name),
                "alias_2": _alias_two(full_name),
                "country": country,
                "risk_type": risk_type,
                "risk_score": round(risk_score, 3),
                "date_added": added_on.strftime("%Y-%m-%d"),
            }
        )
    return pd.DataFrame(rows)


def main() -> None:
    """Generate every dataset, write the CSV files to ``DATA_DIR`` and print a summary."""
    ensure_runtime_dirs()
    DATA_DIR.mkdir(parents=True, exist_ok=True)

    transaction_train, transaction_test = _build_transaction_dataset()
    credit_train, credit_test = _build_credit_dataset()
    kyc_train, kyc_test = _build_kyc_dataset()
    sanctions_df = _build_sanctions_dataset()

    outputs = {
        "transaction_fraud_train.csv": transaction_train,
        "transaction_fraud_test.csv": transaction_test,
        "credit_risk_train.csv": credit_train,
        "credit_risk_test.csv": credit_test,
        "kyc_identity_train.csv": kyc_train,
        "kyc_identity_test.csv": kyc_test,
        "sanctions_pep_list.csv": sanctions_df,
    }
    for filename, frame in outputs.items():
        frame.to_csv(DATA_DIR / filename, index=False)

    # The KYC training split has no label column, so it has no positive rate
    # to report and is left out of the summary.
    labelled = [
        ("transaction_fraud_train", transaction_train, "is_fraud"),
        ("transaction_fraud_test", transaction_test, "is_fraud"),
        ("credit_risk_train", credit_train, "is_default"),
        ("credit_risk_test", credit_test, "is_default"),
        ("kyc_identity_test", kyc_test, "is_anomaly"),
    ]
    summaries = [
        DatasetSummary(name, len(frame), float(frame[label_column].mean()))
        for name, frame, label_column in labelled
    ]
    for summary in summaries:
        print(f"{summary.name}: rows={summary.rows}, positive_rate={summary.positive_rate:.4f}")
    print(f"sanctions_pep_list: rows={len(sanctions_df)}")


if __name__ == "__main__":
    main()