""" lambda_rule_analysis.py — Rule-level FP/FN sweep using actual condition parameters Depends on docs/rule_sweep_data.csv produced by prepare_rule_sweep_data.py. Each rule has condition parameters that were set per-customer from their transaction history. The sweep varies those parameters (e.g. the $50K floor for Activity Deviation or the 5-std dev multiplier) and shows how SAR catch rate and FP count change. Public API (called from application.py): load_rule_sweep_data() -> df (call once at startup) list_rules_text(df) -> pre-computed text for the model compute_rule_sar_sweep(df, ...) -> pre-computed text for the model """ import json import math import os import pandas as pd import numpy as np _HERE = os.path.dirname(os.path.abspath(__file__)) _THRESHOLDS = os.path.join(_HERE, "rule_thresholds.json") _CATALOGUE = os.path.join(_HERE, "config", "rule_catalogue.json") from config import RULE_SWEEP_CSV as _CSV MAX_SWEEP_ROWS = 12 # ── Rule catalogue ───────────────────────────────────────────────────────────── # Loaded from config/rule_catalogue.json — add new rules there, no code changes needed. # Maps lower-cased keyword -> canonical risk_factor name + sweepable params def _load_catalogue() -> dict: with open(_CATALOGUE, "r", encoding="utf-8") as f: raw = json.load(f) # Convert default_2d lists to tuples (preserves original type used in code) for entry in raw.values(): if isinstance(entry.get("default_2d"), list): entry["default_2d"] = tuple(entry["default_2d"]) return raw RULE_CATALOGUE = _load_catalogue() # ── Legacy inline catalogue (kept for reference, no longer used) ─────────────── _LEGACY_CATALOGUE = { "activity deviation (ach)": { "name": "Activity Deviation (ACH)", "current": "Monthly Outgoing ACH >= $50K AND >= 5 std dev above 12-month profile mean", "sweep_params": { "floor_amount": { "col": "trigger_amt", "label": "monthly ACH floor ($)", "current": 50_000, "direction": "gte", "sweep_min": 10_000, "desc": "Minimum monthly Outgoing ACH sum to trigger (currently $50K)", }, "z_threshold": { "col": "z_score", "label": "std dev multiplier", "current": 5.0, "direction": "gte", "sweep_min": 0.0, "integer_axis": True, "desc": "Std-dev multiplier above 12-month ACH profile mean (currently 5)", }, }, "default_sweep": "floor_amount", "default_2d": ("floor_amount", "z_threshold"), }, "activity deviation (check)": { "name": "Activity Deviation (Check)", "current": "Monthly Outgoing Check >= $50K AND >= 2 std dev above 12-month profile mean", "sweep_params": { "floor_amount": { "col": "trigger_amt", "label": "monthly Check floor ($)", "current": 50_000, "direction": "gte", "sweep_min": 10_000, "desc": "Minimum monthly Outgoing Check sum to trigger (currently $50K)", }, "z_threshold": { "col": "z_score", "label": "std dev multiplier", "current": 2.0, "direction": "gte", "sweep_min": 0.0, "integer_axis": True, "desc": "Std-dev multiplier above 12-month Check profile mean (currently 2)", }, }, "default_sweep": "floor_amount", "default_2d": ("floor_amount", "z_threshold"), }, "elder abuse": { "name": "Elder Abuse", "current": "Age >= 60 AND 14-day outgoing >= $5K AND >= 3 std dev above 90-day mean", "sweep_params": { "floor_amount": { "col": "trigger_amt", "label": "14-day outgoing floor ($)", "current": 5_000, "direction": "gte", "sweep_min": 500, "desc": "Minimum 14-day aggregated outgoing to trigger (currently $5K)", }, "z_threshold": { "col": "z_score", "label": "std dev multiplier", "current": 3.0, "direction": "gte", "sweep_min": 0.0, "integer_axis": True, "desc": "Std-dev multiplier above 90-day mean (currently 3)", }, "age_threshold": { "col": "age", "label": "minimum age (years)", "current": 60, "direction": "gte", "sweep_min": 50, "integer_axis": True, "desc": "Minimum customer age to trigger (currently 60)", }, }, "default_sweep": "z_threshold", "default_2d": ("floor_amount", "age_threshold"), }, "velocity single": { "name": "Velocity Single", "current": ">=1 pair (in+out) within 14 days, out=90-110% of in, pair total >= $20K", "sweep_params": { "pair_total": { "col": "pair_total", "label": "pair total ($)", "current": 20_000, "direction": "gte", "sweep_min": 5_000, # sweep_max derived from data p95 — no catalogue cap "desc": "Minimum combined in+out pair total to trigger (currently $20K)", }, "ratio_tolerance": { "col": "ratio", "label": "ratio tolerance", "current": 0.10, "direction": "abs_lte", "format_pct": True, "values": [0.0, 0.025, 0.05, 0.075, 0.10, 0.125, 0.15, 0.175, 0.20], "desc": "Max deviation of out/in ratio from 1.0 to trigger (currently 10% = 90-110%)", }, }, "default_sweep": "pair_total", "default_2d": ("pair_total", "ratio_tolerance"), }, "detect excessive": { "name": "Detect Excessive Transaction Activity", "current": "5-day incoming Cash+Check sum > $10K", "sweep_params": { "floor_amount": { "col": "trigger_amt", "label": "sum threshold ($)", "current": 10_000, "direction": "gte", "sweep_min": 2_000, # sweep_max derived from data p95 — no catalogue cap "desc": "Minimum N-day incoming Cash+Check sum to trigger (currently $10K over 5 days)", }, "time_window": { "col": None, "label": "time window (days)", "current": 5, "direction": "window", "values": [3, 5, 7, 10, 14], "desc": "Aggregation window in days (currently 5 days); options: 3, 5, 7, 10, 14", }, }, "default_sweep": "floor_amount", "default_2d": ("floor_amount", "time_window"), }, "structuring (incoming cash)": { "name": "Structuring (Incoming Cash)", "current": "3 qualifying days within 14-day window, each day's Cash CashIn total $3K-$40K", "sweep_params": { "daily_floor": { "col": "trigger_amt", "label": "min daily Cash amount ($)", "current": 3_000, "direction": "gte", "sweep_min": 500, "desc": "Minimum daily Cash CashIn total for a qualifying day (currently $3K)", }, "days_required": { "col": "days_observed", "label": "qualifying days required", "current": 3, "direction": "gte", "sweep_min": 1, "integer_axis": True, "desc": "Minimum number of qualifying days in the window (currently 3)", }, }, "default_sweep": "daily_floor", "default_2d": ("daily_floor", "days_required"), }, "structuring (outgoing cash)": { "name": "Structuring (Outgoing Cash)", "current": "3 qualifying days within 14-day window, each day's Cash CashOut total $7K-$30K", "sweep_params": { "daily_floor": { "col": "trigger_amt", "label": "min daily Cash amount ($)", "current": 7_000, "direction": "gte", "sweep_min": 1_000, "desc": "Minimum daily Cash CashOut total for a qualifying day (currently $7K)", }, "days_required": { "col": "days_observed", "label": "qualifying days required", "current": 3, "direction": "gte", "sweep_min": 1, "integer_axis": True, "desc": "Minimum number of qualifying days in the window (currently 3)", }, }, "default_sweep": "daily_floor", "default_2d": ("daily_floor", "days_required"), }, "ctr client": { "name": "CTR Client", "current": "Cash + Currency Exchange in/out total > $10K", "sweep_params": { "floor_amount": { "col": "trigger_amt", "label": "total Cash threshold ($)", "current": 10_000, "direction": "gte", "sweep_min": 5_000, "desc": "Minimum Cash/Currency Exchange total to trigger (currently $10K)", }, }, "default_sweep": "floor_amount", "default_2d": ("floor_amount", "floor_amount"), }, "burst in originator activity": { "name": "Burst in Originator Activity", "current": "5-day Incoming Wire/ACH sum >= $5K, >= 3 transactions, >= 2 distinct counterparties", "sweep_params": { "floor_amount": { "col": "trigger_amt", "label": "5-day incoming sum ($)", "current": 5_000, "direction": "gte", "sweep_min": 1_000, "desc": "Minimum 5-day incoming Wire/ACH sum to trigger (currently $5K)", }, "min_transactions": { "col": "txn_count", "label": "minimum transaction count", "current": 3, "direction": "gte", "sweep_min": 1, "integer_axis": True, "desc": "Minimum number of transactions in the 5-day window (currently 3)", }, }, "default_sweep": "floor_amount", "default_2d": ("floor_amount", "min_transactions"), }, "burst in beneficiary activity": { "name": "Burst in Beneficiary Activity", "current": "5-day Outgoing Wire/ACH sum >= $5K, >= 3 transactions, >= 2 distinct counterparties", "sweep_params": { "floor_amount": { "col": "trigger_amt", "label": "5-day outgoing sum ($)", "current": 5_000, "direction": "gte", "sweep_min": 1_000, "desc": "Minimum 5-day outgoing Wire/ACH sum to trigger (currently $5K)", }, "min_transactions": { "col": "txn_count", "label": "minimum transaction count", "current": 3, "direction": "gte", "sweep_min": 1, "integer_axis": True, "desc": "Minimum number of transactions in the 5-day window (currently 3)", }, }, "default_sweep": "floor_amount", "default_2d": ("floor_amount", "min_transactions"), }, "risky international transfer": { "name": "Risky International Transfer", "current": "Single Wire to/from medium-risk country >= $300K or >= $500K", "sweep_params": { "floor_amount": { "col": "trigger_amt", "label": "single Wire floor ($)", "current": 300_000, "direction": "gte", "sweep_min": 100_000, "desc": "Minimum single Wire amount to a risky country to trigger (currently $300K)", }, }, "default_sweep": "floor_amount", "default_2d": ("floor_amount", "floor_amount"), }, } # end _LEGACY_CATALOGUE — not used; kept for reference only def _load_thresholds(): """ Patch RULE_CATALOGUE with current thresholds from rule_thresholds.json. When a rule's operational threshold changes, only the JSON needs updating. """ if not os.path.exists(_THRESHOLDS): return with open(_THRESHOLDS, "r") as f: config = json.load(f) for rule_key, rule_cfg in config.items(): if rule_key.startswith("_"): continue if rule_key not in RULE_CATALOGUE: continue entry = RULE_CATALOGUE[rule_key] if "current_condition" in rule_cfg: entry["current"] = rule_cfg["current_condition"] for param_name, value in rule_cfg.get("thresholds", {}).items(): if param_name in entry["sweep_params"]: entry["sweep_params"][param_name]["current"] = value _load_thresholds() def _match_rule(keyword): """Return (canonical_name, catalogue_entry) or (None, None).""" kw = keyword.strip().lower() for key, entry in RULE_CATALOGUE.items(): if kw in key or key in kw or kw in entry["name"].lower(): return entry["name"], entry return None, None # ── Load ─────────────────────────────────────────────────────────────────────── def load_rule_sweep_data(): """Load rule sweep data — production CSV or synthetic (when ARIA_SYNTH_DATA_DIR is set).""" if not os.path.exists(_CSV): return None df = pd.read_csv(_CSV) try: from column_mapper import normalize_columns df = normalize_columns(df, verbose=True) except Exception: pass # Normalise customer_id column name (synth uses aria_customer_id) if "aria_customer_id" in df.columns and "customer_id" not in df.columns: df = df.rename(columns={"aria_customer_id": "customer_id"}) # Derive dynamic_segment from customer_type if missing (needed for cluster filter) if "dynamic_segment" not in df.columns and "customer_type" in df.columns: df["dynamic_segment"] = df["customer_type"].str.upper().map( {"BUSINESS": 0, "INDIVIDUAL": 1} ) source = "synthetic" if os.getenv("ARIA_SYNTH_DATA_DIR") else "production" print(f"[rule_analysis] loaded {len(df):,} rows from {source} data ({_CSV})") return df # ── list_rules ───────────────────────────────────────────────────────────────── def list_rules_text(df): """ Pre-computed text listing available rules with SAR/FP counts and sweep options. """ lines = ["=== PRE-COMPUTED RULE LIST (copy this verbatim) ==="] lines.append("Available AML rules with SAR/FP performance (detailed table shown in chart below):") lines.append("NOTE: This is the COMPLETE list of rules in the system. Do NOT add or infer any rules not listed here.") for _, entry in RULE_CATALOGUE.items(): rf = entry["name"] grp = df[df["risk_factor"] == rf] if df is not None else pd.DataFrame() n = len(grp) sar = int(grp["is_sar"].sum()) if n > 0 else 0 fp = int((grp["is_sar"] == 0).sum()) if n > 0 else 0 prec = f"{round(100*sar/(sar+fp),1)}%" if (sar + fp) > 0 else "n/a" sweep_keys = ", ".join(entry["sweep_params"].keys()) lines.append(f" {rf}: alerts={n}, SAR={sar}, FP={fp}, precision={prec}, sweep_params=[{sweep_keys}]") lines.append("=== END RULE LIST ===") return "\n".join(lines) # ── cluster_rule_summary ─────────────────────────────────────────────────────── def cluster_rule_summary_text(df, cluster_n): """ Pre-computed SAR/FP/precision for ALL rules filtered to customers in one cluster. Parameters ---------- df : DataFrame already filtered to the target cluster by _filter_by_cluster() cluster_n : int — cluster number (1-based), used only for display """ if df is None or len(df) == 0: return ( f"=== PRE-COMPUTED CLUSTER RULE SUMMARY (copy this verbatim) ===\n" f"Cluster {cluster_n} — no alert data found for this cluster.\n" f"=== END CLUSTER RULE SUMMARY ===" ) n_customers = df["customer_id"].nunique() if "customer_id" in df.columns else len(df) lines = ["=== PRE-COMPUTED CLUSTER RULE SUMMARY (copy this verbatim) ==="] lines.append(f"Cluster {cluster_n} — {n_customers:,} customers in rule alert data") lines.append("SAR/FP performance for all rules filtered to this cluster:") lines.append("NOTE: alerts=0 means no alerts from this rule for customers in this cluster.") active_lines, inactive = [], [] for _, entry in RULE_CATALOGUE.items(): rf = entry["name"] grp = df[df["risk_factor"] == rf] n = len(grp) if n == 0: inactive.append(rf) continue sar = int(grp["is_sar"].sum()) fp = int((grp["is_sar"] == 0).sum()) prec = f"{round(100 * sar / (sar + fp), 1)}%" if (sar + fp) > 0 else "n/a" active_lines.append(f" {rf}: alerts={n}, SAR={sar}, FP={fp}, precision={prec}") lines.extend(active_lines) if inactive: lines.append(f"Rules with alerts=0 in Cluster {cluster_n}: {', '.join(inactive)}") lines.append("=== END CLUSTER RULE SUMMARY ===") return "\n".join(lines) # ── rule_sar_backtest ────────────────────────────────────────────────────────── def compute_rule_sar_sweep(df, risk_factor_keyword, sweep_param=None, max_rows=MAX_SWEEP_ROWS): """ Sweep a rule condition parameter and show SAR caught / FP remaining / SAR missed at each threshold level. Parameters ---------- df : DataFrame from load_rule_sweep_data() risk_factor_keyword : str — e.g. "Activity Deviation" or "elder" sweep_param : str — key from rule's sweep_params dict (or None for default) """ if df is None: return "Rule sweep data not loaded. Run python prepare_rule_sweep_data.py first." rf_name, entry = _match_rule(risk_factor_keyword) if entry is None: known = [e["name"] for e in RULE_CATALOGUE.values()] return ( f"No rule matched '{risk_factor_keyword}'. " f"Call list_rules to see available rules. Known: {known}" ) # Resolve sweep parameter — normalize spaces/hyphens to underscores if sweep_param is not None: sweep_param = str(sweep_param).replace(" ", "_").replace("-", "_") if sweep_param is None or sweep_param not in entry["sweep_params"]: sweep_param = entry["default_sweep"] sp = entry["sweep_params"][sweep_param] col = sp["col"] # Filter to this rule, only rows with SAR label and sweep column rule_df = df[df["risk_factor"] == rf_name].copy() known = rule_df.dropna(subset=["is_sar", col]).copy() total_sars = int((known["is_sar"] == 1).sum()) total_fps = int((known["is_sar"] == 0).sum()) total = len(known) precision = round(100 * total_sars / total, 1) if total > 0 else 0.0 header = [ "=== PRE-COMPUTED RULE SWEEP (copy this verbatim, do not alter numbers) ===", f"Rule: {rf_name}", f"Current condition: {entry['current']}", f"Sweep parameter: {sweep_param} - {sp['desc']}", f"Current value: {sp['current']:,}", f"Labeled population: {total} customers (TP+FN pool={total_sars} SAR, FP+TN pool={total_fps} non-SAR, precision={precision}%)", ] if total == 0: header.append("No labelled customers with this sweep column. Cannot sweep.") header.append("=== END RULE SAR SWEEP ===") return "\n".join(header) if total_sars == 0: header.append("No SAR customers in this rule's alerted population — all alerts are false positives.") header.append("=== END RULE SAR SWEEP ===") return "\n".join(header) # Use same nice-step logic as _sweep_points for consistency cur = float(sp["current"]) raw_points = _sweep_points(known, sp, n_steps=16) sweep = [] for t in raw_points: if sp["direction"] == "gte": mask_caught = (known[col] >= t) & (known["is_sar"] == 1) mask_fp = (known[col] >= t) & (known["is_sar"] == 0) else: # lte mask_caught = (known[col] <= t) & (known["is_sar"] == 1) mask_fp = (known[col] <= t) & (known["is_sar"] == 0) caught = int(mask_caught.sum()) fp_rem = int(mask_fp.sum()) missed = total_sars - caught sweep.append((round(t, 2), caught, fp_rem, missed)) # Key thresholds def last_point_above_rate(target_rate): target = int(total_sars * target_rate) candidates = [s for s in sweep if s[1] > target] return candidates[-1] if candidates else sweep[0] t90 = last_point_above_rate(0.90) t50 = last_point_above_rate(0.50) # Find current-condition row cur_row = next((s for s in sweep if s[0] == round(cur, 2)), None) def _prec(tp, fp): return round(100 * tp / (tp + fp), 1) if (tp + fp) > 0 else 0.0 # TP=caught SAR, FP=caught non-SAR, FN=missed SAR, TN=uncaught non-SAR first_tp = sweep[0][1]; first_fp = sweep[0][2] first_fn = total_sars - first_tp; first_tn = total_fps - first_fp first_pct = round(100 * first_tp / total_sars, 1) lines = header + [""] lines.append( f"At the lowest value ({sweep[0][0]:,.2f}): " f"TP={first_tp}, FP={first_fp}, FN={first_fn}, TN={first_tn} " f"(TP rate={first_pct}%, precision={_prec(first_tp, first_fp)}%)." ) if cur_row: cur_tp = cur_row[1]; cur_fp = cur_row[2] cur_fn = total_sars - cur_tp; cur_tn = total_fps - cur_fp cur_pct = round(100 * cur_tp / total_sars, 1) lines.append( f"At current condition ({cur_row[0]:,.2f}): " f"TP={cur_tp}, FP={cur_fp}, FN={cur_fn}, TN={cur_tn} " f"(TP rate={cur_pct}%, precision={_prec(cur_tp, cur_fp)}%)." ) t90_tp = t90[1]; t90_fp = t90[2]; t90_fn = total_sars - t90_tp; t90_tn = total_fps - t90_fp t50_tp = t50[1]; t50_fp = t50[2]; t50_fn = total_sars - t50_tp; t50_tn = total_fps - t50_fp lines.append(f"To keep TP rate >=90%: {sweep_param} <= {t90[0]:,.2f} => TP={t90_tp}, FP={t90_fp}, FN={t90_fn}, TN={t90_tn}, precision={_prec(t90_tp, t90_fp)}%.") lines.append(f"To keep TP rate >=50%: {sweep_param} <= {t50[0]:,.2f} => TP={t50_tp}, FP={t50_fp}, FN={t50_fn}, TN={t50_tn}, precision={_prec(t50_tp, t50_fp)}%.") last = sweep[-1] last_fn = total_sars - last[1]; last_tn = total_fps - last[2] lines.append(f"At the highest value ({last[0]:,.2f}): TP={last[1]}, FP={last[2]}, FN={last_fn}, TN={last_tn}, precision={_prec(last[1], last[2])}%.") lines.append("=== END RULE SWEEP ===") lines.append("(Detailed sweep table shown in the chart below.)") return "\n".join(lines) # ── rule_2d_sweep ────────────────────────────────────────────────────────────── def _get_mask(df, sp, val): """Return boolean mask for a single sweep param at a given value.""" direction = sp["direction"] if direction == "gte": return df[sp["col"]] >= val elif direction == "abs_lte": return (df[sp["col"]] - 1.0).abs() <= val elif direction == "window": col = f"max_rolling_{int(val)}d" if col not in df.columns: return pd.Series(False, index=df.index) return df[col] >= df["trigger_amt"] # fires if rolling sum >= current floor return pd.Series(False, index=df.index) def _sweep_points(df, sp, n_steps=15): """Return a list of sweep values for a param. For dollar-amount columns: generates nice round steps centered on the current operational threshold (e.g. current=$20K → 5K,10K,15K,20K,25K,30K,35K,40K). For z-score columns: fixed 0–10 integer range. For integer-axis columns (age, count): integer steps. For fixed-value params (ratio_tolerance, time_window): use catalogue values list. """ direction = sp["direction"] if direction == "window": return sp["values"] # Fixed value list (e.g. ratio_tolerance percentage steps) if "values" in sp: pts = [float(v) for v in sp["values"]] cur = float(sp["current"]) if cur not in pts: pts.append(cur) return sorted(set(pts)) col = sp["col"] raw_vals = df[col].dropna() if len(raw_vals) == 0: return [] cur = float(sp["current"]) # z-score: fixed 0–10 integer range if col in ("z_score",): i_max = 10 total_ints = i_max + 1 step = max(1, math.ceil(total_ints / (n_steps + 1))) pts = list(range(0, i_max + 1, step)) cur_i = int(round(cur)) if cur_i not in pts: pts.append(cur_i) return sorted(set(pts)) # Integer-axis params (age, count): integer steps centered on current if sp.get("integer_axis"): hard_min = int(sp.get("sweep_min", 1)) i_min = max(hard_min, int(round(cur)) - n_steps // 2) i_max = int(round(cur)) + n_steps // 2 total_ints = i_max - i_min + 1 step = max(1, math.ceil(total_ints / (n_steps + 1))) pts = list(range(i_min, i_max + 1, step)) cur_i = int(round(cur)) if cur_i not in pts: pts.append(cur_i) return sorted(set(pts)) # Dollar-amount columns: nice round steps centered on current threshold. # Step size is chosen to give ~8 steps either side of current. def _nice_step(value, half_steps=4): """Round step giving ~half_steps increments on each side of value.""" raw = value / half_steps if raw <= 0: return 1.0 mag = 10 ** math.floor(math.log10(raw)) n = raw / mag if n < 1.5: nice = 1 elif n < 3.5: nice = 2 elif n < 7.5: nice = 5 else: nice = 10 return nice * mag # If explicit sweep_min AND sweep_max are both given, distribute evenly between them. # Round step UP (not down) so the grid stays compact (~6–9 steps). if "sweep_min" in sp and "sweep_max" in sp: t_min = float(sp["sweep_min"]) t_max = float(sp["sweep_max"]) target = max(5, min(n_steps, 9)) raw_step = (t_max - t_min) / (target - 1) mag = 10 ** math.floor(math.log10(raw_step)) n = raw_step / mag if n <= 1.5: nice = 2 # round up from 1× elif n <= 3.5: nice = 5 # round up from 2× elif n <= 7.5: nice = 10 # round up from 5× else: nice = 10 step = nice * mag n_pts = int(round((t_max - t_min) / step)) + 1 pts = [int(round(t_min + i * step)) for i in range(n_pts)] if int(round(cur)) not in pts and t_min <= cur <= t_max: pts.append(int(round(cur))) return sorted(set(pts)) step = _nice_step(cur) hard_min = float(sp.get("sweep_min", step)) t_min = max(hard_min, cur - 4 * step) t_max = cur + 4 * step n_pts = int(round((t_max - t_min) / step)) + 1 pts = [int(round(t_min + i * step)) for i in range(n_pts)] if int(round(cur)) not in pts: pts.append(int(round(cur))) return sorted(set(pts)) def _fmt_cur(sp): """Format a param's current value cleanly: integers as int, pct as %, floats strip trailing zeros.""" v = sp["current"] if sp.get("format_pct"): pct = v * 100 return f"{pct:g}%" if sp.get("integer_axis") or (isinstance(v, (int, float)) and v == int(v)): return str(int(v)) return str(v) def _fmt_v(v, sp): """Format any sweep value for PRE-COMPUTED text output.""" if sp.get("format_pct"): pct = v * 100 return f"{pct:g}%" if sp.get("integer_axis"): return str(int(round(v))) if v == int(v): return f"{int(v):,}" return f"{v:,.2f}" def compute_rule_2d_sweep(df, risk_factor_keyword, param1=None, param2=None): """ 2D grid sweep: for each (param1_val, param2_val) combination, count SAR caught and FP remaining when both conditions are applied simultaneously. Returns (result_text, grid_dict) where grid_dict has keys: p1_vals, p2_vals, sar_grid (list of lists), fp_grid, total_sars, p1_label, p2_label """ if df is None: return "Rule sweep data not loaded.", None rf_name, entry = _match_rule(risk_factor_keyword) if entry is None: return f"No rule matched '{risk_factor_keyword}'. Call list_rules to see available rules.", None params = entry["sweep_params"] keys = list(params.keys()) # Resolve param1 / param2 default_2d = entry.get("default_2d", (keys[0], keys[1] if len(keys) > 1 else keys[0])) if param1 is None: param1 = default_2d[0] if param2 is None: param2 = default_2d[1] # Normalize param names: model may use spaces instead of underscores or pass integers if param1 is not None: param1 = str(param1).replace(" ", "_").replace("-", "_") if param2 is not None: param2 = str(param2).replace(" ", "_").replace("-", "_") if param1 not in params: return f"Unknown sweep_param_1 '{param1}'. Valid: {keys}", None if param2 not in params: return f"Unknown sweep_param_2 '{param2}'. Valid: {keys}", None if param1 == param2: return "sweep_param_1 and sweep_param_2 must be different.", None sp1 = params[param1] sp2 = params[param2] rule_df = df[df["risk_factor"] == rf_name].copy() # For window direction, need trigger_amt to be present needed = [] if sp1["direction"] != "window" and sp1["col"]: needed.append(sp1["col"]) if sp2["direction"] != "window" and sp2["col"]: needed.append(sp2["col"]) needed.append("is_sar") known = rule_df.dropna(subset=needed).copy() total_sars = int((known["is_sar"] == 1).sum()) total_fps = int((known["is_sar"] == 0).sum()) if total_sars == 0: return f"No SAR customers in {rf_name}. Cannot build 2D grid.", None p1_vals = _sweep_points(known, sp1) p2_vals = _sweep_points(known, sp2) # Build grids: rows = p1, cols = p2 sar_grid, fp_grid = [], [] for v1 in p1_vals: sar_row, fp_row = [], [] m1 = _get_mask(known, sp1, v1) for v2 in p2_vals: m2 = _get_mask(known, sp2, v2) both = m1 & m2 sar_row.append(int((both & (known["is_sar"] == 1)).sum())) fp_row.append(int((both & (known["is_sar"] == 0)).sum())) sar_grid.append(sar_row) fp_grid.append(fp_row) grid_dict = { "p1_vals": p1_vals, "p2_vals": p2_vals, "sar_grid": sar_grid, "fp_grid": fp_grid, "total_sars": total_sars, "total_fps": total_fps, "p1_label": sp1["label"], "p2_label": sp2["label"], "p1_current": sp1["current"], "p2_current": sp2["current"], "p1_format_pct": sp1.get("format_pct", False), "p2_format_pct": sp2.get("format_pct", False), "rf_name": rf_name, "param1": param1, "param2": param2, } # Text summary — compute current condition directly using actual thresholds # (not via nearest sweep point, since current may be outside the sweep range) m1_cur = _get_mask(known, sp1, float(sp1["current"])) m2_cur = _get_mask(known, sp2, float(sp2["current"])) cur_both = m1_cur & m2_cur cur_sar = int((cur_both & (known["is_sar"] == 1)).sum()) cur_fp = int((cur_both & (known["is_sar"] == 0)).sum()) cur_pct = round(100 * cur_sar / total_sars, 1) cur_fn = total_sars - cur_sar cur_tn = total_fps - cur_fp lines = [ "=== PRE-COMPUTED 2D SWEEP (copy this verbatim, do not alter numbers) ===", f"Rule: {rf_name}", f"Axis 1 ({param1}): {sp1['desc']}", f"Axis 2 ({param2}): {sp2['desc']}", f"Grid: {len(p1_vals)} x {len(p2_vals)} = {len(p1_vals)*len(p2_vals)} combinations", f"SAR pool: {total_sars}", f"Non-SAR pool: {total_fps}", "", f"At current condition ({param1}={_fmt_cur(sp1)}, {param2}={_fmt_cur(sp2)}): " f"TP={cur_sar}, FP={cur_fp}, FN={cur_fn}, TN={cur_tn} (TP rate={cur_pct}%).", ] # Best FP reduction cell: lowest FP count with TP rate >= 50% # (operationally: biggest workload reduction while keeping half of SARs) best_fp_val, best_v1, best_v2, best_tp, best_fp = float("inf"), None, None, 0, 0 for i, v1 in enumerate(p1_vals): for j, v2 in enumerate(p2_vals): tp, fp = sar_grid[i][j], fp_grid[i][j] if tp < total_sars * 0.5: continue if fp < best_fp_val: best_fp_val, best_v1, best_v2, best_tp, best_fp = fp, v1, v2, tp, fp if best_v1 is not None and best_v1 != p1_vals[0]: best_fn = total_sars - best_tp best_tn = total_fps - best_fp best_pct = round(100 * best_tp / total_sars, 1) best_prec = round(100 * best_tp / (best_tp + best_fp), 1) if (best_tp + best_fp) > 0 else 0 lines.append( f"Best FP reduction (TP rate >=50%): {param1}={_fmt_v(best_v1, sp1)}, {param2}={_fmt_v(best_v2, sp2)} " f"=> TP={best_tp}, FP={best_fp}, FN={best_fn}, TN={best_tn}, TP rate={best_pct}%, precision={best_prec}%." ) lines.append("=== END 2D SWEEP ===") lines.append("(Heatmap shown in the chart below.)") return "\n".join(lines), grid_dict def compute_2d_drilldown(df, risk_factor_keyword, param1, p1_val, param2, p2_val): """ For a specific (p1_val, p2_val) cell in the 2D grid, return the customer-level breakdown: - tp_df : SAR customers who ARE alerted (true positives) - fp_df : non-SAR customers who ARE alerted (false positives) - fn_df : SAR customers who are NOT alerted (false negatives — missed) - tn_df : non-SAR customers who are NOT alerted (true negatives) Also returns the column names for param1 / param2 for display. Returns (tp_df, fp_df, fn_df, tn_df, col1, col2) or (None,)*6 on error. """ rf_name, entry = _match_rule(risk_factor_keyword) if entry is None: return (None,) * 6 params = entry["sweep_params"] if param1 not in params or param2 not in params: return (None,) * 6 sp1 = params[param1] sp2 = params[param2] col1 = sp1.get("col", param1) col2 = sp2.get("col", param2) rule_df = df[df["risk_factor"] == rf_name].copy() needed = [c for c in [sp1.get("col"), sp2.get("col"), "is_sar"] if c] known = rule_df.dropna(subset=needed).copy() m1 = _get_mask(known, sp1, float(p1_val)) m2 = _get_mask(known, sp2, float(p2_val)) alerted = m1 & m2 is_sar = known["is_sar"] == 1 tp_df = known[ alerted & is_sar].copy() fp_df = known[ alerted & ~is_sar].copy() fn_df = known[~alerted & is_sar].copy() tn_df = known[~alerted & ~is_sar].copy() return tp_df, fp_df, fn_df, tn_df, col1, col2