Spaces:
Running
Running
| import pandas as pd | |
| import numpy as np | |
| from sklearn.ensemble import IsolationForest | |
def _column_or_default(df, name, default):
    """Return column *name* of *df*, or a constant Series of *default* if absent."""
    if name in df.columns:
        return df[name]
    return pd.Series(default, index=df.index)


def _rule_flags(df):
    """Evaluate the Layer 1 rules and return one list of flag names per row."""
    amount = df['amount']
    # These three columns are optional; the defaults make the rule never fire.
    velocity = _column_or_default(df, 'transaction_velocity', 0)
    account_age = _column_or_default(df, 'account_age_days', 365)
    international = _column_or_default(df, 'is_international', 0)

    # Order matters: flag names are appended to each row's list in this order.
    rules = [
        ("Structuring", (amount >= 9000) & (amount <= 9999)),
        ("Rapid Fire Transactions", velocity > 5),
        ("Large Cash Out",
         (df['transaction_type'] == 'CASH_OUT') & (amount > 50000)),
        # NOTE(review): this condition selects accounts younger than 30 days,
        # i.e. new rather than dormant accounts -- confirm the intended rule.
        ("Dormant Account Spike", (account_age < 30) & (amount > 10000)),
        ("International High Value", (international == 1) & (amount > 25000)),
        ("Suspicious Round Amount", (amount % 10000 == 0) & (amount > 0)),
    ]
    names = [name for name, _ in rules]
    # Missing values never satisfy a rule, so they are mapped to False.
    hit_matrix = np.column_stack(
        [mask.to_numpy(dtype=bool, na_value=False) for _, mask in rules]
    )
    return [
        [name for name, hit in zip(names, hits) if hit]
        for hits in hit_matrix
    ]


def apply_detection(df, *, contamination=0.05, random_state=42):
    """
    Apply Two-Layer Detection System.

    Layer 1 evaluates six rule-based flags per transaction; Layer 2 fits an
    IsolationForest on the numeric features and marks outliers. Both layers
    are combined into a 0-100 risk score and a Low/Medium/High risk level.

    Args:
        df: Transactions, one per row. Columns 'amount' and
            'transaction_type' are read by the rules;
            'transaction_velocity', 'account_age_days' and 'is_international'
            are optional for the rules (defaults 0, 365 and 0). The ML layer
            reads 'amount_log', 'transaction_velocity', 'hour_of_day',
            'is_international' and 'account_age_days', with missing values
            replaced by 0.
        contamination: Passed to IsolationForest (default 0.05).
        random_state: Passed to IsolationForest (default 42).

    Returns:
        A copy of *df* (the input is not modified) with added columns:
        'rule_flags' (list of rule names that fired), 'ml_anomaly_score'
        (the -1/1 label from fit_predict, -1 meaning outlier),
        'ml_anomaly_score_raw' (score_samples output; lower is more
        abnormal), 'ml_anomaly_flag' (1 for outliers), 'risk_score',
        'risk_level' and 'is_flagged' (1 unless the level is "Low").
        A frame with no rows gets the same columns, all empty.

    Raises:
        KeyError: If a non-empty frame lacks a column required by the rules
            or by the ML feature list.
    """
    df = df.copy()

    # A model cannot be fitted on zero samples, so return the output schema
    # directly instead of failing inside the ML layer.
    if len(df) == 0:
        empty_columns = {
            'rule_flags': object,
            'ml_anomaly_score': 'int64',
            'ml_anomaly_score_raw': 'float64',
            'ml_anomaly_flag': 'int64',
            'risk_score': 'int64',
            'risk_level': object,
            'is_flagged': 'int64',
        }
        for name, dtype in empty_columns.items():
            df[name] = pd.Series([], index=df.index, dtype=dtype)
        return df

    # Layer 1 - Rule-Based Flags
    flags = _rule_flags(df)
    df['rule_flags'] = flags

    # Layer 2 - Isolation Forest
    features = [
        'amount_log',
        'transaction_velocity',
        'hour_of_day',
        'is_international',
        'account_age_days',
    ]
    # fillna for safety
    X = df[features].fillna(0)
    iso_forest = IsolationForest(
        contamination=contamination, random_state=random_state
    )
    # fit_predict returns -1 for outliers and 1 for inliers. Despite its
    # name, 'ml_anomaly_score' holds that label; the continuous score is
    # stored un-inverted in 'ml_anomaly_score_raw' (lower = more abnormal).
    df['ml_anomaly_score'] = iso_forest.fit_predict(X)
    df['ml_anomaly_score_raw'] = iso_forest.score_samples(X)
    df['ml_anomaly_flag'] = (df['ml_anomaly_score'] == -1).astype(int)

    # Combined Risk Score: 20 points per rule hit plus 30 for an ML outlier,
    # capped at 100.
    rule_counts = np.fromiter(
        (len(row_flags) for row_flags in flags),
        dtype=np.int64,
        count=len(flags),
    )
    ml_points = df['ml_anomaly_flag'].to_numpy() * 30
    df['risk_score'] = np.minimum(rule_counts * 20 + ml_points, 100)

    # Risk level bands: <= 30 Low, <= 60 Medium, otherwise High.
    score = df['risk_score']
    df['risk_level'] = np.select(
        [score <= 30, score <= 60], ["Low", "Medium"], default="High"
    )
    df['is_flagged'] = (df['risk_level'] != "Low").astype(int)
    return df