"""analyze_segment — bucket rows into segments (KM-608). An analytical "family" tool: in ONE call it bins a numeric column into segments and reports how rows distribute across them (count, and optionally an aggregate of another column per segment). Two binning modes: explicit cut "edges" (e.g. age 0-18-35-60) or equal-frequency "quantile" buckets (quartiles, deciles). Answers questions like "split customers into age brackets" or "bucket orders into value tiers". STATUS: compute layer only — the function takes an already-materialized DataFrame. The wrapper layer (fetching data from the catalog via source_id, the ToolOutput envelope, ToolSpec registration) is added once the Planner seam (KM-418) is settled. Keeping compute separate from data-fetching makes this function easy to unit-test in isolation and stable when wrapped. """ from __future__ import annotations import math import pandas as pd from src.tools.analytics.descriptive import ColumnNotFoundError # Binning strategies. SUPPORTED_METHODS = ("edges", "quantile") # How to aggregate the value column within each segment. SUPPORTED_AGGS = ("sum", "mean", "count", "min", "max", "median") class InvalidMethodError(ValueError): """The requested binning method is not supported (maps to INVALID_METHOD).""" class NonNumericColumnError(ValueError): """The column to segment on is not numeric (maps to NON_NUMERIC_COLUMN).""" class UnsupportedAggregationError(ValueError): """The requested aggregation is not supported (maps to UNSUPPORTED_AGG).""" def _clean(value: object) -> object: """Convert numpy scalars to plain Python; NaN -> None for JSON-clean output.""" if value is None: return None if hasattr(value, "item"): value = value.item() if isinstance(value, float) and math.isnan(value): return None return value # Prompt-style description read by the Planner to decide WHEN to pick this tool. # Final destination is ToolSpec.description once the wrapper layer is built. DESCRIPTION = """\ Summary: Bins a NUMERIC column into segments and counts how rows distribute \ across them (optionally aggregating another column per segment). Two modes: \ explicit cut edges (e.g. age 0-18-35-60) or equal-frequency quantile buckets \ (quartiles, deciles). USE WHEN the question asks to bucket/bracket a continuous number into ranges. \ Trigger words: "segment" (segmen), "bucket/bracket" (kelompokkan ke rentang), \ "age groups/tiers" (kelompok umur/tingkatan), "quartiles/deciles", "bins". DON'T USE WHEN: - the category already exists (no binning needed) -> analyze_contribution - it aggregates by an existing key -> analyze_aggregate - it compares two named groups -> analyze_comparison Example questions: - "split customers into age brackets 0-18, 18-35, 35-60" - "bucket orders into value tiers" - "divide users into spending quartiles" - "how many customers fall in each income band?" """ def analyze_segment( df: pd.DataFrame, column: str, bins: list[float] | int, method: str = "edges", labels: list[str] | None = None, value_column: str | None = None, agg: str = "sum", ) -> dict[str, object]: """Segment rows by binning a numeric column. Args: df: already-materialized data (in the real system the wrapper fetches this from a source_id). column: numeric column to bin on. bins: for method "edges", the list of cut boundaries (e.g. [0, 18, 35, 60]); for method "quantile", the number of equal- frequency buckets (e.g. 4 for quartiles). method: "edges" (explicit boundaries) or "quantile" (equal frequency). labels: optional segment names; for "edges" there must be len(bins) - 1 of them. value_column: if given, also aggregate this column per segment. agg: how to aggregate value_column — one of SUPPORTED_AGGS. Returns: dict with: column, method — echo of the chosen settings agg — present only when value_column is given segments — [{"segment", "count", ("value")}], in bin order Raises: ColumnNotFoundError: if column or value_column is absent. NonNumericColumnError: if column is not numeric. InvalidMethodError: if method is unknown. UnsupportedAggregationError: if agg is not supported. """ referenced = [column] + ([value_column] if value_column else []) missing = [c for c in referenced if c not in df.columns] if missing: raise ColumnNotFoundError(f"columns not found: {missing}") if not pd.api.types.is_numeric_dtype(df[column]): raise NonNumericColumnError(f"column '{column}' is not numeric") if method not in SUPPORTED_METHODS: raise InvalidMethodError( f"unknown method '{method}'; supported: {list(SUPPORTED_METHODS)}" ) if value_column is not None and agg not in SUPPORTED_AGGS: raise UnsupportedAggregationError( f"unsupported aggregation '{agg}'; supported: {list(SUPPORTED_AGGS)}" ) if method == "edges": cats = pd.cut(df[column], bins=bins, labels=labels, include_lowest=True) else: # quantile cats = pd.qcut(df[column], q=bins, labels=labels, duplicates="drop") grouped = df.groupby(cats, observed=False) counts = grouped.size() segments = [] for seg in counts.index: row = {"segment": str(seg), "count": int(counts[seg])} if value_column is not None: row["value"] = _clean(grouped[value_column].agg(agg).get(seg)) segments.append(row) out: dict[str, object] = {"column": column, "method": method, "segments": segments} if value_column is not None: out["agg"] = agg return out