Spaces:
Sleeping
Sleeping
| # scenario_engine.py | |
| from __future__ import annotations | |
| from typing import Dict, List, Any, Tuple, Optional | |
| import re, math, json, ast | |
| import numpy as np | |
| import pandas as pd | |
| from schema import ScenarioPlan, TaskPlan | |
| from column_resolver import resolve_cols | |
# Allowed safe functions
def _log(x, base=None):
    """Return the natural log of ``x``, or the log to ``base`` when given.

    Works element-wise on scalars, NumPy arrays and pandas Series, and keeps
    the optional second argument that ``math.log`` accepted.
    """
    if base is None:
        return np.log(x)
    return np.log(x) / np.log(base)


# Whitelist of callables exposed to user-written expressions.
#
# The element-wise entries use NumPy ufuncs rather than ``math`` because
# expressions are evaluated against whole DataFrame columns: ``math.sqrt``
# and friends need a single float and raise TypeError for a Series with more
# than one element.
_ALLOWED_FUNCS = {
    # Element-wise helpers (scalar or column in, same shape out).
    "abs": abs,
    "round": round,
    "sqrt": np.sqrt,
    "log": _log,
    "exp": np.exp,
    # Two-argument element-wise min/max, e.g. ``min(col, 10)``.
    "min": np.minimum,
    "max": np.maximum,
    # Reductions (column in, scalar out).
    "mean": np.mean,
    "avg": np.mean,
    "median": np.median,
    "sum": np.sum,
    "count": lambda x: np.size(x),
    "p50": lambda x: np.percentile(x, 50),
    "p75": lambda x: np.percentile(x, 75),
    "p90": lambda x: np.percentile(x, 90),
    "p95": lambda x: np.percentile(x, 95),
}
| # -------- SAFE EXPRESSION PARSER -------- | |
class _SafeExpr(ast.NodeTransformer):
    """Whitelist validator for user-supplied arithmetic/boolean expressions.

    ``visit(tree)`` walks a tree produced by ``ast.parse(..., mode="eval")``
    and raises ``ValueError`` on the first construct that is not explicitly
    permitted.  The tree is returned unmodified when it is acceptable.

    Args:
        allowed: Names that may appear as values (typically column names).
        funcs: Names that may appear in call position.  Defaults to the keys
            of the module-level ``_ALLOWED_FUNCS`` table.
    """

    # Node types an expression may contain.  BitAnd/BitOr/Invert are included
    # because ``and``/``or``/``not`` cannot be applied to a pandas Series
    # (its truth value is ambiguous); ``&``, ``|`` and ``~`` are the only way
    # to combine column-wise conditions.
    _ALLOWED_NODES = (
        ast.Expression, ast.BoolOp, ast.BinOp, ast.UnaryOp, ast.Compare,
        ast.Call, ast.Name, ast.Load, ast.Constant,
        ast.And, ast.Or, ast.Not,
        ast.Add, ast.Sub, ast.Mult, ast.Div, ast.Mod, ast.Pow, ast.FloorDiv,
        ast.BitAnd, ast.BitOr, ast.Invert,
        ast.Eq, ast.NotEq, ast.Lt, ast.LtE, ast.Gt, ast.GtE,
        ast.USub, ast.UAdd,
    )

    def __init__(self, allowed: set, funcs: Optional[set] = None):
        self.allowed = allowed
        self.funcs = set(_ALLOWED_FUNCS) if funcs is None else set(funcs)

    def visit_Name(self, node):
        """Accept a bare name only if it is whitelisted or a literal keyword."""
        if node.id not in self.allowed and node.id not in ("True", "False", "None"):
            raise ValueError(f"Unknown name: {node.id}")
        return node

    def visit_Call(self, node):
        """Accept ``func(arg, ...)`` where ``func`` is a whitelisted plain name."""
        if not isinstance(node.func, ast.Name):
            raise ValueError("Only simple calls allowed")
        if node.func.id not in self.funcs:
            raise ValueError(f"Function not allowed: {node.func.id}")
        if node.keywords:
            raise ValueError("Unsupported syntax: keyword")
        # Validate the arguments only.  The callee name was checked above and
        # must NOT go through visit_Name: function names are not part of
        # ``self.allowed``, so recursing into ``node.func`` would reject every
        # call with "Unknown name".
        node.args = [self.visit(arg) for arg in node.args]
        return node

    def generic_visit(self, node):
        """Reject any node type outside the whitelist, then recurse."""
        if not isinstance(node, self._ALLOWED_NODES):
            raise ValueError(f"Unsupported syntax: {type(node).__name__}")
        return super().generic_visit(node)
def _eval_series_expr(expr: str, df: pd.DataFrame) -> pd.Series:
    """Validate ``expr`` and evaluate it with ``df``'s columns bound as names.

    The expression is parsed, checked by ``_SafeExpr`` against the column
    names plus ``True``/``False``/``None``, and then evaluated in a namespace
    made of the columns and the ``_ALLOWED_FUNCS`` table.  A whitelisted
    function name takes precedence over a column with the same name.

    Args:
        expr: Expression source text.
        df: Frame whose columns are exposed to the expression.

    Returns:
        Whatever the expression evaluates to (a Series for column-wise
        expressions).

    Raises:
        ValueError: If ``_SafeExpr`` rejects the expression.
    """
    permitted = {"True", "False", "None"}.union(df.columns)
    parsed = ast.parse(expr, mode="eval")
    _SafeExpr(permitted).visit(parsed)

    namespace = {column: df[column] for column in df.columns}
    namespace.update(_ALLOWED_FUNCS)

    # NOTE(security): eval() runs on caller-supplied text.  Builtins are
    # removed, but safety rests entirely on the _SafeExpr whitelist above;
    # do not loosen that validator without reviewing this call.
    bytecode = compile(parsed, "<expr>", "eval")
    return eval(bytecode, {"__builtins__": {}}, namespace)
# -------- COLUMN ROLE RESOLVER --------
# Keywords that identify each semantic role, most specific first.
SEMANTIC_ROLES = {
    "facility": ["facility", "hospital", "centre", "center", "clinic", "site", "settlement", "community"],
    "zone": ["zone", "region", "area", "district"],
    "specialty": ["specialty", "service", "program", "discipline"],
    "city": ["city", "town", "village"],
    "lat": ["latitude", "lat"],
    "lon": ["longitude", "lon", "lng"],
}


def resolve_role(df: pd.DataFrame, role: str) -> Optional[str]:
    """Find the best matching column for a semantic role.

    Matching is case-insensitive and tried in three passes of decreasing
    strictness, so a precise column beats an accidental substring hit
    (e.g. ``lat`` is preferred over ``population`` for the ``"lat"`` role):

    1. the column name equals a keyword;
    2. a keyword is a whole token of the column name (split on
       non-alphanumerics, so ``site_name`` matches ``site``);
    3. a keyword occurs anywhere in the column name.

    Within each pass keywords are tried in the order listed in
    ``SEMANTIC_ROLES`` and columns in frame order.

    Args:
        df: Frame whose column labels are searched.
        role: Key of ``SEMANTIC_ROLES``.

    Returns:
        The original column label, or ``None`` when the role is unknown or
        no column matches.
    """
    candidates = SEMANTIC_ROLES.get(role, [])
    if not candidates:
        return None

    # str() so that non-string labels (e.g. integer columns) cannot crash
    # the lookup.
    lower_cols = {str(c).lower(): c for c in df.columns}

    matchers = (
        lambda keyword, name: keyword == name,
        lambda keyword, name: keyword in re.split(r"[^a-z0-9]+", name),
        lambda keyword, name: keyword in name,
    )
    for is_match in matchers:
        for cand in candidates:
            for col_lc, col in lower_cols.items():
                if is_match(cand, col_lc):
                    return col
    return None
| # -------- MAIN ENGINE -------- | |
class ScenarioEngine:
    """Run the tasks of a scenario plan against in-memory datasets.

    Each task selects a dataset, optionally filters it, derives columns,
    groups/aggregates, sorts, truncates and projects it, and is rendered as a
    Markdown section.  The class holds no state; every method is a
    ``staticmethod`` so it can be called on the class or on an instance.
    """

    # ``func(arg)`` aggregate call such as ``mean(wait_days)``.
    _AGG_CALL_RE = re.compile(r'([a-zA-Z_][a-zA-Z0-9_]*)\(([^)]+)\)')
    # ``name = expression`` where the ``=`` is a real assignment, i.e. not
    # part of ``==``, ``!=``, ``<=`` or ``>=``.
    _ASSIGN_RE = re.compile(r'([^=<>!]*[^=<>!\s])\s*=(?!=)(.*)', re.S)

    @staticmethod
    def _as_df(v: Any) -> Optional[pd.DataFrame]:
        """Coerce a raw dataset value to a DataFrame.

        Lists become frames (empty list -> empty frame).  A dict whose values
        are all scalars becomes a one-row frame; any other dict becomes an
        empty frame.  DataFrames pass through unchanged and every other type
        yields ``None``.
        """
        if isinstance(v, pd.DataFrame):
            return v
        if isinstance(v, list):
            return pd.DataFrame(v) if v else pd.DataFrame()
        if isinstance(v, dict):
            scalar_types = (int, float, str, bool, type(None))
            if all(isinstance(val, scalar_types) for val in v.values()):
                return pd.DataFrame([v])
            return pd.DataFrame()
        return None

    @staticmethod
    def execute_plan(plan: "ScenarioPlan", datasets: Dict[str, Any]) -> str:
        """Execute every task in ``plan.tasks`` and return the Markdown report."""
        sections: List[str] = ["# Scenario Output\n"]
        for t in plan.tasks:
            sections.append(ScenarioEngine._exec_task(t, datasets))
        return "\n".join(sections).strip()

    @staticmethod
    def _get_df(datasets: Dict[str, Any], key: Optional[str]) -> Optional[pd.DataFrame]:
        """Return the dataset stored under ``key`` as a DataFrame.

        When ``key`` is empty or not present, the first value in ``datasets``
        that is a list, dict or DataFrame is used instead.  Returns ``None``
        when nothing usable is found.
        """
        if key and key in datasets:
            v = datasets[key]
        else:
            usable = (list, dict, pd.DataFrame)
            v = next((vv for vv in datasets.values() if isinstance(vv, usable)), None)
        return ScenarioEngine._as_df(v) if v is not None else None

    @staticmethod
    def _apply_filter(df: pd.DataFrame, expr: str) -> pd.DataFrame:
        """Return a copy of the rows of ``df`` for which ``expr`` is truthy."""
        mask = _eval_series_expr(expr, df)
        if mask is None or np.isscalar(mask):
            # A constant expression (e.g. "True") has no per-row mask:
            # keep everything or nothing.
            return df.copy() if mask else df.iloc[0:0].copy()
        return df.loc[mask.astype(bool)].copy()

    @staticmethod
    def _split_assignments(spec: str) -> List[Tuple[str, str]]:
        """Split a derive spec into ``(column, expression)`` pairs.

        Assignments are separated by ``;`` or ``,``, but only at the top
        level: separators inside parentheses or quotes belong to the
        expression (``capped = min(x, 10)`` is one assignment).  Pieces that
        contain no assignment are skipped.
        """
        pieces: List[str] = []
        depth = 0
        quote = None
        start = 0
        for i, ch in enumerate(spec):
            if quote:
                if ch == quote:
                    quote = None
            elif ch in "'\"":
                quote = ch
            elif ch == "(":
                depth += 1
            elif ch == ")":
                depth = max(depth - 1, 0)
            elif ch in ";," and depth == 0:
                pieces.append(spec[start:i])
                start = i + 1
        pieces.append(spec[start:])

        pairs: List[Tuple[str, str]] = []
        for piece in pieces:
            m = ScenarioEngine._ASSIGN_RE.match(piece.strip())
            if m:
                pairs.append((m.group(1).strip(), m.group(2).strip()))
        return pairs

    @staticmethod
    def _apply_derive(df: pd.DataFrame, spec: str) -> pd.DataFrame:
        """Return a copy of ``df`` with the columns described by ``spec`` added.

        Assignments are applied left to right, so a later expression may use
        a column derived earlier in the same spec.  The input frame is not
        modified; it may be the caller's own dataset object.
        """
        out = df.copy()
        for col, expr in ScenarioEngine._split_assignments(spec):
            out[col] = _eval_series_expr(expr, out)
        return out

    @staticmethod
    def _parse_aggs(spec: Optional[str]) -> List[Tuple[str, str]]:
        """Parse ``"count, mean(x)"`` into ``(output_column, call)`` pairs.

        Items that are neither ``count``/``count(*)`` nor of the form
        ``func(arg)`` are ignored.  Function names are lower-cased.
        """
        if not spec:
            return []
        out = []
        for it in (x.strip() for x in spec.split(",")):
            if not it:
                continue
            if it.lower() in ("count", "count(*)"):
                out.append(("count", "count(*)"))
                continue
            m = ScenarioEngine._AGG_CALL_RE.match(it)
            if not m:
                continue
            func, arg = m.group(1).lower(), m.group(2).strip()
            out.append((f"{func}_{arg}", f"{func}({arg})"))
        return out

    @staticmethod
    def _apply_agg_call(df: pd.DataFrame, call: str):
        """Evaluate one aggregate call against ``df``.

        Supports ``count``/``count(*)`` (row count), ``count(col)`` (non-null
        count), ``avg``/``mean``, ``median``, ``sum``, ``min``, ``max`` and
        ``pNN`` percentiles.  Nulls are dropped before aggregating.

        Returns:
            The aggregate value, or ``None`` when ``call`` is not a valid
            call, the column does not exist, or the function is unknown.
        """
        call = call.strip()
        if call.lower() in ("count", "count(*)"):
            return int(len(df))
        m = ScenarioEngine._AGG_CALL_RE.match(call)
        if not m:
            return None
        func, arg = m.group(1).lower(), m.group(2).strip()
        if arg not in df.columns:
            return None
        col = df[arg].dropna()
        if func == "count":
            return int(len(col))
        if func in ("avg", "mean"):
            return float(np.mean(col)) if len(col) else float("nan")
        if func == "median":
            return float(np.median(col)) if len(col) else float("nan")
        if func == "sum":
            return float(np.sum(col)) if len(col) else 0.0
        if func in ("min", "max"):
            return float(getattr(np, func)(col)) if len(col) else float("nan")
        if func.startswith("p") and func[1:].isdigit():
            return float(np.percentile(col, int(func[1:]))) if len(col) else float("nan")
        return None

    @staticmethod
    def _group_agg(df: pd.DataFrame, group_by: Optional[List[str]], agg_spec: Optional[str]) -> pd.DataFrame:
        """Aggregate ``df``, optionally per group.

        With neither groups nor aggregates the frame is returned unchanged.
        Without groups the result is a single row of aggregates.  With groups
        the result has one row per group (null keys included) holding the
        group keys followed by the aggregates.
        """
        aggs = ScenarioEngine._parse_aggs(agg_spec)
        if not aggs and not group_by:
            return df
        if not group_by:
            return pd.DataFrame([{k: ScenarioEngine._apply_agg_call(df, call) for k, call in aggs}])
        rows = []
        for keys, g in df.groupby(group_by, dropna=False):
            # Depending on the pandas version a single grouping column yields
            # either a scalar key or a 1-tuple.
            if not isinstance(keys, tuple):
                keys = (keys,)
            rec = dict(zip(group_by, keys))
            for out_col, call in aggs:
                rec[out_col] = ScenarioEngine._apply_agg_call(g, call)
            rows.append(rec)
        return pd.DataFrame(rows)

    # -------- RENDERERS --------
    @staticmethod
    def _render_table(df: pd.DataFrame) -> str:
        """Render ``df`` as a Markdown table, or ``_No rows._`` when empty.

        Floats are formatted with four significant digits and thousands
        separators; NaN is shown as ``NaN``.  Other values use ``str()``.
        """
        if df.empty:
            return "_No rows._"

        def fmt(v):
            if isinstance(v, float):
                return "NaN" if math.isnan(v) else f"{v:,.4g}"
            return v

        dff = df.copy()
        for c in dff.columns:
            dff[c] = dff[c].apply(fmt)
        # Column labels are not guaranteed to be strings (e.g. integer
        # labels from a list-of-lists dataset), so convert before joining.
        header = "| " + " | ".join(map(str, dff.columns)) + " |"
        sep = "|" + "|".join(["---"] * len(dff.columns)) + "|"
        rows = ["| " + " | ".join(map(str, r)) + " |" for r in dff.to_numpy().tolist()]
        return "\n".join([header, sep, *rows])

    @staticmethod
    def _exec_task(t: "TaskPlan", datasets: Dict[str, Any]) -> str:
        """Run a single task and return its Markdown section.

        Steps, in order: pick the dataset, resolve group-by column names,
        filter, derive, group/aggregate, sort, take the top N rows, project
        the requested fields, render.
        """
        section = [f"## {t.title}\n"]
        df = ScenarioEngine._get_df(datasets, t.data_key)
        if df is None or df.empty:
            section.append("_No matching data for this task._")
            return "\n".join(section)

        # Resolve semantic roles dynamically.  The resolved names are kept in
        # a local so the plan object is not altered and can be re-run against
        # datasets with different column names.
        group_by = t.group_by
        if group_by:
            group_by = resolve_cols(group_by, df.columns.tolist())

        if t.filter:
            df = ScenarioEngine._apply_filter(df, t.filter)
        if t.derive:
            for d in t.derive:
                df = ScenarioEngine._apply_derive(df, d)
        if group_by or t.agg:
            df = ScenarioEngine._group_agg(df, group_by, ", ".join(t.agg or []))
        if t.sort_by and t.sort_by in df.columns:
            ascending = (t.sort_dir or "desc").lower() == "asc"
            df = df.sort_values(by=t.sort_by, ascending=ascending)
        if t.top and t.top > 0:
            df = df.head(t.top)
        if t.fields:
            cols = resolve_cols(t.fields, df.columns.tolist())
            cols = [c for c in cols if c in df.columns]
            if cols:
                df = df[cols]
        section.append(ScenarioEngine._render_table(df))
        return "\n".join(section)