# AML_Shield/modules/etl.py
# Author: Ajay Kasu
# Feature: expanded column mapping with NLP variants and extra AML fields (commit 7b4b1dd)
import pandas as pd
import numpy as np
# Column mapping — every known source-header variant (after lowercasing and
# stripping) is keyed to the internal canonical column name.
# Maintained as canonical -> variants for readability, then inverted below.
_CANONICAL_VARIANTS = {
    "transaction_id": ("transaction id", "tx_id", "txn_id"),
    # customer
    "customer_id": ("person involved", "sender_account_id", "cust_id"),
    # amount — handles spaces and special chars
    "amount": ("amount (usd)", "tx_amount", "txn_amount"),
    # timestamp
    "timestamp": ("date of transaction", "transaction_date", "tx_date"),
    # transaction type
    "transaction_type": ("transaction type", "tx_type", "txn_type"),
    # countries
    "origin_country": ("country",),
    "dest_country": ("destination country",),
    # extra AML fields
    "ml_risk_score": ("money laundering risk score",),
    "shell_companies": ("shell companies involved",),
    "tax_haven": ("tax haven country",),
    "source_of_money": ("source of money",),
    "reported_by_authority": ("reported by authority",),
}
# Flat variant -> canonical lookup used by the loader's rename step.
COLUMN_MAP = {
    variant: canonical
    for canonical, variants in _CANONICAL_VARIANTS.items()
    for variant in variants
}
def load_and_validate(file):
    """
    Load an uploaded CSV (e.g. a Streamlit file object) and normalize it.

    Steps:
      * read the CSV
      * lowercase/strip headers and map known variants via COLUMN_MAP
      * verify required columns, backfill optional ones with defaults
      * coerce amount to numeric and timestamp to datetime, dropping bad rows

    Returns:
        (cleaned_df, "OK") on success, or (None, error_message) on failure.
    """
    try:
        df = pd.read_csv(file)
    except Exception as e:
        return None, f"Failed to read CSV: {str(e)}"

    # Normalize headers so COLUMN_MAP keys (all lowercase, stripped) can match.
    df.columns = df.columns.str.lower().str.strip()
    df.rename(columns=COLUMN_MAP, inplace=True)

    # If two source headers mapped to the same internal name (e.g. both
    # "tx_id" and "txn_id" present), keep the first occurrence — duplicate
    # labels would break downstream column access.
    df = df.loc[:, ~df.columns.duplicated()]

    required_cols = [
        'transaction_id', 'customer_id', 'amount', 'timestamp', 'transaction_type'
    ]
    missing_cols = [c for c in required_cols if c not in df.columns]
    if missing_cols:
        return None, f"Missing required columns: {', '.join(missing_cols)}"

    # Optional columns: create if absent, otherwise fill gaps with defaults.
    optional_defaults = {
        'origin_country': 'US',
        'dest_country': 'US',
        'account_age_days': 365,
    }
    for col, default in optional_defaults.items():
        if col not in df.columns:
            df[col] = default
        else:
            df[col] = df[col].fillna(default)

    # Coerce types. errors='coerce' never raises — bad values become NaN/NaT,
    # which we drop in one pass (so no try/except is needed here).
    df['amount'] = pd.to_numeric(df['amount'], errors='coerce')
    df['timestamp'] = pd.to_datetime(df['timestamp'], errors='coerce')
    df = df.dropna(subset=['amount', 'timestamp'])

    if df.empty:
        return None, "No valid rows remain after cleaning (check amount/timestamp values)."
    return df, "OK"
def engineer_features(df):
    """
    Add engineered AML features to a cleaned transaction DataFrame.

    Expects columns: transaction_id, customer_id, amount (numeric),
    timestamp (datetime64), origin_country, dest_country.

    Adds: hour_of_day, day_of_week, is_international, amount_log,
    structuring_flag, round_amount_flag, transaction_velocity.

    Returns a new DataFrame sorted by (customer_id, timestamp); the input
    is not mutated.
    """
    df = df.copy()

    # Time-based features.
    df['hour_of_day'] = df['timestamp'].dt.hour
    df['day_of_week'] = df['timestamp'].dt.dayofweek

    # Cross-border indicator.
    df['is_international'] = (df['origin_country'] != df['dest_country']).astype(int)

    # Amount-based features.
    df['amount_log'] = np.log1p(df['amount'])
    # Classic structuring band: just under the $10k reporting threshold.
    df['structuring_flag'] = ((df['amount'] >= 9000) & (df['amount'] <= 9999)).astype(int)
    df['round_amount_flag'] = ((df['amount'] % 10000 == 0) & (df['amount'] > 0)).astype(int)

    # transaction_velocity: rolling count of a customer's transactions in the
    # trailing 24-hour window. Sort first so the groupby/rolling output order
    # matches row order, then assign positionally — merging back on
    # (customer_id, timestamp) would duplicate rows whenever a customer has
    # two transactions with the same timestamp.
    df = df.sort_values(['customer_id', 'timestamp']).reset_index(drop=True)
    velocity = (
        df.set_index('timestamp')
          .groupby('customer_id')['amount']   # 'amount' is numeric & non-null
          .rolling('24h')                     # lowercase alias: 'H' is deprecated
          .count()
    )
    df['transaction_velocity'] = velocity.to_numpy()
    return df