ishaq101's picture
feat/Planner Agent (#2)
81e5fe7
Raw
History Blame
5.83 kB
"""analyze_segment — bucket rows into segments (KM-608).
An analytical "family" tool: in ONE call it bins a numeric column into
segments and reports how rows distribute across them (count, and optionally an
aggregate of another column per segment). Two binning modes: explicit cut
"edges" (e.g. age 0-18-35-60) or equal-frequency "quantile" buckets (quartiles,
deciles). Answers questions like "split customers into age brackets" or "bucket
orders into value tiers".
STATUS: compute layer only — the function takes an already-materialized
DataFrame. The wrapper layer (fetching data from the catalog via source_id,
the ToolOutput envelope, ToolSpec registration) is added once the Planner
seam (KM-418) is settled. Keeping compute separate from data-fetching makes
this function easy to unit-test in isolation and stable when wrapped.
"""
from __future__ import annotations
import math
import pandas as pd
from src.tools.analytics.descriptive import ColumnNotFoundError
# Binning strategies accepted by analyze_segment's `method` argument:
#   "edges"    - explicit cut boundaries supplied by the caller
#   "quantile" - equal-frequency buckets
SUPPORTED_METHODS = ("edges", "quantile")
# Aggregations accepted by analyze_segment's `agg` argument; only consulted
# when a value column is being summarised per segment.
SUPPORTED_AGGS = ("sum", "mean", "count", "min", "max", "median")
class InvalidMethodError(ValueError):
    """Raised when the requested binning method is not a supported one.

    Maps to the INVALID_METHOD error code.
    """
class NonNumericColumnError(ValueError):
    """Raised when the column chosen for segmentation is not numeric.

    Maps to the NON_NUMERIC_COLUMN error code.
    """
class UnsupportedAggregationError(ValueError):
    """Raised when the requested aggregation is not a supported one.

    Maps to the UNSUPPORTED_AGG error code.
    """
def _clean(value: object) -> object:
    """Normalize a scalar so it is safe to embed in JSON-clean output.

    Numpy scalars (anything exposing ``.item()``) are unwrapped to plain
    Python values, and every missing-value marker is collapsed to ``None``:
    ``None`` itself, float NaN, and the pandas singletons ``pd.NA`` and
    ``pd.NaT``.

    Args:
        value: the scalar to normalize; typically one cell of an aggregated
            pandas Series.

    Returns:
        ``None`` for any missing-value marker, otherwise the plain Python
        equivalent of ``value``.
    """
    # pd.NA / pd.NaT are neither floats nor numpy scalars, so without this
    # guard they would pass through untouched and break JSON encoding.
    # pd.NA is what nullable dtypes (e.g. Int64) produce for an empty bin.
    if value is None or value is pd.NA or value is pd.NaT:
        return None
    if hasattr(value, "item"):
        value = value.item()
    if isinstance(value, float) and math.isnan(value):
        return None
    return value
# Prompt-style description read by the Planner to decide WHEN to pick this tool.
# Final destination is ToolSpec.description once the wrapper layer is built.
#
# Layout notes:
# - A trailing backslash inside the literal suppresses the newline, so each
#   wrapped paragraph reaches the reader as one long line.
# - The parenthesised terms after the trigger words look like Indonesian
#   glosses of the English terms (NOTE(review): confirm the target language).
# - The "DON'T USE WHEN" bullets name sibling tools (analyze_contribution,
#   analyze_aggregate, analyze_comparison) that are not defined in this module.
DESCRIPTION = """\
Summary: Bins a NUMERIC column into segments and counts how rows distribute \
across them (optionally aggregating another column per segment). Two modes: \
explicit cut edges (e.g. age 0-18-35-60) or equal-frequency quantile buckets \
(quartiles, deciles).
USE WHEN the question asks to bucket/bracket a continuous number into ranges. \
Trigger words: "segment" (segmen), "bucket/bracket" (kelompokkan ke rentang), \
"age groups/tiers" (kelompok umur/tingkatan), "quartiles/deciles", "bins".
DON'T USE WHEN:
- the category already exists (no binning needed) -> analyze_contribution
- it aggregates by an existing key -> analyze_aggregate
- it compares two named groups -> analyze_comparison
Example questions:
- "split customers into age brackets 0-18, 18-35, 35-60"
- "bucket orders into value tiers"
- "divide users into spending quartiles"
- "how many customers fall in each income band?"
"""
def analyze_segment(
    df: pd.DataFrame,
    column: str,
    bins: list[float] | int,
    method: str = "edges",
    labels: list[str] | None = None,
    value_column: str | None = None,
    agg: str = "sum",
) -> dict[str, object]:
    """Segment rows by binning a numeric column.

    Args:
        df: already-materialized data (in the real system the wrapper fetches
            this from a source_id).
        column: numeric column to bin on.
        bins: for method "edges", the list of cut boundaries (e.g.
            [0, 18, 35, 60]); for method "quantile", the number of equal-
            frequency buckets (e.g. 4 for quartiles).
        method: "edges" (explicit boundaries) or "quantile" (equal frequency).
        labels: optional segment names; for "edges" there must be
            len(bins) - 1 of them.
        value_column: if given, also aggregate this column per segment.
        agg: how to aggregate value_column — one of SUPPORTED_AGGS. Only
            validated and used when value_column is given.

    Returns:
        dict with:
            column, method — echo of the chosen settings
            segments       — [{"segment", "count", ("value")}], in bin order;
                             "value" is present only when value_column is given
            agg            — present only when value_column is given

    Raises:
        ColumnNotFoundError: if column or value_column is absent.
        NonNumericColumnError: if column is not numeric.
        InvalidMethodError: if method is unknown.
        UnsupportedAggregationError: if agg is not supported.

    Note:
        Errors raised by pandas itself while binning (for example when
        ``bins`` and ``labels`` are inconsistent) are not translated and
        propagate to the caller unchanged.
    """
    # One flag, tested the same way everywhere: an empty-string value_column
    # is still "given" and must be reported as a missing column rather than
    # slipping past validation and failing later with a raw KeyError.
    with_value = value_column is not None

    referenced = [column, value_column] if with_value else [column]
    missing = [c for c in referenced if c not in df.columns]
    if missing:
        raise ColumnNotFoundError(f"columns not found: {missing}")
    if not pd.api.types.is_numeric_dtype(df[column]):
        raise NonNumericColumnError(f"column '{column}' is not numeric")
    if method not in SUPPORTED_METHODS:
        raise InvalidMethodError(
            f"unknown method '{method}'; supported: {list(SUPPORTED_METHODS)}"
        )
    if with_value and agg not in SUPPORTED_AGGS:
        raise UnsupportedAggregationError(
            f"unsupported aggregation '{agg}'; supported: {list(SUPPORTED_AGGS)}"
        )

    if method == "edges":
        # include_lowest=True makes the first interval closed on the left so
        # rows equal to the lowest edge are not dropped.
        cats = pd.cut(df[column], bins=bins, labels=labels, include_lowest=True)
    else:  # quantile
        # duplicates="drop" collapses repeated quantile edges instead of
        # raising, so fewer buckets than requested may come back.
        cats = pd.qcut(df[column], q=bins, labels=labels, duplicates="drop")

    # observed=False keeps bins that received no rows, so every segment is
    # reported (with count 0) rather than silently omitted.
    grouped = df.groupby(cats, observed=False)
    counts = grouped.size()
    # The aggregation does not depend on the segment being emitted, so run it
    # once here instead of once per loop iteration.
    values = grouped[value_column].agg(agg) if with_value else None

    segments: list[dict[str, object]] = []
    for seg, n in counts.items():
        row: dict[str, object] = {"segment": str(seg), "count": int(n)}
        if values is not None:
            row["value"] = _clean(values.get(seg))
        segments.append(row)

    out: dict[str, object] = {
        "column": column,
        "method": method,
        "segments": segments,
    }
    if with_value:
        out["agg"] = agg
    return out