# Page header captured with the file (not part of the module):
#   author: ishaq101 · commit: "feat/Planner Agent (#2)" · 81e5fe7 · 5.73 kB
"""analyze_descriptive — single/multi-column EDA (KM-608).
An analytical "family" tool: in ONE call it computes a column's center,
spread, shape, and completeness (mean, median, mode, std, variance,
quartiles, min/max, skew, null_rate).
STATUS: compute layer only — the function takes an already-materialized
DataFrame. The wrapper layer (fetching data from the catalog via source_id,
the ToolOutput envelope, ToolSpec registration) is added once the Planner
seam (KM-418) is settled. Keeping compute separate from data-fetching makes
this function easy to unit-test in isolation and stable when wrapped.
"""
from __future__ import annotations
import pandas as pd
# Metric set applied when the caller does not pass an explicit `metrics`
# selection. Grouped below by what each metric describes.
DEFAULT_METRICS = (
    # size
    "count",
    # center
    "mean", "median", "mode",
    # spread
    "std", "var", "q1", "q3", "min", "max",
    # shape
    "skew",
    # completeness
    "null_count", "null_rate",
)
class ColumnNotFoundError(ValueError):
    """Raised when a requested column does not exist in the DataFrame.

    Subclasses ``ValueError`` so generic value-error handlers still catch it;
    corresponds to error_code ``COLUMN_NOT_FOUND``.
    """
def _clean(value: object) -> object:
"""Coerce a scalar to a JSON-clean Python value.
`mode` can be any dtype: an integer column yields `numpy.int64` (NOT
JSON-serializable), a datetime column yields `pandas.Timestamp`. The other
metrics are already wrapped in `float(...)`; mode is the one that needs this.
"""
if value is None:
return None
if isinstance(value, pd.Timestamp):
return value.isoformat()
if hasattr(value, "item"):
return value.item()
return value
def _describe_one(series: pd.Series, metrics: tuple[str, ...]) -> dict[str, object]:
"""Compute descriptive metrics for a single column.
Numeric metrics are computed over non-null values. `null_rate` & `count`
are computed over all rows (nulls included) so they reflect completeness
as-is. Undefined cases (e.g. std of a single value) return None — degrade
gracefully instead of raising.
"""
total = len(series)
non_null = series.dropna()
is_numeric = pd.api.types.is_numeric_dtype(series)
out: dict[str, object] = {}
for m in metrics:
if m == "count":
out["count"] = int(total)
elif m == "null_count":
out["null_count"] = int(series.isna().sum())
elif m == "null_rate":
out["null_rate"] = float(series.isna().mean()) if total else 0.0
elif m == "mode":
modes = non_null.mode()
out["mode"] = _clean(modes.iloc[0]) if not modes.empty else None
elif not is_numeric:
out[m] = None
elif m == "mean":
out["mean"] = float(non_null.mean()) if not non_null.empty else None
elif m == "median":
out["median"] = float(non_null.median()) if not non_null.empty else None
elif m == "std":
out["std"] = float(non_null.std()) if non_null.shape[0] > 1 else None
elif m == "var":
out["var"] = float(non_null.var()) if non_null.shape[0] > 1 else None
elif m == "q1":
out["q1"] = float(non_null.quantile(0.25)) if not non_null.empty else None
elif m == "q3":
out["q3"] = float(non_null.quantile(0.75)) if not non_null.empty else None
elif m == "min":
out["min"] = float(non_null.min()) if not non_null.empty else None
elif m == "max":
out["max"] = float(non_null.max()) if not non_null.empty else None
elif m == "skew":
out["skew"] = float(non_null.skew()) if non_null.shape[0] > 2 else None
return out
# Prompt-style description that the Planner reads to decide WHEN to pick this
# tool. Its final destination is ToolSpec.description once the wrapper layer
# is built.
# Formatting note: a trailing backslash inside the literal joins the next
# source line onto the current one, so each paragraph is a single line in the
# resulting string. The parenthesised trigger words appear to be non-English
# (presumably Indonesian) equivalents — confirm before editing them.
DESCRIPTION = """\
Summary: Descriptive statistics (EDA) for one or several columns in a single \
call — center (mean, median, mode), spread (std, variance, min, max, Q1/Q3 \
quartiles), distribution shape (skew), and completeness (null count & rate).
USE WHEN the user asks for an overview, summary, or single-column statistics \
of ONE or SEVERAL columns as a whole, with NO grouping and NO comparison \
between groups. Trigger words: "overview/summary" (ringkasan), "average" \
(rata-rata), "median", "spread/distribution" (sebaran), "how many nulls" \
(berapa nilai kosong).
DON'T USE WHEN:
- the question groups by something ("per"/"each"/"by") -> analyze_aggregate
- it compares two specific groups (A vs B) -> analyze_comparison
- it tracks a metric over time -> analyze_trend
- it checks data type, quality, duplicates, outliers, constants -> analyze_profile
Example questions:
- "what's the average and median customer age?"
- "summarize the income column"
- "how is product price distributed?"
- "how many nulls are in the email column?"
"""
def analyze_descriptive(
df: pd.DataFrame,
column_ids: list[str],
metrics: list[str] | None = None,
) -> dict[str, dict[str, object]]:
"""Descriptive EDA for one or many columns.
Args:
df: already-materialized data (in the real system the wrapper fetches
this from a source_id).
column_ids: columns to analyze.
metrics: subset of metrics; defaults to all of DEFAULT_METRICS.
Returns:
dict: { column_id: { metric: value, ... }, ... }
Raises:
ColumnNotFoundError: if any column_id is absent from df.
"""
chosen = tuple(metrics) if metrics else DEFAULT_METRICS
missing = [c for c in column_ids if c not in df.columns]
if missing:
raise ColumnNotFoundError(f"columns not found: {missing}")
return {col: _describe_one(df[col], chosen) for col in column_ids}