Spaces:
Paused
Paused
| """ | |
| ds_data_prep.py β Dynamic Segmentation Data Preparation | |
| Schema: | |
| aml_s_customers.csv id β customer_id | |
| aml_s_accounts.csv id β account_id | |
| aml_s_account_relationship.csv account_id, customer_id (many-to-many) | |
| aml_s_transactions.csv subject_id = account_id, timestamp, amount | |
| Output: docs/ds_segmentation_data.csv β one row per customer-account pair | |
| Usage: | |
| python ds_data_prep.py | |
| """ | |
| import os | |
| import pandas as pd | |
| import numpy as np | |
| SS_DIR = "ss_files" | |
| OUTPUT_FILE = "docs/ds_segmentation_data.csv" | |
| REF_DATE = pd.Timestamp.now() | |
| # Synth mode: when ARIA_SYNTH_DATA_DIR is set, read from aria_synth/ and write to separate file | |
| _SYNTH_DIR = os.getenv("ARIA_SYNTH_DATA_DIR", os.path.join(os.path.dirname(os.path.abspath(__file__)), "aria_synth")) | |
| _USE_SYNTH = bool(os.getenv("ARIA_SYNTH_DATA_DIR")) and os.path.isdir(_SYNTH_DIR) | |
| OUTPUT_SYNTH = "docs/ds_segmentation_synth.csv" | |
| def load(filename): | |
| path = os.path.join(SS_DIR, filename) | |
| print(f" Loading {filename} ...", end=" ") | |
| df = pd.read_csv(path) | |
| print(f"{len(df):,} rows, {len(df.columns)} cols") | |
| return df | |
| def _strip_aria(df): | |
| """Strip 'aria_' prefix from all column names.""" | |
| return df.rename(columns={c: c[5:] if c.startswith("aria_") else c for c in df.columns}) | |
| def _load_synth(): | |
| """Load the four aria_synth/ files and normalise them to match the production schema.""" | |
| def _read(fname): | |
| path = os.path.join(_SYNTH_DIR, fname) | |
| print(f" Loading {fname} ...", end=" ") | |
| df = pd.read_csv(path, low_memory=False) | |
| print(f"{len(df):,} rows, {len(df.columns)} cols") | |
| return _strip_aria(df) | |
| cust = _read("aria_customers.csv") | |
| # aria_customer_id β customer_id (after strip already done), rename to match idβcustomer_id path | |
| if "customer_id" in cust.columns and "id" not in cust.columns: | |
| cust = cust.rename(columns={"customer_id": "id"}) | |
| # residency_country β residency_country_id | |
| if "residency_country" in cust.columns and "residency_country_id" not in cust.columns: | |
| cust = cust.rename(columns={"residency_country": "residency_country_id"}) | |
| acct = _read("aria_accounts.csv") | |
| # aria_account_id β account_id (after strip), rename to id for the idβaccount_id rename | |
| if "account_id" in acct.columns and "id" not in acct.columns: | |
| acct = acct.rename(columns={"account_id": "id"}) | |
| rel = _read("aria_account_relationships.csv") | |
| # relationship_type β type (production code reads rel['type']) | |
| if "relationship_type" in rel.columns and "type" not in rel.columns: | |
| rel = rel.rename(columns={"relationship_type": "type"}) | |
| txn = _read("aria_transactions.csv") | |
| # subject_id, timestamp, amount, cash_direction β already correct after strip | |
| return cust, acct, rel, txn | |
| def normalize_account_type(x): | |
| """Normalize raw account type strings to 5 standard categories.""" | |
| if pd.isna(x): | |
| return 'Other' | |
| t = str(x).lower() | |
| if 'checking' in t: | |
| return 'Checking' | |
| if 'certificate' in t or 'cert' in t: | |
| return 'Certificate of Deposit' | |
| if 'saving' in t or 'money market' in t: | |
| return 'Savings' | |
| if any(k in t for k in [ | |
| 'loan', 'mortgage', 'line of credit', 'home equity', | |
| 'auto', 'credit card', 'visa', 'heloc', 'indirect', | |
| 'preapproved', 'personal', 'boat', 'motorcycle', | |
| 'mobile home', 'recreational', 'motorhome', 'prescreen', | |
| 'workout', 'disaster' | |
| ]): | |
| return 'Loan' | |
| return 'Other' | |
| def compute_txn_aggregates(txn): | |
| """Per-account transaction aggregates from transactions file.""" | |
| df = txn.copy() | |
| df['timestamp'] = pd.to_datetime(df['timestamp'], errors='coerce') | |
| df['amount'] = pd.to_numeric(df['amount'], errors='coerce') | |
| # Separate cash-in and cash-out | |
| df['is_cashin'] = (df['cash_direction'] == 'CashIn').astype(int) | |
| df['is_cashout'] = (df['cash_direction'] == 'CashOut').astype(int) | |
| grp = df.groupby('subject_id') | |
| # Date range for avg_trxns_week | |
| date_min = grp['timestamp'].min() | |
| date_max = grp['timestamp'].max() | |
| weeks_active = ((date_max - date_min).dt.days / 7).clip(lower=1) | |
| # Monthly amount: group by (account, year-month), then average across months | |
| df['_ym'] = df['timestamp'].dt.to_period('M') | |
| monthly_totals = df.groupby(['subject_id', '_ym'])['amount'].sum() | |
| avg_monthly = monthly_totals.groupby(level=0).mean().rename('avg_monthly_trxn_amt') | |
| agg = pd.DataFrame({ | |
| 'trxn_count': grp['amount'].count(), | |
| 'total_trxn_amt': grp['amount'].sum(), | |
| 'avg_trxn_amt': grp['amount'].mean(), | |
| 'max_trxn_amt': grp['amount'].max(), | |
| 'min_trxn_amt': grp['amount'].min(), | |
| 'std_trxn_amt': grp['amount'].std().fillna(0), | |
| 'cashin_count': grp['is_cashin'].sum(), | |
| 'cashout_count': grp['is_cashout'].sum(), | |
| 'avg_trxns_week': (grp['amount'].count() / weeks_active).round(2), | |
| 'avg_weekly_trxn_amt': (grp['amount'].sum() / weeks_active).round(2), | |
| 'weeks_active': weeks_active.round(1), | |
| }).reset_index().rename(columns={'subject_id': 'account_id'}) | |
| agg = agg.merge(avg_monthly.reset_index().rename(columns={'subject_id': 'account_id'}), | |
| on='account_id', how='left') | |
| # Cash-in ratio | |
| agg['cashin_ratio'] = (agg['cashin_count'] / agg['trxn_count'].clip(lower=1)).round(3) | |
| return agg | |
| def prepare_data(): | |
| print(f"\n{'='*60}") | |
| mode = "SYNTHETIC" if _USE_SYNTH else "PRODUCTION" | |
| print(f"Dynamic Segmentation Data Prep [{mode}]") | |
| print(f"{'='*60}\n") | |
| out_file = OUTPUT_SYNTH if _USE_SYNTH else OUTPUT_FILE | |
| # ββ 1. Load files ββββββββββββββββββββββββββββββββββββββββββββββββ | |
| print("--- Loading files ---") | |
| if _USE_SYNTH: | |
| cust, acct, rel, txn = _load_synth() | |
| else: | |
| cust = load("aml_s_customers.csv") | |
| acct = load("aml_s_accounts.csv") | |
| rel = load("aml_s_account_relationship.csv") | |
| txn = load("aml_s_transactions.csv") | |
| # ββ 2. Rename IDs ββββββββββββββββββββββββββββββββββββββββββββββββ | |
| cust = cust.rename(columns={'id': 'customer_id'}) | |
| acct = acct.rename(columns={'id': 'account_id'}) | |
| # ββ 3. Derive customer features ββββββββββββββββββββββββββββββββββ | |
| print("\n--- Deriving customer features ---") | |
| cust['birthdate'] = pd.to_datetime(cust['birthdate'], errors='coerce') | |
| cust['age'] = ((REF_DATE - cust['birthdate']).dt.days / 365.25).round(1) | |
| # Boolean flags β int | |
| bool_cols = ['internal_employee', 'entity', 'is_customer', 'exempt_cdd', | |
| 'exempt_sanctions', 'pep', 'subpeona', 'is_314b', 'negative_news', 'ofac'] | |
| for c in bool_cols: | |
| if c in cust.columns: | |
| cust[c] = cust[c].fillna(False).astype(int) | |
| cust_keep = ['customer_id', 'age', 'gender', 'marital_status', 'occupation', | |
| 'citizenship', 'gross_annual_income', 'nationality_country_id', | |
| 'residency_country_id', 'pep', 'is_314b', 'negative_news', 'ofac', | |
| 'exempt_cdd', 'exempt_sanctions', 'naics', 'sic_code', 'entity'] | |
| cust_keep = [c for c in cust_keep if c in cust.columns] | |
| cust = cust[cust_keep] | |
| print(f" Customer features kept: {cust_keep}") | |
| # ββ 4. Derive account features βββββββββββββββββββββββββββββββββββ | |
| print("\n--- Deriving account features ---") | |
| acct['open_date'] = pd.to_datetime(acct['open_date'], errors='coerce') | |
| acct['acct_age_years'] = ((REF_DATE - acct['open_date']).dt.days / 365.25).round(2) | |
| acct_keep = ['account_id', 'account_type', 'product_name', 'status', | |
| 'initial_balance', 'current_balance', 'acct_age_years', | |
| 'loan_amount'] | |
| acct_keep = [c for c in acct_keep if c in acct.columns] | |
| acct = acct[acct_keep] | |
| print(f" Account features kept: {acct_keep}") | |
| # Normalize account_type to standard categories using keyword matching | |
| acct['account_type'] = acct['account_type'].apply(normalize_account_type) | |
| print(f" Account type distribution: {acct['account_type'].value_counts().to_dict()}") | |
| # ββ 5. Transaction aggregates βββββββββββββββββββββββββββββββββββββ | |
| print("\n--- Computing transaction aggregates ---") | |
| txn_agg = compute_txn_aggregates(txn) | |
| print(f" Aggregates computed for {len(txn_agg):,} accounts") | |
| print(f" Cols: {[c for c in txn_agg.columns if c != 'account_id']}") | |
| # ββ 6. Join everything βββββββββββββββββββββββββββββββββββββββββββ | |
| print("\n--- Joining ---") | |
| # relationship: keep HOLDER or all types | |
| print(f" Relationship types: {rel['type'].unique()}") | |
| # Use all relationship types (or filter to HOLDER if needed) | |
| mapping = rel[['account_id', 'customer_id', 'type']].drop_duplicates() | |
| df = mapping.merge(cust, on='customer_id', how='left') | |
| df = df.merge(acct, on='account_id', how='left') | |
| df = df.merge(txn_agg, on='account_id', how='left') | |
| print(f" Final: {len(df):,} customer-account rows, {len(df.columns)} columns") | |
| # ββ 7. Fill missing transaction data (accounts with no transactions) β | |
| txn_fill_cols = ['trxn_count', 'total_trxn_amt', 'avg_trxn_amt', 'avg_trxns_week', | |
| 'avg_monthly_trxn_amt', 'cashin_count', 'cashout_count', 'cashin_ratio'] | |
| for c in txn_fill_cols: | |
| if c in df.columns: | |
| df[c] = df[c].fillna(0) | |
| # ββ 8. Normalize column names to match perform_clustering schema ββ | |
| print("\n--- Normalizing column names ---") | |
| df = df.rename(columns={ | |
| 'avg_trxns_week': 'avg_num_trxns', | |
| 'avg_monthly_trxn_amt': 'trxn_amt_monthly', | |
| 'gross_annual_income': 'INCOME', | |
| 'current_balance': 'CURRENT_BALANCE', | |
| 'acct_age_years': 'ACCT_AGE_YEARS', | |
| 'age': 'AGE', | |
| 'account_type': 'ACCOUNT_TYPE', | |
| 'gender': 'GENDER', | |
| 'citizenship': 'CITIZENSHIP', | |
| 'residency_country_id': 'RESIDENCY_COUNTRY', | |
| 'is_314b': '314b', | |
| 'ofac': 'OFAC', | |
| }) | |
| # Derive customer_type and dynamic_segment from entity flag | |
| # entity=1 (True) β Business, entity=0 (False) β Individual | |
| df['customer_type'] = df['entity'].map({1: 'BUSINESS', 0: 'INDIVIDUAL'}).fillna('INDIVIDUAL') | |
| df['dynamic_segment'] = df['entity'].map({1: 0, 0: 1}).fillna(1).astype(int) | |
| print(f" customer_type distribution: {df['customer_type'].value_counts().to_dict()}") | |
| # Age category bins | |
| df['AGE_CATEGORY'] = pd.cut( | |
| df['AGE'], | |
| bins=[0, 30, 45, 60, 120], | |
| labels=['18-30', '31-45', '46-60', '60+'], | |
| right=True, | |
| ).astype(str).replace('nan', 'Unknown') | |
| # Income bands (mostly null in this dataset β filled as Unknown) | |
| df['INCOME_BAND'] = pd.cut( | |
| df['INCOME'], | |
| bins=[0, 50000, 100000, 200000, float('inf')], | |
| labels=['<50K', '50K-100K', '100K-200K', '200K+'], | |
| right=True, | |
| ).astype(str).replace('nan', 'Unknown') | |
| # Fill null GENDER for tree groupby | |
| df['GENDER'] = df['GENDER'].fillna('Unknown') | |
| # Account age category for business segmentation | |
| df['ACCOUNT_AGE_CATEGORY'] = pd.cut( | |
| df['ACCT_AGE_YEARS'], | |
| bins=[-0.01, 1.0, float('inf')], | |
| labels=['New (0-1yr)', 'Established (>1yr)'], | |
| right=True, | |
| ).astype(str).replace('nan', 'Unknown') | |
| # ββ 9. Summary βββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| print("\n--- Output Summary ---") | |
| print(f" Rows: {len(df):,}") | |
| print(f" Columns: {len(df.columns)}") | |
| num_cols = df.select_dtypes(include='number').columns.tolist() | |
| cat_cols = df.select_dtypes(include='object').columns.tolist() | |
| print(f" Numeric: {num_cols}") | |
| print(f" Categorical: {cat_cols}") | |
| nulls = df.isnull().sum() | |
| nulls = nulls[nulls > 0] | |
| if len(nulls): | |
| print(f" Nulls:\n{nulls.to_string()}") | |
| # ββ 10. Save βββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| os.makedirs(os.path.dirname(out_file), exist_ok=True) | |
| df.to_csv(out_file, index=False) | |
| print(f"\n Saved -> {out_file}") | |
| print(f"{'='*60}\n") | |
| return df | |
| if __name__ == "__main__": | |
| prepare_data() | |