Spaces:
Running
Running
File size: 2,597 Bytes
7d391cb | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 | import pandas as pd
import numpy as np
from sklearn.ensemble import IsolationForest
# Feature columns fed to the Isolation Forest (Layer 2).
_ML_FEATURES = [
    'amount_log',
    'transaction_velocity',
    'hour_of_day',
    'is_international',
    'account_age_days',
]

# Columns the rule layer reads with ``row.get(name, default)``, i.e. treats
# as optional.  The same fallbacks are reused when building the ML feature
# matrix so a column that is optional for Layer 1 cannot crash Layer 2.
_OPTIONAL_COLUMN_DEFAULTS = {
    'transaction_velocity': 0,
    'account_age_days': 365,
    'is_international': 0,
}

# Columns added by apply_detection, in insertion order, with the dtype used
# to create them when the input has no rows.
_OUTPUT_DTYPES = {
    'rule_flags': 'object',
    'ml_anomaly_score': 'int64',
    'ml_anomaly_score_raw': 'float64',
    'ml_anomaly_flag': 'int64',
    'risk_score': 'int64',
    'risk_level': 'object',
    'is_flagged': 'int64',
}


def _rule_flags(row):
    """Return the names of the Layer 1 rules triggered by one transaction row."""
    flags = []
    amt = row['amount']

    # Structuring
    if 9000 <= amt <= 9999:
        flags.append("Structuring")

    # Rapid Fire Transactions
    if row.get('transaction_velocity', 0) > 5:
        flags.append("Rapid Fire Transactions")

    # Large Cash Out
    if row['transaction_type'] == 'CASH_OUT' and amt > 50000:
        flags.append("Large Cash Out")

    # Dormant Account Spike
    # NOTE(review): the condition keys on a young account (< 30 days), not on
    # inactivity, despite the label -- confirm this is the intended rule.
    if row.get('account_age_days', 365) < 30 and amt > 10000:
        flags.append("Dormant Account Spike")

    # International High Value
    if row.get('is_international', 0) == 1 and amt > 25000:
        flags.append("International High Value")

    # Suspicious Round Amount
    if amt % 10000 == 0 and amt > 0:
        flags.append("Suspicious Round Amount")

    return flags


def _risk_level(score):
    """Map a 0-100 risk score to "Low" (<= 30), "Medium" (<= 60) or "High"."""
    if score <= 30:
        return "Low"
    if score <= 60:
        return "Medium"
    return "High"


def apply_detection(df):
    """
    Apply Two-Layer Detection System.

    Layer 1 evaluates a fixed set of rules per transaction; Layer 2 fits an
    Isolation Forest on the frame itself and labels outliers.  Both layers
    are then combined into a single risk score.

    Args:
        df: Transactions, one per row.  Must contain ``amount``,
            ``transaction_type``, ``amount_log`` and ``hour_of_day``.
            ``transaction_velocity``, ``account_age_days`` and
            ``is_international`` are optional; when absent they fall back to
            0, 365 and 0 respectively in both layers.  The input frame is not
            modified.

    Returns:
        A copy of ``df`` with these columns added:

        * ``rule_flags`` -- list of triggered rule names (possibly empty).
        * ``ml_anomaly_score`` -- the Isolation Forest label: -1 for
          outliers, 1 for inliers.
        * ``ml_anomaly_score_raw`` -- ``score_samples`` output; the lower,
          the more abnormal.
        * ``ml_anomaly_flag`` -- 1 when the label is -1, else 0.
        * ``risk_score`` -- ``20 * len(rule_flags) + 30 * ml_anomaly_flag``,
          capped at 100.
        * ``risk_level`` -- "Low" (<= 30), "Medium" (<= 60) or "High".
        * ``is_flagged`` -- 1 when ``risk_level`` is not "Low", else 0.

        An input with no rows yields an empty frame that still carries all
        of the columns above.

    Raises:
        KeyError: If a required column is missing from a non-empty ``df``.
    """
    df = df.copy()

    # The Isolation Forest cannot be fitted on zero samples, so short-circuit
    # and hand back a correctly-shaped empty result instead of raising.
    if len(df.index) == 0:
        for column, dtype in _OUTPUT_DTYPES.items():
            df[column] = pd.Series(index=df.index, dtype=dtype)
        return df

    # Layer 1 - Rule-Based Flags
    df['rule_flags'] = [_rule_flags(row) for _, row in df.iterrows()]

    # Layer 2 - Isolation Forest
    # Supply the rule layer's defaults for optional columns that are absent;
    # a constant column gives the forest nothing to split on, so this only
    # prevents a KeyError and does not influence the scores.
    missing_defaults = {
        column: default
        for column, default in _OPTIONAL_COLUMN_DEFAULTS.items()
        if column not in df.columns
    }
    # fillna for safety
    X = df.assign(**missing_defaults)[_ML_FEATURES].fillna(0)

    iso_forest = IsolationForest(contamination=0.05, random_state=42)

    # fit_predict returns -1 for outliers and 1 for inliers; despite the
    # column name this is a label, not a continuous score.
    df['ml_anomaly_score'] = iso_forest.fit_predict(X)
    # The anomaly score of the input samples. The lower, the more abnormal.
    df['ml_anomaly_score_raw'] = iso_forest.score_samples(X)
    df['ml_anomaly_flag'] = (df['ml_anomaly_score'] == -1).astype(int)

    # Combined Risk Score: 20 points per triggered rule plus 30 for an ML
    # outlier, capped at 100.  A single rule (20) or the ML flag alone (30)
    # therefore stays "Low"; it takes two signals to leave that band.
    rule_counts = df['rule_flags'].map(len)
    df['risk_score'] = (rule_counts * 20 + df['ml_anomaly_flag'] * 30).clip(upper=100)

    df['risk_level'] = df['risk_score'].map(_risk_level)
    df['is_flagged'] = (df['risk_level'] != "Low").astype(int)

    return df
|