agentic-aml-demo / lambda_cluster_threshold.py
shawn420's picture
Sync aria-v6 full-app deployment: agents, app, startup rewrite
a610f79
"""
lambda_cluster_threshold.py β€” Per-cluster adaptive threshold analysis.
Joins K-Means behavioral clusters with SAR labels to compute cluster-specific
threshold recommendations. Demonstrates how behavioral segmentation adapts alert
sensitivity per customer group rather than applying a single uniform threshold.
Option 2 hook: pass target_sar_rate as a parameter (default 0.90).
Option 3 hook: persist results externally and call again to compare across runs.
"""
import numpy as np
import pandas as pd
import plotly.graph_objects as go
from lambda_ds_performance import perform_clustering
# Map tool parameter names β†’ SAR CSV column names (mirrors SAR_COL_MAP in application.py)
_SAR_COL = {
"AVG_TRXNS_WEEK": "avg_num_trxns",
"AVG_TRXN_AMT": "avg_weekly_trxn_amt",
"TRXN_AMT_MONTHLY": "trxn_amt_monthly",
}
_COL_LABEL = {
"AVG_TRXNS_WEEK": "Avg Weekly Transactions",
"AVG_TRXN_AMT": "Avg Weekly Txn Amount",
"TRXN_AMT_MONTHLY": "Monthly Txn Volume",
}
_IS_DOLLAR = {"AVG_TRXN_AMT", "TRXN_AMT_MONTHLY"}
_RISK_LABELS = ["High Volume", "Mid-High", "Mid-Low", "Low Volume",
"Group 5", "Group 6"] # extra labels if n_clusters > 4
def _fmt(val, col):
return f"${val:,.0f}" if col in _IS_DOLLAR else f"{val:,.2f}"
def _sweep(df_cl, sar_col, target):
"""
Sweep threshold values for a cluster. Returns (recommended_row | None, rows).
Recommended = highest threshold where tp_rate >= target (most FP reduction possible
while still meeting the SAR catch target).
"""
sar_vals = df_cl.loc[df_cl["is_sar"] == 1, sar_col].dropna()
non_vals = df_cl.loc[df_cl["is_sar"] == 0, sar_col].dropna()
total_sar = len(sar_vals)
total_non = len(non_vals)
if total_sar == 0:
return None, []
all_vals = df_cl[sar_col].dropna()
pcts = [0, 10, 25, 40, 55, 70, 82, 91, 96]
thresholds = sorted({round(float(all_vals.quantile(p / 100)), 2) for p in pcts})
rows = []
for t in thresholds:
tp = int((sar_vals >= t).sum())
fp = int((non_vals >= t).sum())
tp_rate = tp / total_sar
precision = tp / (tp + fp) if (tp + fp) > 0 else 0.0
rows.append({"threshold": t, "tp": tp, "fp": fp,
"fn": total_sar - tp, "tn": total_non - fp,
"tp_rate": tp_rate, "precision": precision})
qualifying = [r for r in rows if r["tp_rate"] >= target]
recommended = qualifying[-1] if qualifying else rows[0]
return recommended, rows
def cluster_threshold_analysis(df_ss, df_sar, segment, threshold_column,
n_clusters=4, target_sar_rate=0.90):
"""
Run K-Means on df_ss, join with SAR labels in df_sar, sweep threshold per cluster.
Returns
-------
text : str pre-computed block for the model to copy verbatim
fig : go.Figure grouped bar chart comparing uniform vs adaptive FP counts
"""
sar_col = _SAR_COL.get(threshold_column)
if sar_col is None:
return f"Unknown threshold_column '{threshold_column}'.", None
if sar_col not in df_sar.columns:
return f"Column '{sar_col}' not found in SAR data.", None
# ── K-Means ───────────────────────────────────────────────────────────────
n_clusters = max(2, min(6, int(n_clusters)))
_, _, df_clustered = perform_clustering(df_ss, segment, n_clusters)
# df_clustered: customer_id, cluster (0-based KMeans labels), feature cols
# ── Filter SAR data by segment ────────────────────────────────────────────
seg_val = 0 if segment.lower() == "business" else 1
df_seg_sar = df_sar[df_sar["dynamic_segment"] == seg_val][
["customer_id", "is_sar", sar_col]
].dropna(subset=[sar_col]).copy()
# ── Join ──────────────────────────────────────────────────────────────────
df_joined = df_clustered[["customer_id", "cluster"]].merge(
df_seg_sar, on="customer_id", how="inner"
)
if df_joined.empty:
return f"No matching customers found for {segment} / {threshold_column}.", None
total_sar = int(df_joined["is_sar"].sum())
if total_sar == 0:
return f"No SAR-labeled customers in {segment} data for this column.", None
# ── Uniform threshold (whole segment at target_sar_rate) ─────────────────
_, uniform_rows = _sweep(df_joined, sar_col, target_sar_rate)
qualifying = [r for r in uniform_rows if r["tp_rate"] >= target_sar_rate]
uni_row = qualifying[-1] if qualifying else uniform_rows[0]
uni_t = uni_row["threshold"]
uni_tp = int((df_joined.loc[df_joined["is_sar"] == 1, sar_col] >= uni_t).sum())
uni_fp = int((df_joined.loc[df_joined["is_sar"] == 0, sar_col] >= uni_t).sum())
# ── Per-cluster sweep ─────────────────────────────────────────────────────
# Order clusters by median threshold column value descending β†’ Cluster 1 = highest activity
medians = df_joined.groupby("cluster")[sar_col].median().sort_values(ascending=False)
cluster_order = list(medians.index)
cluster_results = []
for rank, km_label in enumerate(cluster_order):
df_cl = df_joined[df_joined["cluster"] == km_label]
n_tot = len(df_cl)
n_sar = int(df_cl["is_sar"].sum())
rec, _ = _sweep(df_cl, sar_col, target_sar_rate)
# Baseline counts at the uniform threshold
base_tp = int((df_cl.loc[df_cl["is_sar"] == 1, sar_col] >= uni_t).sum())
base_fp = int((df_cl.loc[df_cl["is_sar"] == 0, sar_col] >= uni_t).sum())
cluster_results.append({
"rank": rank + 1,
"label": _RISK_LABELS[rank] if rank < len(_RISK_LABELS) else f"Group {rank+1}",
"n": n_tot,
"n_sar": n_sar,
"rec": rec,
"base_tp": base_tp,
"base_fp": base_fp,
})
# ── Aggregate ─────────────────────────────────────────────────────────────
adapt_tp = sum(r["rec"]["tp"] for r in cluster_results if r["rec"])
adapt_fp = sum(r["rec"]["fp"] for r in cluster_results if r["rec"])
fp_delta = adapt_fp - uni_fp # negative = fewer FPs (improvement)
sar_delta = adapt_tp - uni_tp # negative = fewer SARs caught
# ── Pre-computed text ─────────────────────────────────────────────────────
target_pct = int(target_sar_rate * 100)
lines = [
"=== PRE-COMPUTED CLUSTER THRESHOLD ANALYSIS (copy verbatim, do not alter numbers) ===",
f"Segment: {segment} | Column: {threshold_column} ({_COL_LABEL[threshold_column]}) "
f"| Clusters: {n_clusters} | Target SAR catch: β‰₯{target_pct}%",
"",
]
for r in cluster_results:
rec = r["rec"]
if rec is None:
lines.append(f"Cluster {r['rank']} β€” {r['label']} ({r['n']:,} customers): no SAR data")
lines.append("")
continue
fp_chg = rec["fp"] - r["base_fp"] # negative = fewer FPs (improvement)
tp_chg = rec["tp"] - r["base_tp"] # negative = fewer SARs caught
base_prec = (r["base_tp"] / (r["base_tp"] + r["base_fp"]) * 100
if r["base_tp"] + r["base_fp"] > 0 else 0)
base_tpr = r["base_tp"] / r["n_sar"] * 100 if r["n_sar"] > 0 else 0
lines += [
f"Cluster {r['rank']} β€” {r['label']} ({r['n']:,} customers, SAR pool: {r['n_sar']:,})",
f" Uniform {_fmt(uni_t, threshold_column)}: "
f"TP={r['base_tp']:,}, FP={r['base_fp']:,}, "
f"TP rate={base_tpr:.1f}%, precision={base_prec:.1f}%",
f" Recommended {_fmt(rec['threshold'], threshold_column)}: "
f"TP={rec['tp']:,}, FP={rec['fp']:,}, "
f"TP rate={rec['tp_rate']*100:.1f}%, precision={rec['precision']*100:.1f}%"
f" ({fp_chg:+,} FP, {tp_chg:+,} SAR)",
"",
]
lines += [
"ADAPTIVE SENSITIVITY SUMMARY",
f" Uniform threshold {_fmt(uni_t, threshold_column)} applied to all {segment}:",
f" TP={uni_tp:,}, FP={uni_fp:,}, TP rate={uni_tp/total_sar*100:.1f}%",
f" Cluster-adaptive thresholds:",
f" TP={adapt_tp:,}, FP={adapt_fp:,}, TP rate={adapt_tp/total_sar*100:.1f}%",
f" Net change: {fp_delta:+,} FP, {sar_delta:+,} SARs "
f"({adapt_tp/total_sar*100:.1f}% SAR retention)",
"=== END CLUSTER THRESHOLD ANALYSIS ===",
]
text = "\n".join(lines)
# ── Bar chart: uniform vs adaptive FP per cluster ─────────────────────────
c_names = [f"C{r['rank']}: {r['label']}" for r in cluster_results]
uni_fps = [r["base_fp"] for r in cluster_results]
adapt_fps = [r["rec"]["fp"] if r["rec"] else r["base_fp"] for r in cluster_results]
fig = go.Figure(data=[
go.Bar(name=f"Uniform {_fmt(uni_t, threshold_column)}",
x=c_names, y=uni_fps, marker_color="#EF553B"),
go.Bar(name="Cluster-Adaptive", x=c_names, y=adapt_fps,
marker_color="#636EFA"),
])
fig.update_layout(
barmode="group",
title=(f"False Positives: Uniform vs Adaptive β€” {segment} / "
f"{_COL_LABEL[threshold_column]}"),
xaxis_title="Behavioral Cluster",
yaxis_title="False Positives",
legend=dict(x=0.01, y=0.99),
)
return text, fig