uk_housing / plan_executor.py
seanerons's picture
Upload 7 files
3512a86 verified
import pandas as pd
ALLOWED_OPS = {"mean", "median", "count"}
def execute_plan(df: pd.DataFrame, plan: dict) -> pd.DataFrame:
q = df.copy()
# filters
for col, rule in (plan.get("filters") or {}).items():
if col not in q.columns:
raise ValueError("Unknown column: %s" % col)
if not isinstance(rule, dict):
raise ValueError("Bad filter rule for %s" % col)
if "eq" in rule:
q = q[q[col] == rule["eq"]]
if "in" in rule:
q = q[q[col].isin(rule["in"])]
if "not_in" in rule:
q = q[~q[col].isin(rule["not_in"])]
if "gte" in rule:
q = q[q[col] >= rule["gte"]]
if "lte" in rule:
q = q[q[col] <= rule["lte"]]
groupby = plan.get("groupby") or []
metrics = plan.get("metrics") or []
if groupby:
gb = q.groupby(groupby, dropna=False)
agg_dict = {}
for m in metrics:
col, op = m.get("col"), m.get("op")
label = m.get("label", "%s_%s" % (op, col))
if op not in ALLOWED_OPS:
raise ValueError("Unsupported op: %s" % op)
if op == "count":
agg_dict[label] = (col, "count")
else:
agg_dict[label] = (col, op)
res = gb.agg(**agg_dict).reset_index() if agg_dict else gb.size().reset_index(name="count")
else:
# global summary
rows = {}
for m in metrics:
col, op = m.get("col"), m.get("op")
label = m.get("label", "%s_%s" % (op, col))
if op not in ALLOWED_OPS:
raise ValueError("Unsupported op: %s" % op)
if op == "count":
rows[label] = int(q[col].count())
else:
rows[label] = float(getattr(q[col], op)())
res = pd.DataFrame([rows]) if rows else q.head(20)
for s in (plan.get("sort_by") or []):
res = res.sort_values(s.get("col"), ascending=bool(s.get("asc", True)))
limit = min(int(plan.get("limit", 20)), 50)
return res.head(limit)