| """analyze_segment — bucket rows into segments (KM-608). |
| |
| An analytical "family" tool: in ONE call it bins a numeric column into |
| segments and reports how rows distribute across them (count, and optionally an |
| aggregate of another column per segment). Two binning modes: explicit cut |
| "edges" (e.g. age 0-18-35-60) or equal-frequency "quantile" buckets (quartiles, |
| deciles). Answers questions like "split customers into age brackets" or "bucket |
| orders into value tiers". |
| |
| STATUS: compute layer only — the function takes an already-materialized |
| DataFrame. The wrapper layer (fetching data from the catalog via source_id, |
| the ToolOutput envelope, ToolSpec registration) is added once the Planner |
| seam (KM-418) is settled. Keeping compute separate from data-fetching makes |
| this function easy to unit-test in isolation and stable when wrapped. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import math |
|
|
| import pandas as pd |
|
|
| from src.tools.analytics.descriptive import ColumnNotFoundError |
|
|
| |
# Binning strategies that analyze_segment accepts for its ``method`` argument;
# any other value is rejected with InvalidMethodError.
SUPPORTED_METHODS = (
    "edges",
    "quantile",
)

# Aggregation names that analyze_segment accepts for its ``agg`` argument when
# a ``value_column`` is supplied; anything else is rejected with
# UnsupportedAggregationError.
SUPPORTED_AGGS = (
    "sum",
    "mean",
    "count",
    "min",
    "max",
    "median",
)
|
|
|
|
class InvalidMethodError(ValueError):
    """Raised when ``method`` is not one of the supported binning methods.

    Maps to INVALID_METHOD.
    """
|
|
|
|
class NonNumericColumnError(ValueError):
    """Raised when the column chosen for segmentation is not numeric.

    Maps to NON_NUMERIC_COLUMN.
    """
|
|
|
|
class UnsupportedAggregationError(ValueError):
    """Raised when ``agg`` is not one of the supported aggregations.

    Maps to UNSUPPORTED_AGG.
    """
|
|
|
|
def _clean(value: object) -> object:
    """Normalize a scalar so it is safe to emit as JSON.

    Any object exposing ``.item()`` (as numpy scalars do) is unwrapped to the
    plain Python value it holds, and a float NaN is reported as ``None``.
    ``None`` and every other value are returned unchanged.
    """
    if value is None:
        return None
    # Unwrap first so a numpy NaN becomes a Python float before the NaN test.
    unwrapped = value.item() if hasattr(value, "item") else value
    is_nan = isinstance(unwrapped, float) and math.isnan(unwrapped)
    return None if is_nan else unwrapped
|
|
|
|
| |
| |
# Tool-selection text for analyze_segment: a summary, when to use it, when to
# prefer a sibling tool, and example questions. The parenthesised glosses after
# the trigger words are part of the text itself. Adjacent literals are joined
# with no separator, so the only line breaks in the value are the explicit
# "\n" escapes below.
DESCRIPTION = (
    "Summary: Bins a NUMERIC column into segments and counts how rows distribute "
    "across them (optionally aggregating another column per segment). Two modes: "
    "explicit cut edges (e.g. age 0-18-35-60) or equal-frequency quantile buckets "
    "(quartiles, deciles).\n"
    "\n"
    "USE WHEN the question asks to bucket/bracket a continuous number into ranges. "
    'Trigger words: "segment" (segmen), "bucket/bracket" (kelompokkan ke rentang), '
    '"age groups/tiers" (kelompok umur/tingkatan), "quartiles/deciles", "bins".\n'
    "\n"
    "DON'T USE WHEN:\n"
    "- the category already exists (no binning needed) -> analyze_contribution\n"
    "- it aggregates by an existing key -> analyze_aggregate\n"
    "- it compares two named groups -> analyze_comparison\n"
    "\n"
    "Example questions:\n"
    '- "split customers into age brackets 0-18, 18-35, 35-60"\n'
    '- "bucket orders into value tiers"\n'
    '- "divide users into spending quartiles"\n'
    '- "how many customers fall in each income band?"\n'
)
|
|
|
|
def analyze_segment(
    df: pd.DataFrame,
    column: str,
    bins: list[float] | int,
    method: str = "edges",
    labels: list[str] | None = None,
    value_column: str | None = None,
    agg: str = "sum",
) -> dict[str, object]:
    """Segment rows by binning a numeric column.

    Rows are assigned to segments with ``pd.cut`` (method "edges") or
    ``pd.qcut`` (method "quantile"), then counted per segment. Segments that
    receive no rows are still reported (with count 0). Rows whose ``column``
    value is missing, or lies outside the explicit edges, get no segment and
    are left out of every count.

    Args:
        df: already-materialized data (in the real system the wrapper fetches
            this from a source_id).
        column: numeric column to bin on.
        bins: for method "edges", the list of cut boundaries (e.g.
            [0, 18, 35, 60]); intervals are right-closed and the first one
            also includes its left edge. For method "quantile", the number
            of equal-frequency buckets (e.g. 4 for quartiles); duplicate
            quantile edges are dropped, so fewer buckets than requested may
            come back.
        method: "edges" (explicit boundaries) or "quantile" (equal frequency).
        labels: optional segment names; for "edges" there must be
            len(bins) - 1 of them.
        value_column: if given, also aggregate this column per segment.
        agg: how to aggregate value_column — one of SUPPORTED_AGGS. Only
            validated (and only used) when value_column is given.

    Returns:
        dict with:
          column, method — echo of the chosen settings
          agg            — present only when value_column is given
          segments       — [{"segment", "count", ("value")}], in bin order;
                           "value" is None when the aggregate is NaN (e.g.
                           the mean of an empty segment)

    Raises:
        ColumnNotFoundError: if column or value_column is absent.
        NonNumericColumnError: if column is not numeric.
        InvalidMethodError: if method is unknown.
        UnsupportedAggregationError: if agg is not supported.
        ValueError: propagated unchanged from pandas when bins/labels are
            inconsistent (e.g. wrong number of labels).
    """
    has_value = value_column is not None

    # Test against None rather than truthiness so that an empty-string
    # value_column is validated here instead of failing later with a raw
    # KeyError from the groupby selection.
    referenced = [column, value_column] if has_value else [column]
    missing = [c for c in referenced if c not in df.columns]
    if missing:
        raise ColumnNotFoundError(f"columns not found: {missing}")
    if not pd.api.types.is_numeric_dtype(df[column]):
        raise NonNumericColumnError(f"column '{column}' is not numeric")
    if method not in SUPPORTED_METHODS:
        raise InvalidMethodError(
            f"unknown method '{method}'; supported: {list(SUPPORTED_METHODS)}"
        )
    if has_value and agg not in SUPPORTED_AGGS:
        raise UnsupportedAggregationError(
            f"unsupported aggregation '{agg}'; supported: {list(SUPPORTED_AGGS)}"
        )

    if method == "edges":
        cats = pd.cut(df[column], bins=bins, labels=labels, include_lowest=True)
    else:
        cats = pd.qcut(df[column], q=bins, labels=labels, duplicates="drop")

    # observed=False keeps categories that received no rows, so every bin
    # shows up in the output even when its count is 0.
    grouped = df.groupby(cats, observed=False)
    counts = grouped.size()
    # Aggregate once for all segments instead of once per loop iteration.
    values = grouped[value_column].agg(agg) if has_value else None

    segments: list[dict[str, object]] = []
    for seg, count in counts.items():
        row: dict[str, object] = {"segment": str(seg), "count": int(count)}
        if values is not None:
            row["value"] = _clean(values.get(seg))
        segments.append(row)

    out: dict[str, object] = {"column": column, "method": method, "segments": segments}
    if has_value:
        out["agg"] = agg
    return out
|
|