"""analyze_contribution — share-of-total per category (KM-608).

An analytical "family" tool: in ONE call it breaks a total down into each
category's contribution (absolute value, share of total, and running cumulative
share), sorted largest-first. This is a single snapshot — "who makes up the
total right now" — as opposed to analyze_comparison (two groups) or
analyze_trend (movement over time). Answers questions like "which region drives
most of revenue?" and supports Pareto (80/20) reasoning.

STATUS: compute layer only — the function takes an already-materialized
DataFrame. The wrapper layer (fetching data from the catalog via source_id, the
ToolOutput envelope, ToolSpec registration) is added once the Planner seam
(KM-418) is settled. Keeping compute separate from data-fetching makes this
function easy to unit-test in isolation and stable when wrapped.
"""

from __future__ import annotations

import pandas as pd

from src.tools.analytics.descriptive import ColumnNotFoundError

# Share-of-total is most meaningful for additive aggregates (sum/count), but
# mean/min/max/median are allowed; "total" is then the sum of the group
# aggregates.
SUPPORTED_AGGS = ("sum", "mean", "count", "min", "max", "median")


class UnsupportedAggregationError(ValueError):
    """The requested aggregation is not supported (maps to error_code UNSUPPORTED_AGG)."""


def _clean(value: object) -> object:
    """Convert numpy scalars to plain Python so the output is JSON-clean.

    Anything exposing an ``item()`` method is unwrapped through it; every
    other value (including ``None``) is returned unchanged.
    """
    if hasattr(value, "item"):
        return value.item()
    return value


# Prompt-style description read by the Planner to decide WHEN to pick this tool.
# Final destination is ToolSpec.description once the wrapper layer is built.
DESCRIPTION = """\
Summary: Breaks a single total into per-category contributions (share of total) \
in one snapshot, largest first, with cumulative share. Supports Pareto (80/20) \
reasoning. Optional top_n folds the long tail into "Others".

USE WHEN the question is about how a whole splits into parts, or which \
categories dominate. Trigger words: "contribution" (kontribusi), "share" \
(porsi/proporsi), "% of total" (persen dari total), "Pareto/80-20", "top \
contributors", "which ... make up most".

DON'T USE WHEN:
- it pits two specific groups against each other -> analyze_comparison
- it tracks change over time -> analyze_trend
- it just aggregates per group without share-of-total -> analyze_aggregate

Example questions:
- "which products contribute most to total sales?"
- "what share of revenue comes from each region?"
- "top 5 customers by contribution to profit"
- "do 20% of items make up 80% of revenue?"
"""


def analyze_contribution(
    df: pd.DataFrame,
    dimension: str,
    value_column: str,
    agg: str = "sum",
    top_n: int | None = None,
) -> dict[str, object]:
    """Contribution (share of total) of each category, largest first.

    Args:
        df: already-materialized data (in the real system the wrapper fetches
            this from a source_id).
        dimension: categorical column to break the total down by. Rows whose
            dimension value is missing form their own group (dropna=False).
        value_column: numeric column to aggregate per category.
        agg: how to aggregate within each category — one of SUPPORTED_AGGS.
        top_n: if set, keep the top N categories and lump the remainder into a
            single "Others" row appended after them. Must be non-negative;
            0 folds every category into "Others".

    Returns:
        dict with:
          dimension, value_column, agg — echo of the chosen settings
          total — sum of all category aggregates
          items — [{"category", "value", "share", "cumulative_share"}]
                  largest first; "share" and "cumulative_share" are None when
                  the total is zero.

    Raises:
        ColumnNotFoundError: if dimension or value_column is absent.
        UnsupportedAggregationError: if agg is not supported.
        ValueError: if top_n is negative.
    """
    missing = [c for c in (dimension, value_column) if c not in df.columns]
    if missing:
        raise ColumnNotFoundError(f"columns not found: {missing}")
    if agg not in SUPPORTED_AGGS:
        raise UnsupportedAggregationError(
            f"unsupported aggregation '{agg}'; supported: {list(SUPPORTED_AGGS)}"
        )
    # A negative top_n would slice from the END of the ranking (pairs[:-k]) and
    # silently relabel the wrong categories as "Others" — reject it up front.
    if top_n is not None and top_n < 0:
        raise ValueError(f"top_n must be non-negative, got {top_n}")

    grouped = df.groupby(dimension, dropna=False)[value_column].agg(agg)
    # Stable sort: categories with equal values keep their groupby order, so
    # the ranking (and which rows fall into "Others") is deterministic.
    grouped = grouped.sort_values(ascending=False, kind="mergesort")
    pairs = list(grouped.items())

    # Optionally collapse the long tail into a single "Others" bucket.
    if top_n is not None and len(pairs) > top_n:
        head = pairs[:top_n]
        others_value = sum(val for _, val in pairs[top_n:])
        head.append(("Others", others_value))
        pairs = head

    total = sum(val for _, val in pairs)

    items: list[dict[str, object]] = []
    cumulative = 0.0
    for cat, val in pairs:
        # A zero total makes shares undefined; report None rather than divide.
        share = (val / total) if total else None
        if share is not None:
            cumulative += share
        items.append(
            {
                "category": _clean(cat),
                "value": _clean(val),
                "share": _clean(share),
                "cumulative_share": _clean(cumulative) if total else None,
            }
        )

    return {
        "dimension": dimension,
        "value_column": value_column,
        "agg": agg,
        "total": _clean(total),
        "items": items,
    }