agentic-aml-demo / ds_data_prep.py
shawn420's picture
Sync app to latest: scatter offcanvas, stop fix, doc upload KB, DS_CSV rename, aria-v2
04d8646
"""
ds_data_prep.py β€” Dynamic Segmentation Data Preparation
Schema:
aml_s_customers.csv id β†’ customer_id
aml_s_accounts.csv id β†’ account_id
aml_s_account_relationship.csv account_id, customer_id (many-to-many)
aml_s_transactions.csv subject_id = account_id, timestamp, amount
Output: docs/ds_segmentation_data.csv β€” one row per customer-account pair
Usage:
python ds_data_prep.py
"""
import os
import pandas as pd
import numpy as np
SS_DIR = "ss_files"
OUTPUT_FILE = "docs/ds_segmentation_data.csv"
REF_DATE = pd.Timestamp.now()
# Synth mode: when ARIA_SYNTH_DATA_DIR is set, read from aria_synth/ and write to separate file
_SYNTH_DIR = os.getenv("ARIA_SYNTH_DATA_DIR", os.path.join(os.path.dirname(os.path.abspath(__file__)), "aria_synth"))
_USE_SYNTH = bool(os.getenv("ARIA_SYNTH_DATA_DIR")) and os.path.isdir(_SYNTH_DIR)
OUTPUT_SYNTH = "docs/ds_segmentation_synth.csv"
def load(filename):
path = os.path.join(SS_DIR, filename)
print(f" Loading {filename} ...", end=" ")
df = pd.read_csv(path)
print(f"{len(df):,} rows, {len(df.columns)} cols")
return df
def _strip_aria(df):
"""Strip 'aria_' prefix from all column names."""
return df.rename(columns={c: c[5:] if c.startswith("aria_") else c for c in df.columns})
def _load_synth():
"""Load the four aria_synth/ files and normalise them to match the production schema."""
def _read(fname):
path = os.path.join(_SYNTH_DIR, fname)
print(f" Loading {fname} ...", end=" ")
df = pd.read_csv(path, low_memory=False)
print(f"{len(df):,} rows, {len(df.columns)} cols")
return _strip_aria(df)
cust = _read("aria_customers.csv")
# aria_customer_id → customer_id (after strip already done), rename to match id→customer_id path
if "customer_id" in cust.columns and "id" not in cust.columns:
cust = cust.rename(columns={"customer_id": "id"})
# residency_country β†’ residency_country_id
if "residency_country" in cust.columns and "residency_country_id" not in cust.columns:
cust = cust.rename(columns={"residency_country": "residency_country_id"})
acct = _read("aria_accounts.csv")
# aria_account_id → account_id (after strip), rename to id for the id→account_id rename
if "account_id" in acct.columns and "id" not in acct.columns:
acct = acct.rename(columns={"account_id": "id"})
rel = _read("aria_account_relationships.csv")
# relationship_type β†’ type (production code reads rel['type'])
if "relationship_type" in rel.columns and "type" not in rel.columns:
rel = rel.rename(columns={"relationship_type": "type"})
txn = _read("aria_transactions.csv")
# subject_id, timestamp, amount, cash_direction β€” already correct after strip
return cust, acct, rel, txn
def normalize_account_type(x):
"""Normalize raw account type strings to 5 standard categories."""
if pd.isna(x):
return 'Other'
t = str(x).lower()
if 'checking' in t:
return 'Checking'
if 'certificate' in t or 'cert' in t:
return 'Certificate of Deposit'
if 'saving' in t or 'money market' in t:
return 'Savings'
if any(k in t for k in [
'loan', 'mortgage', 'line of credit', 'home equity',
'auto', 'credit card', 'visa', 'heloc', 'indirect',
'preapproved', 'personal', 'boat', 'motorcycle',
'mobile home', 'recreational', 'motorhome', 'prescreen',
'workout', 'disaster'
]):
return 'Loan'
return 'Other'
def compute_txn_aggregates(txn):
"""Per-account transaction aggregates from transactions file."""
df = txn.copy()
df['timestamp'] = pd.to_datetime(df['timestamp'], errors='coerce')
df['amount'] = pd.to_numeric(df['amount'], errors='coerce')
# Separate cash-in and cash-out
df['is_cashin'] = (df['cash_direction'] == 'CashIn').astype(int)
df['is_cashout'] = (df['cash_direction'] == 'CashOut').astype(int)
grp = df.groupby('subject_id')
# Date range for avg_trxns_week
date_min = grp['timestamp'].min()
date_max = grp['timestamp'].max()
weeks_active = ((date_max - date_min).dt.days / 7).clip(lower=1)
# Monthly amount: group by (account, year-month), then average across months
df['_ym'] = df['timestamp'].dt.to_period('M')
monthly_totals = df.groupby(['subject_id', '_ym'])['amount'].sum()
avg_monthly = monthly_totals.groupby(level=0).mean().rename('avg_monthly_trxn_amt')
agg = pd.DataFrame({
'trxn_count': grp['amount'].count(),
'total_trxn_amt': grp['amount'].sum(),
'avg_trxn_amt': grp['amount'].mean(),
'max_trxn_amt': grp['amount'].max(),
'min_trxn_amt': grp['amount'].min(),
'std_trxn_amt': grp['amount'].std().fillna(0),
'cashin_count': grp['is_cashin'].sum(),
'cashout_count': grp['is_cashout'].sum(),
'avg_trxns_week': (grp['amount'].count() / weeks_active).round(2),
'avg_weekly_trxn_amt': (grp['amount'].sum() / weeks_active).round(2),
'weeks_active': weeks_active.round(1),
}).reset_index().rename(columns={'subject_id': 'account_id'})
agg = agg.merge(avg_monthly.reset_index().rename(columns={'subject_id': 'account_id'}),
on='account_id', how='left')
# Cash-in ratio
agg['cashin_ratio'] = (agg['cashin_count'] / agg['trxn_count'].clip(lower=1)).round(3)
return agg
def prepare_data():
print(f"\n{'='*60}")
mode = "SYNTHETIC" if _USE_SYNTH else "PRODUCTION"
print(f"Dynamic Segmentation Data Prep [{mode}]")
print(f"{'='*60}\n")
out_file = OUTPUT_SYNTH if _USE_SYNTH else OUTPUT_FILE
# ── 1. Load files ────────────────────────────────────────────────
print("--- Loading files ---")
if _USE_SYNTH:
cust, acct, rel, txn = _load_synth()
else:
cust = load("aml_s_customers.csv")
acct = load("aml_s_accounts.csv")
rel = load("aml_s_account_relationship.csv")
txn = load("aml_s_transactions.csv")
# ── 2. Rename IDs ────────────────────────────────────────────────
cust = cust.rename(columns={'id': 'customer_id'})
acct = acct.rename(columns={'id': 'account_id'})
# ── 3. Derive customer features ──────────────────────────────────
print("\n--- Deriving customer features ---")
cust['birthdate'] = pd.to_datetime(cust['birthdate'], errors='coerce')
cust['age'] = ((REF_DATE - cust['birthdate']).dt.days / 365.25).round(1)
# Boolean flags β†’ int
bool_cols = ['internal_employee', 'entity', 'is_customer', 'exempt_cdd',
'exempt_sanctions', 'pep', 'subpeona', 'is_314b', 'negative_news', 'ofac']
for c in bool_cols:
if c in cust.columns:
cust[c] = cust[c].fillna(False).astype(int)
cust_keep = ['customer_id', 'age', 'gender', 'marital_status', 'occupation',
'citizenship', 'gross_annual_income', 'nationality_country_id',
'residency_country_id', 'pep', 'is_314b', 'negative_news', 'ofac',
'exempt_cdd', 'exempt_sanctions', 'naics', 'sic_code', 'entity']
cust_keep = [c for c in cust_keep if c in cust.columns]
cust = cust[cust_keep]
print(f" Customer features kept: {cust_keep}")
# ── 4. Derive account features ───────────────────────────────────
print("\n--- Deriving account features ---")
acct['open_date'] = pd.to_datetime(acct['open_date'], errors='coerce')
acct['acct_age_years'] = ((REF_DATE - acct['open_date']).dt.days / 365.25).round(2)
acct_keep = ['account_id', 'account_type', 'product_name', 'status',
'initial_balance', 'current_balance', 'acct_age_years',
'loan_amount']
acct_keep = [c for c in acct_keep if c in acct.columns]
acct = acct[acct_keep]
print(f" Account features kept: {acct_keep}")
# Normalize account_type to standard categories using keyword matching
acct['account_type'] = acct['account_type'].apply(normalize_account_type)
print(f" Account type distribution: {acct['account_type'].value_counts().to_dict()}")
# ── 5. Transaction aggregates ─────────────────────────────────────
print("\n--- Computing transaction aggregates ---")
txn_agg = compute_txn_aggregates(txn)
print(f" Aggregates computed for {len(txn_agg):,} accounts")
print(f" Cols: {[c for c in txn_agg.columns if c != 'account_id']}")
# ── 6. Join everything ───────────────────────────────────────────
print("\n--- Joining ---")
# relationship: keep HOLDER or all types
print(f" Relationship types: {rel['type'].unique()}")
# Use all relationship types (or filter to HOLDER if needed)
mapping = rel[['account_id', 'customer_id', 'type']].drop_duplicates()
df = mapping.merge(cust, on='customer_id', how='left')
df = df.merge(acct, on='account_id', how='left')
df = df.merge(txn_agg, on='account_id', how='left')
print(f" Final: {len(df):,} customer-account rows, {len(df.columns)} columns")
# ── 7. Fill missing transaction data (accounts with no transactions) ─
txn_fill_cols = ['trxn_count', 'total_trxn_amt', 'avg_trxn_amt', 'avg_trxns_week',
'avg_monthly_trxn_amt', 'cashin_count', 'cashout_count', 'cashin_ratio']
for c in txn_fill_cols:
if c in df.columns:
df[c] = df[c].fillna(0)
# ── 8. Normalize column names to match perform_clustering schema ──
print("\n--- Normalizing column names ---")
df = df.rename(columns={
'avg_trxns_week': 'avg_num_trxns',
'avg_monthly_trxn_amt': 'trxn_amt_monthly',
'gross_annual_income': 'INCOME',
'current_balance': 'CURRENT_BALANCE',
'acct_age_years': 'ACCT_AGE_YEARS',
'age': 'AGE',
'account_type': 'ACCOUNT_TYPE',
'gender': 'GENDER',
'citizenship': 'CITIZENSHIP',
'residency_country_id': 'RESIDENCY_COUNTRY',
'is_314b': '314b',
'ofac': 'OFAC',
})
# Derive customer_type and dynamic_segment from entity flag
# entity=1 (True) β†’ Business, entity=0 (False) β†’ Individual
df['customer_type'] = df['entity'].map({1: 'BUSINESS', 0: 'INDIVIDUAL'}).fillna('INDIVIDUAL')
df['dynamic_segment'] = df['entity'].map({1: 0, 0: 1}).fillna(1).astype(int)
print(f" customer_type distribution: {df['customer_type'].value_counts().to_dict()}")
# Age category bins
df['AGE_CATEGORY'] = pd.cut(
df['AGE'],
bins=[0, 30, 45, 60, 120],
labels=['18-30', '31-45', '46-60', '60+'],
right=True,
).astype(str).replace('nan', 'Unknown')
# Income bands (mostly null in this dataset β€” filled as Unknown)
df['INCOME_BAND'] = pd.cut(
df['INCOME'],
bins=[0, 50000, 100000, 200000, float('inf')],
labels=['<50K', '50K-100K', '100K-200K', '200K+'],
right=True,
).astype(str).replace('nan', 'Unknown')
# Fill null GENDER for tree groupby
df['GENDER'] = df['GENDER'].fillna('Unknown')
# Account age category for business segmentation
df['ACCOUNT_AGE_CATEGORY'] = pd.cut(
df['ACCT_AGE_YEARS'],
bins=[-0.01, 1.0, float('inf')],
labels=['New (0-1yr)', 'Established (>1yr)'],
right=True,
).astype(str).replace('nan', 'Unknown')
# ── 9. Summary ───────────────────────────────────────────────────
print("\n--- Output Summary ---")
print(f" Rows: {len(df):,}")
print(f" Columns: {len(df.columns)}")
num_cols = df.select_dtypes(include='number').columns.tolist()
cat_cols = df.select_dtypes(include='object').columns.tolist()
print(f" Numeric: {num_cols}")
print(f" Categorical: {cat_cols}")
nulls = df.isnull().sum()
nulls = nulls[nulls > 0]
if len(nulls):
print(f" Nulls:\n{nulls.to_string()}")
# ── 10. Save ─────────────────────────────────────────────────────
os.makedirs(os.path.dirname(out_file), exist_ok=True)
df.to_csv(out_file, index=False)
print(f"\n Saved -> {out_file}")
print(f"{'='*60}\n")
return df
if __name__ == "__main__":
prepare_data()