import pandas as pd
import numpy as np

# Column mapping — maps ANY known header variant (after lowercasing and
# stripping whitespace) to the internal schema name used downstream.
# NOTE(review): if a CSV contains TWO variants of the same field (e.g. both
# "tx_id" and "txn_id"), rename produces duplicate column labels; callers
# appear to rely on clean single-variant files — confirm upstream.
COLUMN_MAP = {
    # transaction id
    "transaction id": "transaction_id",
    "tx_id": "transaction_id",
    "txn_id": "transaction_id",
    # customer
    "person involved": "customer_id",
    "sender_account_id": "customer_id",
    "cust_id": "customer_id",
    # amount — handles spaces and special chars
    "amount (usd)": "amount",
    "tx_amount": "amount",
    "txn_amount": "amount",
    # timestamp
    "date of transaction": "timestamp",
    "transaction_date": "timestamp",
    "tx_date": "timestamp",
    # transaction type
    "transaction type": "transaction_type",
    "tx_type": "transaction_type",
    "txn_type": "transaction_type",
    # countries
    "country": "origin_country",
    "destination country": "dest_country",
    # extra AML fields
    "money laundering risk score": "ml_risk_score",
    "shell companies involved": "shell_companies",
    "tax haven country": "tax_haven",
    "source of money": "source_of_money",
    "reported by authority": "reported_by_authority",
}


def load_and_validate(file):
    """
    Load an uploaded CSV (Streamlit file object or any file-like) and
    normalize it to the internal schema.

    Steps: read CSV -> lowercase/strip headers -> apply COLUMN_MAP ->
    check required columns -> backfill optional columns with defaults ->
    coerce `amount` to numeric and `timestamp` to datetime, dropping rows
    that fail coercion.

    Returns
    -------
    (DataFrame, "OK") on success, or (None, error_message) on failure.
    """
    try:
        df = pd.read_csv(file)
    except Exception as e:
        return None, f"Failed to read CSV: {str(e)}"

    # Normalize column names: lowercase + strip spaces, then map variants.
    df.columns = df.columns.str.lower().str.strip()
    df.rename(columns=COLUMN_MAP, inplace=True)

    required_cols = [
        'transaction_id', 'customer_id', 'amount', 'timestamp',
        'transaction_type'
    ]
    missing_cols = [c for c in required_cols if c not in df.columns]
    if missing_cols:
        return None, f"Missing required columns: {', '.join(missing_cols)}"

    # Optional columns: create with defaults if absent, then fill gaps.
    if 'origin_country' not in df.columns:
        df['origin_country'] = 'US'
    if 'dest_country' not in df.columns:
        df['dest_country'] = 'US'
    if 'account_age_days' not in df.columns:
        df['account_age_days'] = 365
    df['origin_country'] = df['origin_country'].fillna('US')
    df['dest_country'] = df['dest_country'].fillna('US')
    df['account_age_days'] = df['account_age_days'].fillna(365)

    # Rows without an amount or timestamp are unusable for scoring.
    df = df.dropna(subset=['amount', 'timestamp'])

    # Coerce dtypes; errors='coerce' turns bad values into NaN/NaT,
    # which are then dropped rather than aborting the whole load.
    try:
        df['amount'] = pd.to_numeric(df['amount'], errors='coerce')
        df = df.dropna(subset=['amount'])
        df['timestamp'] = pd.to_datetime(df['timestamp'], errors='coerce')
        df = df.dropna(subset=['timestamp'])
    except Exception as e:
        return None, f"Failed to parse amounts or timestamps: {str(e)}"

    return df, "OK"


def engineer_features(df):
    """
    Add engineered AML features to a validated frame.

    Adds: hour_of_day, day_of_week, is_international, amount_log,
    structuring_flag (amount in [9000, 9999] — just under the 10k
    reporting threshold), round_amount_flag (positive multiples of
    10000), and transaction_velocity (count of the customer's
    transactions in the trailing 24h window, inclusive of the current
    one).

    The input frame is not mutated; a new frame is returned, sorted by
    (customer_id, timestamp).
    """
    df = df.copy()

    # Time-based features.
    df['hour_of_day'] = df['timestamp'].dt.hour
    df['day_of_week'] = df['timestamp'].dt.dayofweek

    # Cross-border indicator.
    df['is_international'] = (
        df['origin_country'] != df['dest_country']
    ).astype(int)

    # Amount-based features.
    df['amount_log'] = np.log1p(df['amount'])
    df['structuring_flag'] = (
        (df['amount'] >= 9000) & (df['amount'] <= 9999)
    ).astype(int)
    df['round_amount_flag'] = (
        (df['amount'] % 10000 == 0) & (df['amount'] > 0)
    ).astype(int)

    # transaction_velocity: rolling 24-hour transaction count per customer.
    # Sort + time-index so the groupby-rolling result is positionally
    # aligned with the frame: groupby orders groups by key (matching the
    # sort) and preserves within-group row order, so we can assign the
    # values directly. This replaces the previous merge on
    # ['customer_id', 'timestamp'], which silently DUPLICATED rows
    # whenever a customer had two transactions at the same timestamp
    # (many-to-many merge). dropna=False keeps any null-customer rows
    # (sorted last) aligned as well. '24h' (lowercase) — the 'H' alias
    # is deprecated in pandas 2.2+.
    df = df.sort_values(by=['customer_id', 'timestamp'])
    df = df.set_index('timestamp')
    df['transaction_velocity'] = (
        df.groupby('customer_id', dropna=False)
          .rolling('24h')['transaction_id']
          .count()
          .to_numpy()
    )
    df = df.reset_index()

    return df