| import pandas as pd
|
|
|
ALLOWED_OPS = {"mean", "median", "count"}

# Supported filter operators, applied in this fixed order. Each entry maps a
# column (Series) and the rule's operand to a boolean row mask.
_FILTER_OPS = (
    ("eq", lambda series, operand: series == operand),
    ("in", lambda series, operand: series.isin(operand)),
    ("not_in", lambda series, operand: ~series.isin(operand)),
    ("gte", lambda series, operand: series >= operand),
    ("lte", lambda series, operand: series <= operand),
)


def _require_column(name, columns) -> None:
    """Raise ValueError unless *name* is one of *columns*."""
    try:
        known = name in columns
    except TypeError:
        # Unhashable names (e.g. a list) can never be a column label.
        known = False
    if not known:
        raise ValueError("Unknown column: %s" % (name,))


def _apply_filters(frame: pd.DataFrame, filters: dict) -> pd.DataFrame:
    """Return *frame* narrowed by every ``{column: rule}`` entry in *filters*."""
    if not isinstance(filters, dict):
        raise ValueError("Bad filters: %r" % (filters,))
    for col, rule in filters.items():
        _require_column(col, frame.columns)
        if not isinstance(rule, dict):
            raise ValueError("Bad filter rule for %s" % (col,))
        for key, build_mask in _FILTER_OPS:
            if key in rule:
                frame = frame[build_mask(frame[col], rule[key])]
    return frame


def _parse_metrics(metrics, columns) -> list:
    """Validate metric specs and return them as ``(label, col, op)`` tuples."""
    parsed = []
    for spec in metrics:
        if not isinstance(spec, dict):
            raise ValueError("Bad metric spec: %r" % (spec,))
        col, op = spec.get("col"), spec.get("op")
        if op not in ALLOWED_OPS:
            raise ValueError("Unsupported op: %s" % (op,))
        _require_column(col, columns)
        label = spec.get("label") or "%s_%s" % (op, col)
        parsed.append((label, col, op))
    return parsed


def _sort_result(res: pd.DataFrame, sort_specs) -> pd.DataFrame:
    """Sort *res* by all of *sort_specs* at once; the first spec is the primary key."""
    keys, directions = [], []
    for spec in sort_specs:
        if not isinstance(spec, dict):
            raise ValueError("Bad sort spec: %r" % (spec,))
        key = spec.get("col")
        _require_column(key, res.columns)
        if key in keys:
            # A repeated key cannot refine the order; the first mention wins.
            continue
        keys.append(key)
        directions.append(bool(spec.get("asc", True)))
    if not keys:
        return res
    # One multi-key sort: sorting key by key would let each later key
    # override the ordering established by the earlier ones.
    return res.sort_values(keys, ascending=directions, kind="mergesort")


def execute_plan(
    df: pd.DataFrame,
    plan: dict,
    *,
    default_limit: int = 20,
    max_limit: int = 50,
) -> pd.DataFrame:
    """Run a declarative query *plan* against *df* and return the result rows.

    Recognized plan keys (all optional):

    * ``filters``: ``{column: rule}`` where ``rule`` is a dict that may hold
      ``eq``, ``in``, ``not_in``, ``gte`` and ``lte``; every operator present
      is applied. Other keys inside a rule are ignored.
    * ``groupby``: a column name or list of column names to group on (rows
      with missing group keys are kept as their own group).
    * ``metrics``: list of ``{"col", "op", "label"}`` dicts; ``op`` must be
      in ``ALLOWED_OPS`` and ``label`` defaults to ``"<op>_<col>"``.
    * ``sort_by``: list of ``{"col", "asc"}`` dicts; the first entry is the
      primary sort key and ``asc`` defaults to ``True``.
    * ``limit``: maximum number of rows to return.

    Result shape:

    * ``groupby`` with ``metrics``: one row per group, one column per metric.
    * ``groupby`` only: one row per group with a ``count`` column (group size).
    * ``metrics`` only: a single row with one column per metric.
    * neither: the filtered rows themselves.

    Args:
        df: Source data; it is never modified.
        plan: The query description outlined above.
        default_limit: Row limit used when the plan has no ``limit`` (or it
            is ``None``).
        max_limit: Hard upper bound on the number of rows returned.

    Returns:
        A DataFrame with at most ``max_limit`` rows.

    Raises:
        ValueError: If the plan names an unknown column, uses an unsupported
            op, contains a malformed filter/metric/sort entry, or has a
            ``limit`` that cannot be parsed as an integer.
    """
    # Work on a copy so the returned frame never aliases the caller's data.
    q = _apply_filters(df.copy(), plan.get("filters") or {})

    groupby = plan.get("groupby") or []
    if isinstance(groupby, str):
        groupby = [groupby]
    groupby = list(groupby)
    for key in groupby:
        _require_column(key, q.columns)

    metrics = _parse_metrics(plan.get("metrics") or [], q.columns)

    if groupby:
        grouped = q.groupby(groupby, dropna=False)
        if metrics:
            named = {label: (col, op) for label, col, op in metrics}
            res = grouped.agg(**named).reset_index()
        else:
            res = grouped.size().reset_index(name="count")
    elif metrics:
        row = {}
        for label, col, op in metrics:
            value = getattr(q[col], op)()
            row[label] = int(value) if op == "count" else float(value)
        res = pd.DataFrame([row])
    else:
        # Keep every filtered row here: truncating before the sort below
        # would order an arbitrary prefix instead of the whole result.
        res = q

    res = _sort_result(res, plan.get("sort_by") or [])

    raw_limit = plan.get("limit")
    limit = default_limit if raw_limit is None else int(raw_limit)
    # Clamp at zero as well: head() with a negative n returns "all but the
    # last n rows", which would sidestep max_limit.
    limit = max(0, min(limit, max_limit))
    return res.head(limit)
|
|
|