sofhiaazzhr's picture
[NOTICKET] Adopt verb-first skill naming
2d6406d
Raw
History Blame
4.89 kB
"""analyze_aggregate — group-by aggregation (KM-608).
An analytical "family" tool: in ONE call it groups rows by one or more keys
and computes aggregates (sum, mean, count, min, max, median, nunique) per
group. This is the deterministic compute twin of the Planner's
`retrieve_data` step — the wrapper layer later maps a QueryIR onto this.
STATUS: compute layer only — the function takes an already-materialized
DataFrame. The wrapper layer (fetching data from the catalog via source_id,
the ToolOutput envelope, ToolSpec registration) is added once the Planner
seam (KM-418) is settled. Keeping compute separate from data-fetching makes
this function easy to unit-test in isolation and stable when wrapped.
"""
from __future__ import annotations
import pandas as pd
from src.tools.analytics.descriptive import ColumnNotFoundError
# Whitelist of aggregation function names accepted by this tool. Anything
# outside this tuple is rejected explicitly rather than being passed through
# and silently producing the wrong result.
SUPPORTED_AGGS = (
    "sum",
    "mean",
    "count",
    "min",
    "max",
    "median",
    "nunique",
)
class UnsupportedAggregationError(ValueError):
    """An aggregation name outside SUPPORTED_AGGS was requested.

    Corresponds to the UNSUPPORTED_AGG error code.
    """
def _clean(value: object) -> object:
    """Convert a pandas/numpy scalar into a plain Python value for JSON output.

    Args:
        value: any cell value or group key taken from an aggregation result.

    Returns:
        - ``None`` for the pandas missing-value sentinels ``pd.NaT`` and
          ``pd.NA`` (neither can be serialized by ``json``).
        - An ISO-8601 string for a ``pandas.Timestamp`` (a ``group_by`` over a
          datetime column yields Timestamp group keys).
        - ``value.item()`` for anything exposing ``.item()`` (numpy scalars).
        - ``value`` unchanged otherwise. A float NaN is deliberately left
          as-is; only the non-serializable sentinels are replaced.
    """
    # Checked first so the missing-value sentinels never reach the branches
    # below; identity comparison is enough because both are singletons.
    if value is pd.NaT or value is pd.NA:
        return None
    if isinstance(value, pd.Timestamp):
        return value.isoformat()
    if hasattr(value, "item"):
        return value.item()
    return value
# Prompt-style description that the Planner reads to decide WHEN this tool
# should be picked. It lives here for now and is meant to become
# ToolSpec.description once the wrapper layer is built.
# NOTE: this literal is runtime text — the trailing backslashes join wrapped
# lines into a single paragraph, so do not reflow it casually.
DESCRIPTION = """\
Summary: Group-by aggregation. Splits rows by one or more key columns and \
computes aggregates per group (sum, mean, count, min, max, median, nunique). \
Returns one row per group.
USE WHEN the question groups a metric by a category — the tell-tale sign is \
"per"/"each"/"by" a dimension. Trigger words: "per/each" (per/tiap), "by" \
(berdasarkan), "breakdown", "total/average ... per ...".
DON'T USE WHEN:
- it summarizes a column with no grouping -> analyze_descriptive
- it compares two specific groups (A vs B) -> analyze_comparison
- it splits a single total into shares -> analyze_contribution
- the grouping is over time periods -> analyze_trend
Example questions:
- "total revenue per region"
- "average order value by customer segment"
- "how many distinct products were sold per store?"
- "count of orders for each status"
"""
def analyze_aggregate(
    df: pd.DataFrame,
    aggregations: dict[str, list[str] | str],
    group_by: list[str] | str | None = None,
) -> list[dict[str, object]]:
    """Group-by aggregation over one or many keys.

    Args:
        df: already-materialized data (in the real system the wrapper fetches
            this from a source_id).
        aggregations: which columns to aggregate and how, e.g.
            {"revenue": ["sum", "mean"], "order_id": ["count"]}. A bare
            function name ({"revenue": "sum"}) is accepted as shorthand for a
            one-element list.
        group_by: grouping keys; a single column name may be passed as a bare
            string. If None/empty, the whole table is aggregated into a
            single row.

    Returns:
        list[dict]: one row per group. Each row holds the group keys plus a
        column per aggregate, named "<column>_<func>" (e.g. "revenue_sum").
        With an empty `aggregations` mapping the rows hold only the group
        keys (or a single empty row when there is no grouping).

    Raises:
        ColumnNotFoundError: if any group_by or aggregated column is absent.
        UnsupportedAggregationError: if a requested function is not supported.
    """
    # A bare string is ONE name, not a sequence of characters. Without this,
    # "sum" would be validated as ['s', 'u', 'm'] and "region" looked up as
    # six single-letter columns, producing a misleading error.
    if isinstance(group_by, str):
        group_by = [group_by]
    keys = list(group_by or [])
    requested = {
        col: [funcs] if isinstance(funcs, str) else list(funcs)
        for col, funcs in aggregations.items()
    }

    # Validate columns first (fail-fast on caller mistakes).
    referenced = keys + list(requested)
    missing = [c for c in referenced if c not in df.columns]
    if missing:
        raise ColumnNotFoundError(f"columns not found: {missing}")

    # Validate aggregation functions against the whitelist.
    for col, funcs in requested.items():
        bad = [f for f in funcs if f not in SUPPORTED_AGGS]
        if bad:
            raise UnsupportedAggregationError(
                f"unsupported aggregation(s) {bad} for column '{col}'; "
                f"supported: {list(SUPPORTED_AGGS)}"
            )

    # Build named aggregations: {"revenue_sum": ("revenue", "sum"), ...}
    named = {
        f"{col}_{func}": (col, func)
        for col, funcs in requested.items()
        for func in funcs
    }

    # No grouping -> aggregate the entire table into a single row.
    if not keys:
        row = {name: _clean(df[col].agg(func)) for name, (col, func) in named.items()}
        return [row]

    # dropna=False keeps groups whose key is null so completeness is honest.
    grouped = df.groupby(keys, dropna=False)
    if named:
        table = grouped.agg(**named).reset_index()
    else:
        # Nothing to aggregate: .agg() cannot be called without functions, so
        # emit just the distinct group keys, in the same order the groupby
        # would produce them.
        table = grouped.size().index.to_frame(index=False)

    return [
        {k: _clean(v) for k, v in record.items()}
        for record in table.to_dict("records")
    ]