ishaq101's picture
feat/Planner Agent (#2)
81e5fe7
Raw
History Blame
5.38 kB
"""analyze_comparison — compare a metric across two groups (KM-608).
An analytical "family" tool: in ONE call it aggregates a value for two groups
of a dimension (e.g. region "A" vs "B", channel "online" vs "store") and
reports the gap between them — absolute difference, percent difference, and
direction. group_a is treated as the baseline. Answers questions like
"how does revenue in region A compare to region B?".
STATUS: compute layer only — the function takes an already-materialized
DataFrame. The wrapper layer (fetching data from the catalog via source_id,
the ToolOutput envelope, ToolSpec registration) is added once the Planner
seam (KM-418) is settled. Keeping compute separate from data-fetching makes
this function easy to unit-test in isolation and stable when wrapped.
"""
from __future__ import annotations
import pandas as pd
from src.tools.analytics.descriptive import ColumnNotFoundError
# Aggregation names accepted by analyze_comparison's `agg` argument; the chosen
# one is applied to the value column inside each group before the two groups
# are compared.
SUPPORTED_AGGS = (
    "sum",
    "mean",
    "count",
    "min",
    "max",
    "median",
)
class UnsupportedAggregationError(ValueError):
    """Raised when the requested aggregation is not one of the supported ones.

    Maps to error_code UNSUPPORTED_AGG.
    """
class GroupNotFoundError(ValueError):
    """Raised when a requested group value is absent from the dimension column.

    Maps to error_code GROUP_NOT_FOUND.
    """
# Prompt-style description read by the Planner to decide WHEN to pick this tool.
# Final destination is ToolSpec.description once the wrapper layer is built.
#
# NOTE: the text below is runtime data, not documentation — edit it only when
# the tool-selection guidance itself should change. The backslash right after
# the opening quotes suppresses the leading newline, and each trailing
# backslash joins a wrapped source line onto the next so that a paragraph
# becomes a single line in the resulting string.
DESCRIPTION = """\
Summary: Head-to-head comparison of one aggregated metric between TWO specific \
groups of a dimension (group_a is the baseline). Reports each group's value, \
the absolute and percent difference, and which side is higher.
USE WHEN the question pits two named groups against each other. Trigger words: \
"vs"/"versus", "compare" (bandingkan), "A or B", "difference between" \
(selisih/beda antara), "higher/lower than".
SETTING GROUPS: group_a is the BASELINE (the reference). The "comparison" field \
reads as "group_b is {higher/lower/equal} than group_a", and diff = value_b - \
value_a. Put the reference/older/expected side in group_a. E.g. "is this year \
higher than last year" -> group_a=last year, group_b=this year.
DON'T USE WHEN:
- it aggregates across many groups at once -> analyze_aggregate
- it splits a single total into shares -> analyze_contribution
- it tracks change over time -> analyze_trend
Example questions:
- "compare revenue between Jakarta and Surabaya"
- "is the average order value higher for members or non-members?"
- "difference in churn between plan A and plan B"
- "male vs female average spend"
"""
def analyze_comparison(
    df: pd.DataFrame,
    dimension: str,
    value_column: str,
    group_a: object,
    group_b: object,
    agg: str = "sum",
) -> dict[str, object]:
    """Compare one aggregated metric between two groups of a dimension.

    ``group_a`` is the baseline: every difference is reported as
    "group_b relative to group_a".

    Args:
        df: already-materialized data (in the real system the wrapper fetches
            this from a source_id).
        dimension: the categorical column whose values define the two groups.
        value_column: numeric column to aggregate for each group.
        group_a: baseline group value (the "from").
        group_b: comparison group value (the "to").
        agg: how to aggregate within each group — one of SUPPORTED_AGGS.

    Returns:
        dict with:
            dimension, value_column, agg — echo of the chosen settings
            group_a, value_a — baseline group + its aggregate
            group_b, value_b — comparison group + its aggregate
            diff_abs — value_b - value_a
            diff_pct — diff_abs / value_a, or None if value_a == 0
            comparison — "higher" | "lower" | "equal" (b relative to a)

    Raises:
        ColumnNotFoundError: if dimension or value_column is absent.
        UnsupportedAggregationError: if agg is not supported.
        GroupNotFoundError: if group_a or group_b has no rows, or if the
            aggregate for either group is undefined (NaN) — e.g. "mean" over
            a group whose values are all null.
    """
    # Validate the inputs up front so every failure maps to a specific error.
    missing = [c for c in (dimension, value_column) if c not in df.columns]
    if missing:
        raise ColumnNotFoundError(f"columns not found: {missing}")
    if agg not in SUPPORTED_AGGS:
        raise UnsupportedAggregationError(
            f"unsupported aggregation '{agg}'; supported: {list(SUPPORTED_AGGS)}"
        )

    # Select the value column for the rows belonging to each group.
    rows_a = df.loc[df[dimension] == group_a, value_column]
    rows_b = df.loc[df[dimension] == group_b, value_column]
    empty = [g for g, rows in ((group_a, rows_a), (group_b, rows_b)) if rows.empty]
    if empty:
        raise GroupNotFoundError(
            f"no rows for group(s) {empty} in column '{dimension}'"
        )

    value_a = float(rows_a.agg(agg))
    value_b = float(rows_b.agg(agg))

    # A group can have rows and still aggregate to NaN (all values null).
    # NaN is neither > 0 nor < 0, so without this guard the result would be
    # reported as "equal" — refuse to compare instead of returning that.
    undefined = [
        g for g, value in ((group_a, value_a), (group_b, value_b)) if pd.isna(value)
    ]
    if undefined:
        raise GroupNotFoundError(
            f"aggregation '{agg}' of column '{value_column}' is undefined (NaN) "
            f"for group(s) {undefined}"
        )

    diff_abs = value_b - value_a
    # Percent difference is relative to the baseline; undefined for a zero
    # baseline. NOTE: the divisor keeps its sign, so with a negative baseline
    # the sign of diff_pct is opposite to the direction in `comparison`.
    diff_pct = (diff_abs / value_a) if value_a != 0 else None

    if diff_abs > 0:
        comparison = "higher"
    elif diff_abs < 0:
        comparison = "lower"
    else:
        comparison = "equal"

    return {
        "dimension": dimension,
        "value_column": value_column,
        "agg": agg,
        "group_a": group_a,
        "value_a": value_a,
        "group_b": group_b,
        "value_b": value_b,
        "diff_abs": diff_abs,
        "diff_pct": diff_pct,
        "comparison": comparison,
    }