Spaces:
Sleeping
Sleeping
"""
utils/scenario_engine_ng.py — Version 2.1
Upgrades:
• Reads feature_metadata.json for range/type validation
• Clips values automatically to safe ranges
• Adds structured error handling & clean audit output
"""
| import re | |
| import json | |
| import pandas as pd | |
| from typing import List, Dict, Any, Tuple, Optional | |
| from utils.models import load_model # your existing loader | |
| # ------------------------------------------------------------ | |
| # ๐ Load Feature Metadata | |
| # ------------------------------------------------------------ | |
| def _load_metadata(path: str = "data/feature_metadata.json") -> Dict[str, Any]: | |
| try: | |
| with open(path, "r") as f: | |
| return json.load(f) | |
| except FileNotFoundError: | |
| print("โ ๏ธ Metadata file not found, proceeding without validation.") | |
| return {} | |
| except Exception as e: | |
| print(f"โ ๏ธ Could not read metadata: {e}") | |
| return {} | |
| FEATURE_META = _load_metadata() | |
| # ------------------------------------------------------------ | |
| # ๐ข Regex helpers for numeric parsing | |
| # ------------------------------------------------------------ | |
| PCT_RE = re.compile(r"^\s*([+-]?\d+(\.\d+)?)\s*%\s*$") | |
| NUM_RE = re.compile(r"^\s*([+-]?\d+(\.\d+)?)\s*$") | |
| def _parse_value(v: Any) -> Tuple[str, float]: | |
| """Parse a string like '+10%', '-5', or '1.2' โ ('percent'|'absolute', number)""" | |
| s = str(v).strip().lower() | |
| if m := PCT_RE.match(s): | |
| return ("percent", float(m.group(1)) / 100.0) | |
| if m := NUM_RE.match(s): | |
| return ("absolute", float(m.group(1))) | |
| # last-resort float extraction | |
| nums = re.findall(r"[-+]?\d*\.\d+|[-+]?\d+", s) | |
| if nums: | |
| return ("absolute", float(nums[0])) | |
| raise ValueError(f"Unsupported value format: {v!r}") | |
| # ------------------------------------------------------------ | |
| # ๐งฎ Validation Helpers | |
| # ------------------------------------------------------------ | |
| def _ensure_numeric(df: pd.DataFrame, col: str): | |
| if col not in df.columns: | |
| raise KeyError(f"Column '{col}' not in dataset.") | |
| if not pd.api.types.is_numeric_dtype(df[col]): | |
| raise TypeError(f"Column '{col}' must be numeric; got dtype {df[col].dtype}.") | |
| def _subset(df: pd.DataFrame, where: Optional[str]) -> pd.Index: | |
| if not where: | |
| return df.index | |
| try: | |
| return df.query(where).index | |
| except Exception as e: | |
| raise ValueError(f"Invalid filter: {where!r} โ {e}") | |
def _apply_metadata_limits(df: pd.DataFrame, col: str) -> pd.DataFrame:
    """Clip *col* in place to the [min, max] range declared in FEATURE_META.

    No-op when the column has no metadata entry or the entry lacks a
    complete min/max pair. Returns *df* on every path (the original
    returned None on the early exit), so the function composes
    consistently; existing callers ignore the return value and are
    unaffected.
    """
    meta = FEATURE_META.get(col)
    if not meta or "min" not in meta or "max" not in meta:
        return df
    before = df[col].copy()
    df[col] = df[col].clip(lower=meta["min"], upper=meta["max"])
    if not before.equals(df[col]):
        # Only announce when clipping actually changed something.
        print(f"📏 '{col}' clipped to range [{meta['min']}, {meta['max']}]")
    return df
# ------------------------------------------------------------
# ⚙️ Apply a Single Operation
# ------------------------------------------------------------
def _apply_op(df: pd.DataFrame, op: Dict[str, Any]) -> Dict[str, Any]:
    """
    Apply a single operation {'op','col','value','where?','min?','max?'} to
    *df* in place and return a small audit dict describing the change.

    Supported ops:
        scale : multiply ('+10%' -> *1.10; a bare number is the factor)
        shift : add a constant (absolute) or a per-row fraction (percent)
        set   : overwrite with an absolute numeric value
        clip  : bound values to the op's own [min, max]

    Raises:
        ValueError: unsupported op, missing 'col', bad value or filter.
        KeyError / TypeError: column missing or non-numeric.
    """
    kind = op.get("op")
    col = op.get("col")
    val = op.get("value")
    where = op.get("where")
    if kind not in {"scale", "shift", "set", "clip"}:
        raise ValueError(f"Unsupported op '{kind}'.")
    if not col:
        raise ValueError("Missing 'col' in operation.")
    idx = _subset(df, where)
    _ensure_numeric(df, col)
    # ----- Clip -----
    if kind == "clip":
        min_v = op.get("min")
        max_v = op.get("max")
        before = df.loc[idx, col].copy()
        df.loc[idx, col] = df.loc[idx, col].clip(lower=min_v, upper=max_v)
        _apply_metadata_limits(df, col)
        return {
            "op": kind,
            "col": col,
            "where": where,
            "count": len(idx),
            "min": min_v,
            "max": max_v,
            # Guard the empty selection: the original produced NaN here,
            # unlike the scale/shift/set audit below which reports None.
            "delta_mean": (
                float(df.loc[idx, col].mean() - before.mean()) if len(idx) else None
            ),
        }
    # ----- Scale / Shift / Set -----
    mode, num = _parse_value(val)
    before = df.loc[idx, col].copy()
    if kind == "scale":
        factor = (1.0 + num) if mode == "percent" else float(num)
        # NOTE(review): casting back to the original dtype truncates for int
        # columns — behavior preserved from the original implementation.
        df.loc[idx, col] = (df.loc[idx, col].astype(float) * factor).astype(df[col].dtype)
    elif kind == "shift":
        # Absolute shift adds a constant; percent shift adds a per-row fraction.
        shift = num if mode == "absolute" else df.loc[idx, col] * num
        df.loc[idx, col] += shift
    elif kind == "set":
        if mode != "absolute":
            raise ValueError("For 'set', provide a numeric value (e.g., 3.2), not a percent.")
        df.loc[idx, col] = float(num)
    # Apply metadata clipping (safety net)
    _apply_metadata_limits(df, col)
    after = df.loc[idx, col]
    return {
        "op": kind,
        "col": col,
        "where": where,
        "value": val,
        "count": len(idx),
        "before_mean": float(before.mean()) if len(before) else None,
        "after_mean": float(after.mean()) if len(after) else None,
        "delta_mean": float(after.mean() - before.mean()) if len(after) else None,
    }
| # ------------------------------------------------------------ | |
| # ๐ Model Feature Utilities | |
| # ------------------------------------------------------------ | |
| def _expected_features(model, df: pd.DataFrame, target: str = "churn") -> List[str]: | |
| """Return model features using .feature_names_in_ or numeric fallback.""" | |
| names = getattr(model, "feature_names_in_", None) | |
| if names is not None and len(names): | |
| return list(names) | |
| numeric_cols = list(df.select_dtypes(include="number").columns) | |
| bad = {target, "userid", "user_id", "id", "label"} | |
| return [c for c in numeric_cols if c not in bad] | |
# ------------------------------------------------------------
# 🚀 Public Simulation API
# ------------------------------------------------------------
def simulate_plan(
    plan: List[Dict[str, Any]],
    data_path: str = "data/data_randomforest.csv",
    model_path: str = "app_best.joblib",
    target_col: str = "churn",
) -> Dict[str, Any]:
    """
    Apply a list of generic operations to the dataset, then recompute churn
    with the trained model.

    Args:
        plan: list of op dicts understood by _apply_op.
        data_path: CSV file with the feature data.
        model_path: model artifact understood by utils.models.load_model.
        target_col: label column name, excluded from the feature fallback.

    Returns:
        dict with 'summary', the (possibly mutated) 'df', per-op 'audit',
        'metrics' (baseline/new/delta churn rates, in percent) and
        'model_features_used'. Errors are reported inside 'summary' rather
        than raised, so the caller always receives a renderable result.
    """
    df = pd.read_csv(data_path)
    model = load_model(model_path)
    feats = _expected_features(model, df, target=target_col)
    # --- Baseline ---
    try:
        X0 = df[feats]
        base_prob = model.predict_proba(X0)[:, 1]
        base_rate = float(base_prob.mean() * 100)
    except Exception as e:
        return {"summary": f"⚠️ Baseline prediction error: {e}", "df": df}
    # --- Apply Ops ---
    audit = []
    try:
        for i, op in enumerate(plan, 1):
            res = _apply_op(df, op)
            res["index"] = i
            audit.append(res)
    except Exception as e:
        # Return the partial audit so the caller can see which ops succeeded.
        return {"summary": f"⚠️ Plan application error: {e}", "df": df, "audit": audit}
    # --- Post-change Predictions ---
    try:
        X1 = df[feats]
        new_prob = model.predict_proba(X1)[:, 1]
        new_rate = float(new_prob.mean() * 100)
    except Exception as e:
        return {"summary": f"⚠️ Post-change prediction error: {e}", "df": df, "audit": audit}
    # --- Summary ---
    delta = new_rate - base_rate
    dir_emoji = "📉" if delta < 0 else "📈" if delta > 0 else "➖"
    summary = (
        f"{dir_emoji} Churn changed from {base_rate:.2f}% → {new_rate:.2f}% "
        f"({delta:+.2f} pts) after applying {len(plan)} operation(s)."
    )
    return {
        "summary": summary,
        "df": df,
        "audit": audit,
        "metrics": {
            "baseline_churn_rate": base_rate,
            "new_churn_rate": new_rate,
            "delta_churn_rate": delta,
        },
        "model_features_used": feats,
    }