| """analyze_aggregate — group-by aggregation (KM-608). |
| |
| An analytical "family" tool: in ONE call it groups rows by one or more keys |
| and computes aggregates (sum, mean, count, min, max, median, nunique) per |
| group. This is the deterministic compute twin of the Planner's |
| `retrieve_data` step — the wrapper layer later maps a QueryIR onto this. |
| |
| STATUS: compute layer only — the function takes an already-materialized |
| DataFrame. The wrapper layer (fetching data from the catalog via source_id, |
| the ToolOutput envelope, ToolSpec registration) is added once the Planner |
| seam (KM-418) is settled. Keeping compute separate from data-fetching makes |
| this function easy to unit-test in isolation and stable when wrapped. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import pandas as pd |
|
|
| from src.tools.analytics.descriptive import ColumnNotFoundError |
|
|
| |
| |
# Closed set of aggregation function names accepted by analyze_aggregate;
# anything else is rejected with UnsupportedAggregationError.
SUPPORTED_AGGS = (
    "sum",
    "mean",
    "count",
    "min",
    "max",
    "median",
    "nunique",
)
|
|
|
|
class UnsupportedAggregationError(ValueError):
    """Raised when a requested aggregation function is not in SUPPORTED_AGGS.

    Subclasses ValueError, so callers that already catch ValueError keep
    working. Corresponds to the UNSUPPORTED_AGG error code.
    """
|
|
|
|
| def _clean(value: object) -> object: |
| """Convert numpy/pandas scalars to plain Python so the output is JSON-clean. |
| |
| A `group_by` over a datetime column yields `pandas.Timestamp` group keys, |
| which (like numpy scalars) are not JSON-serializable — normalise those too. |
| """ |
| if isinstance(value, pd.Timestamp): |
| return value.isoformat() |
| if hasattr(value, "item"): |
| return value.item() |
| return value |
|
|
|
|
| |
| |
# Human/LLM-readable description of this tool: a one-paragraph summary, the
# trigger phrases for using it, the sibling tools to prefer instead, and
# example questions.
# NOTE(review): presumably consumed by the wrapper layer / ToolSpec
# registration mentioned in the module docstring — confirm once that exists.
# The leading `"""\` suppresses the initial newline, and each trailing
# backslash joins the next line, so every wrapped paragraph is a single line
# in the resulting string.
DESCRIPTION = """\
Summary: Group-by aggregation. Splits rows by one or more key columns and \
computes aggregates per group (sum, mean, count, min, max, median, nunique). \
Returns one row per group.

USE WHEN the question groups a metric by a category — the tell-tale sign is \
"per"/"each"/"by" a dimension. Trigger words: "per/each" (per/tiap), "by" \
(berdasarkan), "breakdown", "total/average ... per ...".

DON'T USE WHEN:
- it summarizes a column with no grouping -> analyze_descriptive
- it compares two specific groups (A vs B) -> analyze_comparison
- it splits a single total into shares -> analyze_contribution
- the grouping is over time periods -> analyze_trend

Example questions:
- "total revenue per region"
- "average order value by customer segment"
- "how many distinct products were sold per store?"
- "count of orders for each status"
"""
|
|
|
|
def analyze_aggregate(
    df: pd.DataFrame,
    aggregations: dict[str, list[str] | str],
    group_by: list[str] | str | None = None,
) -> list[dict[str, object]]:
    """Group rows by zero or more keys and compute aggregates per group.

    Args:
        df: already-materialized data (in the real system the wrapper fetches
            this from a source_id).
        aggregations: which columns to aggregate and how, e.g.
            {"revenue": ["sum", "mean"], "order_id": ["count"]}. A bare
            function name ("sum") is accepted as shorthand for ["sum"].
        group_by: grouping keys. A bare column name is accepted as shorthand
            for a one-element list. If None/empty, the whole table is
            aggregated into a single row.

    Returns:
        list[dict]: one row per group. Each row holds the group keys plus a
        column per aggregate, named "<column>_<func>" (e.g. "revenue_sum").
        Rows with a missing group key are kept (``dropna=False``). When no
        aggregate is requested, each row holds only the group keys (and the
        ungrouped result is a single empty row).

    Raises:
        ColumnNotFoundError: if any group_by or aggregated column is absent.
        UnsupportedAggregationError: if a requested function is not supported.
    """
    # Normalise inputs. A bare string would otherwise be iterated character
    # by character and surface as a misleading "columns not found" /
    # "unsupported aggregation" error.
    if isinstance(group_by, str):
        keys = [group_by]
    else:
        keys = list(group_by or [])
    spec = {
        col: [funcs] if isinstance(funcs, str) else list(funcs)
        for col, funcs in aggregations.items()
    }

    # Validate every referenced column before any computation.
    referenced = keys + list(spec)
    missing = [c for c in referenced if c not in df.columns]
    if missing:
        raise ColumnNotFoundError(f"columns not found: {missing}")

    # Validate the requested functions against the supported set.
    for col, funcs in spec.items():
        bad = [f for f in funcs if f not in SUPPORTED_AGGS]
        if bad:
            raise UnsupportedAggregationError(
                f"unsupported aggregation(s) {bad} for column '{col}'; "
                f"supported: {list(SUPPORTED_AGGS)}"
            )

    # Output column name -> (source column, function), the shape pandas
    # named aggregation expects.
    named = {
        f"{col}_{func}": (col, func)
        for col, funcs in spec.items()
        for func in funcs
    }

    # No grouping keys: aggregate the whole table into a single row.
    if not keys:
        row = {name: _clean(df[col].agg(func)) for name, (col, func) in named.items()}
        return [row]

    grouper = df.groupby(keys, dropna=False)
    if named:
        grouped = grouper.agg(**named).reset_index()
    else:
        # pandas rejects an empty agg() spec with a TypeError. Mirror the
        # ungrouped path (which yields a row with zero aggregates) by
        # returning just the distinct group keys, in groupby order.
        grouped = grouper.size().index.to_frame(index=False)
    return [{k: _clean(v) for k, v in record.items()} for record in grouped.to_dict("records")]
|
|