""" utils/scenario_engine_ng.py — Version 2.1 Upgrades: • Reads feature_metadata.json for range/type validation • Clips values automatically to safe ranges • Adds structured error handling & clean audit output """ import re import json import pandas as pd from typing import List, Dict, Any, Tuple, Optional from utils.models import load_model # your existing loader # ------------------------------------------------------------ # 🔖 Load Feature Metadata # ------------------------------------------------------------ def _load_metadata(path: str = "data/feature_metadata.json") -> Dict[str, Any]: try: with open(path, "r") as f: return json.load(f) except FileNotFoundError: print("⚠️ Metadata file not found, proceeding without validation.") return {} except Exception as e: print(f"⚠️ Could not read metadata: {e}") return {} FEATURE_META = _load_metadata() # ------------------------------------------------------------ # 🔢 Regex helpers for numeric parsing # ------------------------------------------------------------ PCT_RE = re.compile(r"^\s*([+-]?\d+(\.\d+)?)\s*%\s*$") NUM_RE = re.compile(r"^\s*([+-]?\d+(\.\d+)?)\s*$") def _parse_value(v: Any) -> Tuple[str, float]: """Parse a string like '+10%', '-5', or '1.2' → ('percent'|'absolute', number)""" s = str(v).strip().lower() if m := PCT_RE.match(s): return ("percent", float(m.group(1)) / 100.0) if m := NUM_RE.match(s): return ("absolute", float(m.group(1))) # last-resort float extraction nums = re.findall(r"[-+]?\d*\.\d+|[-+]?\d+", s) if nums: return ("absolute", float(nums[0])) raise ValueError(f"Unsupported value format: {v!r}") # ------------------------------------------------------------ # 🧮 Validation Helpers # ------------------------------------------------------------ def _ensure_numeric(df: pd.DataFrame, col: str): if col not in df.columns: raise KeyError(f"Column '{col}' not in dataset.") if not pd.api.types.is_numeric_dtype(df[col]): raise TypeError(f"Column '{col}' must be numeric; got dtype {df[col].dtype}.") 
def _subset(df: pd.DataFrame, where: Optional[str]) -> pd.Index:
    """Return the index labels of the rows selected by ``where``.

    Args:
        df: The dataset to filter.
        where: A ``DataFrame.query`` expression; falsy selects every row.

    Raises:
        ValueError: If the expression cannot be evaluated. The underlying
            exception is attached as ``__cause__``.
    """
    if not where:
        return df.index
    try:
        # NOTE(review): `where` is evaluated by DataFrame.query; confirm that
        # plans never carry untrusted filter strings.
        return df.query(where).index
    except Exception as e:
        raise ValueError(f"Invalid filter: {where!r} → {e}") from e


def _apply_metadata_limits(df: pd.DataFrame, col: str) -> pd.DataFrame:
    """Clip column values based on metadata min/max.

    The clip is applied in place to the whole column, not only to the rows an
    operation touched. Nothing happens when the metadata has no entry for
    ``col`` or the entry lacks either bound.

    Returns:
        The same ``df`` object, on every path.
    """
    meta = FEATURE_META.get(col)
    if not isinstance(meta, dict) or "min" not in meta or "max" not in meta:
        return df
    before = df[col].copy()
    df[col] = df[col].clip(lower=meta["min"], upper=meta["max"])
    if not before.equals(df[col]):
        print(f"🔒 '{col}' clipped to range [{meta['min']}, {meta['max']}]")
    return df


# ------------------------------------------------------------
# ⚙️ Apply a Single Operation
# ------------------------------------------------------------
def _apply_op(df: pd.DataFrame, op: Dict[str, Any]) -> Dict[str, Any]:
    """
    Apply a single operation {'op','col','value','where?','min?','max?'}.
    Returns a small audit dict describing the change.

    Supported ops:
        scale: multiply by ``1 + pct`` for a percent value, else by the number.
        shift: add the number, or add ``pct * current value`` for a percent.
        set:   assign an absolute number.
        clip:  bound values with the operation's own 'min'/'max'.

    ``df`` is modified in place, and metadata limits are applied to the column
    after every operation.

    Raises:
        ValueError: Unknown op, missing 'col', invalid filter, unparsable
            value, or a percent value passed to 'set'.
        KeyError / TypeError: Column missing or not numeric.
    """
    kind = op.get("op")
    col = op.get("col")
    val = op.get("value")
    where = op.get("where")

    if kind not in {"scale", "shift", "set", "clip"}:
        raise ValueError(f"Unsupported op '{kind}'.")
    if not col:
        raise ValueError("Missing 'col' in operation.")

    idx = _subset(df, where)
    _ensure_numeric(df, col)

    # ----- Clip -----
    if kind == "clip":
        min_v = op.get("min")
        max_v = op.get("max")
        before = df.loc[idx, col].copy()
        df.loc[idx, col] = df.loc[idx, col].clip(lower=min_v, upper=max_v)
        _apply_metadata_limits(df, col)
        after = df.loc[idx, col]
        return {
            "op": kind,
            "col": col,
            "where": where,
            "count": len(idx),
            "min": min_v,
            "max": max_v,
            # An empty selection has no mean; report None like the other ops
            # instead of a NaN.
            "delta_mean": float(after.mean() - before.mean()) if len(after) else None,
        }

    # ----- Scale / Shift / Set -----
    mode, num = _parse_value(val)
    before = df.loc[idx, col].copy()

    if kind == "scale":
        factor = (1.0 + num) if mode == "percent" else float(num)
        # Compute in float, then cast back so the column keeps its dtype.
        df.loc[idx, col] = (df.loc[idx, col].astype(float) * factor).astype(df[col].dtype)
    elif kind == "shift":
        shift = num if mode == "absolute" else df.loc[idx, col] * num
        df.loc[idx, col] += shift
    elif kind == "set":
        if mode != "absolute":
            raise ValueError("For 'set', provide a numeric value (e.g., 3.2), not a percent.")
        df.loc[idx, col] = float(num)

    # Apply metadata clipping (safety net)
    _apply_metadata_limits(df, col)

    after = df.loc[idx, col]
    return {
        "op": kind,
        "col": col,
        "where": where,
        "value": val,
        "count": len(idx),
        "before_mean": float(before.mean()) if len(before) else None,
        "after_mean": float(after.mean()) if len(after) else None,
        "delta_mean": float(after.mean() - before.mean()) if len(after) else None,
    }


# ------------------------------------------------------------
# 📊 Model Feature Utilities
# ------------------------------------------------------------
def _expected_features(model, df: pd.DataFrame, target: str = "churn") -> List[str]:
    """Return model features using .feature_names_in_ or numeric fallback.

    When the model has no non-empty ``feature_names_in_``, every numeric
    column of ``df`` is used except the target and a few identifier/label
    columns.
    """
    names = getattr(model, "feature_names_in_", None)
    if names is not None and len(names):
        return list(names)
    numeric_cols = list(df.select_dtypes(include="number").columns)
    bad = {target, "userid", "user_id", "id", "label"}
    return [c for c in numeric_cols if c not in bad]


# ------------------------------------------------------------
# 🚀 Public Simulation API
# ------------------------------------------------------------
def simulate_plan(
    plan: List[Dict[str, Any]],
    data_path: str = "data/data_randomforest.csv",
    model_path: str = "app_best.joblib",
    target_col: str = "churn",
) -> Dict[str, Any]:
    """
    Apply a list of generic operations to the dataset, then recompute churn
    with the trained model.

    Args:
        plan: Operations in the form accepted by ``_apply_op``, applied in order.
        data_path: CSV file holding the dataset.
        model_path: Path passed to ``load_model``.
        target_col: Column excluded from the fallback feature list.

    Returns:
        A dict that always has 'summary', 'df' and 'audit'. On success it
        also has 'metrics' (baseline/new/delta churn rate, in percent) and
        'model_features_used'. Prediction and plan errors are reported in
        'summary' rather than raised; 'audit' then lists the operations that
        completed before the failure.
    """
    df = pd.read_csv(data_path)
    model = load_model(model_path)
    feats = _expected_features(model, df, target=target_col)

    # --- Baseline ---
    try:
        X0 = df[feats]
        base_prob = model.predict_proba(X0)[:, 1]
        base_rate = float(base_prob.mean() * 100)
    except Exception as e:
        # 'audit' is included so every error return has the same keys.
        return {"summary": f"⚠️ Baseline prediction error: {e}", "df": df, "audit": []}

    # --- Apply Ops ---
    audit = []
    try:
        for i, op in enumerate(plan, 1):
            res = _apply_op(df, op)
            res["index"] = i
            audit.append(res)
    except Exception as e:
        return {"summary": f"⚠️ Plan application error: {e}", "df": df, "audit": audit}

    # --- Post-change Predictions ---
    try:
        X1 = df[feats]
        new_prob = model.predict_proba(X1)[:, 1]
        new_rate = float(new_prob.mean() * 100)
    except Exception as e:
        return {"summary": f"⚠️ Post-change prediction error: {e}", "df": df, "audit": audit}

    # --- Summary ---
    delta = new_rate - base_rate
    dir_emoji = "📉" if delta < 0 else "📈" if delta > 0 else "➖"
    summary = (
        f"{dir_emoji} Churn changed from {base_rate:.2f}% → {new_rate:.2f}% "
        f"({delta:+.2f} pts) after applying {len(plan)} operation(s)."
    )

    return {
        "summary": summary,
        "df": df,
        "audit": audit,
        "metrics": {
            "baseline_churn_rate": base_rate,
            "new_churn_rate": new_rate,
            "delta_churn_rate": delta,
        },
        "model_features_used": feats,
    }