# AML transaction CSV ingestion and feature-engineering utilities.
import pandas as pd
import numpy as np

# Header-variant dictionary: maps every known source column name (compared
# after lowercasing + whitespace stripping) onto the internal schema name
# used by the rest of the pipeline.
COLUMN_MAP = {
    # transaction identifier
    "transaction id": "transaction_id",
    "tx_id": "transaction_id",
    "txn_id": "transaction_id",
    # customer identifier
    "person involved": "customer_id",
    "sender_account_id": "customer_id",
    "cust_id": "customer_id",
    # monetary amount (variants may include spaces / parentheses)
    "amount (usd)": "amount",
    "tx_amount": "amount",
    "txn_amount": "amount",
    # event timestamp
    "date of transaction": "timestamp",
    "transaction_date": "timestamp",
    "tx_date": "timestamp",
    # transaction type
    "transaction type": "transaction_type",
    "tx_type": "transaction_type",
    "txn_type": "transaction_type",
    # origin / destination countries
    "country": "origin_country",
    "destination country": "dest_country",
    # supplementary AML risk fields
    "money laundering risk score": "ml_risk_score",
    "shell companies involved": "shell_companies",
    "tax haven country": "tax_haven",
    "source of money": "source_of_money",
    "reported by authority": "reported_by_authority",
}
def load_and_validate(file):
    """
    Load an uploaded CSV (e.g. a Streamlit file object) and normalize it to
    the internal transaction schema.

    Steps:
      1. Read the CSV.
      2. Normalize headers (lowercase + strip) and map known variants
         through COLUMN_MAP.
      3. Require the core schema columns.
      4. Backfill optional columns with conservative defaults.
      5. Coerce amounts to numeric and timestamps to datetime, dropping
         rows that fail to parse.

    Returns:
        (cleaned_df, "OK") on success, or (None, error_message) on failure.
    """
    try:
        df = pd.read_csv(file)
    except Exception as e:
        return None, f"Failed to read CSV: {str(e)}"

    # Normalize column names (lowercase + strip), then map known variants.
    df.columns = df.columns.str.lower().str.strip()
    df.rename(columns=COLUMN_MAP, inplace=True)

    # If several source variants map to the same internal name (e.g. both
    # "tx_id" and "txn_id" present), keep only the first occurrence —
    # otherwise df['amount'] etc. would return a DataFrame, not a Series.
    df = df.loc[:, ~df.columns.duplicated()]

    required_cols = [
        'transaction_id', 'customer_id', 'amount', 'timestamp', 'transaction_type'
    ]
    missing_cols = [c for c in required_cols if c not in df.columns]
    if missing_cols:
        return None, f"Missing required columns: {', '.join(missing_cols)}"

    # Backfill optional columns with defaults, and fill their NA values.
    if 'origin_country' not in df.columns:
        df['origin_country'] = 'US'
    if 'dest_country' not in df.columns:
        df['dest_country'] = 'US'
    if 'account_age_days' not in df.columns:
        df['account_age_days'] = 365
    df['origin_country'] = df['origin_country'].fillna('US')
    df['dest_country'] = df['dest_country'].fillna('US')
    df['account_age_days'] = df['account_age_days'].fillna(365)

    # Rows missing the fields we cannot default are unusable.
    df = df.dropna(subset=['amount', 'timestamp'])

    # Coerce types; individual unparseable rows are dropped (errors='coerce')
    # rather than aborting the whole upload.
    try:
        df['amount'] = pd.to_numeric(df['amount'], errors='coerce')
        df = df.dropna(subset=['amount'])
        df['timestamp'] = pd.to_datetime(df['timestamp'], errors='coerce')
        df = df.dropna(subset=['timestamp'])
    except Exception as e:
        return None, f"Failed to parse amounts or timestamps: {str(e)}"

    # An empty frame would silently produce an empty dashboard downstream;
    # surface it as an explicit error instead.
    if df.empty:
        return None, "No valid rows remain after cleaning (check amount/timestamp formats)"

    return df.reset_index(drop=True), "OK"
def engineer_features(df):
    """
    Add engineered AML features to a validated transaction DataFrame.

    Expects the columns produced by load_and_validate: transaction_id,
    customer_id, amount, timestamp (datetime64), origin_country,
    dest_country.

    Adds:
        hour_of_day, day_of_week  — from the timestamp
        is_international          — origin != destination (0/1)
        amount_log                — log1p of the amount
        structuring_flag          — amount in [9000, 9999] (just under 10k)
        round_amount_flag         — positive exact multiples of 10,000
        transaction_velocity      — per-customer transaction count in a
                                    trailing 24-hour window

    Returns a new DataFrame sorted by (customer_id, timestamp); the input
    frame is not modified.
    """
    df = df.copy()

    # Time-based features.
    df['hour_of_day'] = df['timestamp'].dt.hour
    df['day_of_week'] = df['timestamp'].dt.dayofweek

    # Cross-border indicator.
    df['is_international'] = (df['origin_country'] != df['dest_country']).astype(int)

    # Amount-based features.
    df['amount_log'] = np.log1p(df['amount'])
    # Structuring: amounts deliberately kept just below a 10,000 threshold.
    df['structuring_flag'] = ((df['amount'] >= 9000) & (df['amount'] <= 9999)).astype(int)
    df['round_amount_flag'] = ((df['amount'] % 10000 == 0) & (df['amount'] > 0)).astype(int)

    # transaction_velocity: rolling count of a customer's transactions in a
    # trailing 24-hour window. Sort so each customer's rows are
    # time-ordered, put the timestamp on the index (time-based rolling
    # requires it), and compute the grouped rolling count. The result is
    # ordered by (customer_id, timestamp) exactly like the sorted frame, so
    # we assign its values positionally — merging back on
    # (customer_id, timestamp), as a naive implementation would, duplicates
    # rows whenever a customer has two transactions at the same instant.
    df = df.sort_values(by=['customer_id', 'timestamp'])
    df.set_index('timestamp', inplace=True)
    # Note: lowercase '24h' — the uppercase 'H' offset alias is deprecated
    # in pandas 2.2+.
    velocity = df.groupby('customer_id').rolling('24h')['transaction_id'].count()
    df['transaction_velocity'] = velocity.to_numpy()
    df = df.reset_index()
    return df