ishaq101's picture
feat/Planner Agent (#2)
81e5fe7
Raw
History Blame
5.23 kB
"""analyze_contribution — share-of-total per category (KM-608).
An analytical "family" tool: in ONE call it breaks a total down into each
category's contribution (absolute value, share of total, and running
cumulative share), sorted largest-first. This is a single snapshot — "who
makes up the total right now" — as opposed to analyze_comparison (two groups)
or analyze_trend (movement over time). Answers questions like "which region
drives most of revenue?" and supports Pareto (80/20) reasoning.
STATUS: compute layer only — the function takes an already-materialized
DataFrame. The wrapper layer (fetching data from the catalog via source_id,
the ToolOutput envelope, ToolSpec registration) is added once the Planner
seam (KM-418) is settled. Keeping compute separate from data-fetching makes
this function easy to unit-test in isolation and stable when wrapped.
"""
from __future__ import annotations
import pandas as pd
from src.tools.analytics.descriptive import ColumnNotFoundError
# Aggregations accepted by analyze_contribution. Share-of-total is most
# meaningful for additive aggregates (sum/count); mean/min/max/median are also
# allowed, in which case "total" is the sum of the per-group aggregates.
SUPPORTED_AGGS = (
    "sum",
    "mean",
    "count",
    "min",
    "max",
    "median",
)
class UnsupportedAggregationError(ValueError):
    """Raised when a requested aggregation is not supported.

    Maps to error_code UNSUPPORTED_AGG.
    """
def _clean(value: object) -> object:
"""Convert numpy scalars to plain Python so the output is JSON-clean."""
if hasattr(value, "item"):
return value.item()
return value
# Prompt-style description read by the Planner to decide WHEN to pick this tool.
# Final destination is ToolSpec.description once the wrapper layer is built.
# NOTE: a trailing backslash inside the literal is a line continuation — the
# backslash and the newline after it are dropped from the string value, so each
# continued sentence is a single line in the resulting text.
DESCRIPTION = """\
Summary: Breaks a single total into per-category contributions (share of total) \
in one snapshot, largest first, with cumulative share. Supports Pareto (80/20) \
reasoning. Optional top_n folds the long tail into "Others".
USE WHEN the question is about how a whole splits into parts, or which \
categories dominate. Trigger words: "contribution" (kontribusi), "share" \
(porsi/proporsi), "% of total" (persen dari total), "Pareto/80-20", "top \
contributors", "which ... make up most".
DON'T USE WHEN:
- it pits two specific groups against each other -> analyze_comparison
- it tracks change over time -> analyze_trend
- it just aggregates per group without share-of-total -> analyze_aggregate
Example questions:
- "which products contribute most to total sales?"
- "what share of revenue comes from each region?"
- "top 5 customers by contribution to profit"
- "do 20% of items make up 80% of revenue?"
"""
def analyze_contribution(
    df: pd.DataFrame,
    dimension: str,
    value_column: str,
    agg: str = "sum",
    top_n: int | None = None,
) -> dict[str, object]:
    """Contribution (share of total) of each category, largest first.

    Groups ``df`` by ``dimension`` (missing keys are kept as their own
    category), aggregates ``value_column`` per group with ``agg``, sorts the
    groups by aggregate in descending order and reports each group's value,
    share of the total, and running cumulative share.

    Args:
        df: already-materialized data (in the real system the wrapper fetches
            this from a source_id).
        dimension: categorical column to break the total down by.
        value_column: numeric column to aggregate per category.
        agg: how to aggregate within each category — one of SUPPORTED_AGGS.
        top_n: if set, keep the top N categories and lump the remainder into a
            single "Others" row whose value is the sum of the remaining
            category aggregates. Must be non-negative.

    Returns:
        dict with:
            dimension, value_column, agg — echo of the chosen settings
            total — sum of all category aggregates
            items — [{"category", "value", "share", "cumulative_share"}]
                largest first; "share" and "cumulative_share" are None when
                the total is zero.

    Raises:
        ColumnNotFoundError: if dimension or value_column is absent.
        UnsupportedAggregationError: if agg is not supported.
        ValueError: if top_n is negative.
    """
    missing = [c for c in (dimension, value_column) if c not in df.columns]
    if missing:
        raise ColumnNotFoundError(f"columns not found: {missing}")
    if agg not in SUPPORTED_AGGS:
        raise UnsupportedAggregationError(
            f"unsupported aggregation '{agg}'; supported: {list(SUPPORTED_AGGS)}"
        )
    # A negative top_n would otherwise be interpreted as a from-the-end slice
    # below and silently relabel the smallest categories as "Others".
    if top_n is not None and top_n < 0:
        raise ValueError(f"top_n must be non-negative, got {top_n}")

    grouped = df.groupby(dimension, dropna=False)[value_column].agg(agg)
    grouped = grouped.sort_values(ascending=False)
    pairs = list(grouped.items())

    # Optionally collapse the long tail into a single "Others" bucket.
    if top_n is not None and len(pairs) > top_n:
        others_value = sum(val for _, val in pairs[top_n:])
        pairs = [*pairs[:top_n], ("Others", others_value)]

    total = sum(val for _, val in pairs)

    items = []
    # Accumulate raw values and divide once per row instead of summing the
    # already-rounded shares: the running sum is built in the same order as
    # `total`, so the last row's cumulative share is exactly 1.0 rather than
    # drifting (e.g. ten shares of 0.1 add up to 0.9999999999999999).
    running = 0
    for cat, val in pairs:
        running += val
        items.append(
            {
                "category": _clean(cat),
                "value": _clean(val),
                "share": (val / total) if total else None,
                "cumulative_share": (running / total) if total else None,
            }
        )

    return {
        "dimension": dimension,
        "value_column": value_column,
        "agg": agg,
        "total": _clean(total),
        "items": items,
    }