"""analyze_contribution — share-of-total per category (KM-608).

An analytical "family" tool: in ONE call it breaks a total down into each
category's contribution (absolute value, share of total, and running
cumulative share), sorted largest-first. This is a single snapshot — "who
makes up the total right now" — as opposed to analyze_comparison (two groups)
or analyze_trend (movement over time). Answers questions like "which region
drives most of revenue?" and supports Pareto (80/20) reasoning.

STATUS: compute layer only — the function takes an already-materialized
DataFrame. The wrapper layer (fetching data from the catalog via source_id,
the ToolOutput envelope, ToolSpec registration) will be added once the Planner
seam (KM-418) is settled. Keeping compute separate from data-fetching makes
this function easy to unit-test in isolation and stable when wrapped.
"""
|
|
| from __future__ import annotations |
|
|
| import pandas as pd |
|
|
| from src.tools.analytics.descriptive import ColumnNotFoundError |
|
|
| |
| |
# Aggregation names accepted by analyze_contribution. The chosen name is
# checked against this tuple and then passed unchanged to pandas' GroupBy.agg.
SUPPORTED_AGGS = ("sum", "mean", "count", "min", "max", "median")
|
|
|
|
class UnsupportedAggregationError(ValueError):
    """The requested aggregation is not supported (maps to error_code UNSUPPORTED_AGG).

    Raised by analyze_contribution when ``agg`` is not one of SUPPORTED_AGGS.
    Subclasses ValueError, so callers that catch ValueError also catch this.
    """
|
|
|
|
def _clean(value: object) -> object:
    """Return ``value`` as a plain Python object so the output is JSON-clean.

    Anything exposing an ``item`` attribute (numpy scalars do) is replaced by
    the result of calling ``value.item()``; every other value is returned
    unchanged.
    """
    try:
        to_python = value.item
    except AttributeError:
        # No ``item`` attribute: nothing to unwrap.
        return value
    return to_python()
|
|
|
|
| |
| |
# Tool description text: a summary, guidance on when to use / not use this
# tool (with trigger words and pointers to sibling tools), and example
# questions. The trailing backslashes join the wrapped source lines so each
# paragraph is a single line in the resulting string.
# NOTE(review): presumably attached to the ToolSpec once the wrapper layer
# exists; it is not referenced by the code shown in this module — confirm.
DESCRIPTION = """\
Summary: Breaks a single total into per-category contributions (share of total) \
in one snapshot, largest first, with cumulative share. Supports Pareto (80/20) \
reasoning. Optional top_n folds the long tail into "Others".

USE WHEN the question is about how a whole splits into parts, or which \
categories dominate. Trigger words: "contribution" (kontribusi), "share" \
(porsi/proporsi), "% of total" (persen dari total), "Pareto/80-20", "top \
contributors", "which ... make up most".

DON'T USE WHEN:
- it pits two specific groups against each other -> analyze_comparison
- it tracks change over time -> analyze_trend
- it just aggregates per group without share-of-total -> analyze_aggregate

Example questions:
- "which products contribute most to total sales?"
- "what share of revenue comes from each region?"
- "top 5 customers by contribution to profit"
- "do 20% of items make up 80% of revenue?"
"""
|
|
|
|
def analyze_contribution(
    df: pd.DataFrame,
    dimension: str,
    value_column: str,
    agg: str = "sum",
    top_n: int | None = None,
) -> dict[str, object]:
    """Contribution (share of total) of each category, largest first.

    Args:
        df: already-materialized data (in the real system the wrapper fetches
            this from a source_id).
        dimension: categorical column to break the total down by. Rows whose
            dimension value is missing are kept as their own category.
        value_column: numeric column to aggregate per category.
        agg: how to aggregate within each category — one of SUPPORTED_AGGS.
        top_n: if set, keep the top N categories and lump the remainder into a
            single "Others" row. The "Others" value is the sum of the
            remaining category aggregates, whatever ``agg`` is. Must not be
            negative.

    Returns:
        dict with:
          dimension, value_column, agg — echo of the chosen settings
          total                        — sum of all category aggregates
          items                        — [{"category", "value", "share",
                                          "cumulative_share"}] largest first,
                                          with "Others" (if any) last.
                                          "share" and "cumulative_share" are
                                          None when total is 0 (this includes
                                          the no-rows case).

    Raises:
        ColumnNotFoundError: if dimension or value_column is absent.
        UnsupportedAggregationError: if agg is not supported.
        ValueError: if top_n is negative.
    """
    missing = [c for c in (dimension, value_column) if c not in df.columns]
    if missing:
        raise ColumnNotFoundError(f"columns not found: {missing}")
    if agg not in SUPPORTED_AGGS:
        raise UnsupportedAggregationError(
            f"unsupported aggregation '{agg}'; supported: {list(SUPPORTED_AGGS)}"
        )
    # A negative top_n would slice from the wrong end (pairs[:-1] drops only
    # the smallest category) and silently return a misleading breakdown.
    if top_n is not None and top_n < 0:
        raise ValueError(f"top_n must be >= 0, got {top_n}")

    # dropna=False keeps rows with a missing dimension value as a category.
    grouped = df.groupby(dimension, dropna=False)[value_column].agg(agg)
    grouped = grouped.sort_values(ascending=False)
    pairs = list(grouped.items())

    # Fold the long tail into one trailing "Others" row.
    if top_n is not None and len(pairs) > top_n:
        head = pairs[:top_n]
        others_value = sum(val for _, val in pairs[top_n:])
        head.append(("Others", others_value))
        pairs = head

    total = sum(val for _, val in pairs)

    items = []
    running = 0
    for cat, val in pairs:
        running += val
        items.append(
            {
                "category": _clean(cat),
                "value": _clean(val),
                "share": (val / total) if total else None,
                # Divide the running value total by the grand total rather
                # than adding up already-rounded shares: summing shares
                # compounds rounding error (eight shares of 0.1 add up to
                # 0.7999999999999999), which breaks threshold checks such as
                # "cumulative_share >= 0.8".
                "cumulative_share": (running / total) if total else None,
            }
        )

    return {
        "dimension": dimension,
        "value_column": value_column,
        "agg": agg,
        "total": _clean(total),
        "items": items,
    }
|
|