# Provenance (from the hosting page): author ishaq101 · commit 81e5fe7 "feat/Planner Agent (#2)" · 5.33 kB
"""analyze_profile — per-column data-quality profile (KM-608).
An analytical "family" tool: in ONE call it profiles each column's health —
dtype, inferred type, completeness (null count/rate), cardinality (distinct
count/rate, constant flag), and — for numeric columns — min/max/mean plus an
IQR-based outlier count; for non-numeric columns the most frequent value.
Answers "is this data clean enough to analyze?" and surfaces issues (lots of
nulls, a constant column, outliers) before deeper analysis.
STATUS: compute layer only — the function takes an already-materialized
DataFrame. The wrapper layer (fetching data from the catalog via source_id,
the ToolOutput envelope, ToolSpec registration) is added once the Planner
seam (KM-418) is settled. Keeping compute separate from data-fetching makes
this function easy to unit-test in isolation and stable when wrapped.
"""
from __future__ import annotations
import pandas as pd
from src.tools.analytics.descriptive import ColumnNotFoundError
def _clean(value: object) -> object:
"""Convert numpy/pandas scalars to plain Python so the output is JSON-clean.
`top_value` (most frequent value) can be a `pandas.Timestamp` when profiling
a datetime column — neither `Timestamp` nor numpy scalars are JSON-safe.
"""
if isinstance(value, pd.Timestamp):
return value.isoformat()
if hasattr(value, "item"):
return value.item()
return value
def _profile_one(series: pd.Series) -> dict[str, object]:
"""Build the quality profile for a single column."""
total = len(series)
non_null = series.dropna()
nn = len(non_null)
distinct = int(series.nunique(dropna=True))
is_bool = pd.api.types.is_bool_dtype(series)
is_datetime = pd.api.types.is_datetime64_any_dtype(series)
# bool is technically numeric in pandas; treat it as its own type.
is_numeric = pd.api.types.is_numeric_dtype(series) and not is_bool
if is_bool:
inferred = "boolean"
elif is_datetime:
inferred = "datetime"
elif is_numeric:
inferred = "numeric"
else:
inferred = "categorical"
out: dict[str, object] = {
"dtype": str(series.dtype),
"inferred_type": inferred,
"count": int(total),
"null_count": int(series.isna().sum()),
"null_rate": float(series.isna().mean()) if total else 0.0,
"distinct_count": distinct,
"distinct_rate": (distinct / nn) if nn else 0.0, # over non-null values
"is_constant": distinct <= 1,
}
if is_numeric and nn > 0:
out["min"] = _clean(non_null.min())
out["max"] = _clean(non_null.max())
out["mean"] = _clean(non_null.mean())
# IQR rule: values outside [Q1 - 1.5*IQR, Q3 + 1.5*IQR] are outliers.
# Needs enough points for stable quartiles.
if nn >= 4:
q1 = non_null.quantile(0.25)
q3 = non_null.quantile(0.75)
iqr = q3 - q1
lower, upper = q1 - 1.5 * iqr, q3 + 1.5 * iqr
out["outlier_count"] = int(((non_null < lower) | (non_null > upper)).sum())
else:
out["outlier_count"] = None
elif not is_numeric and nn > 0:
counts = non_null.value_counts()
out["top_value"] = _clean(counts.index[0])
out["top_freq"] = int(counts.iloc[0])
return out
# Planner-facing prompt text: the Planner reads this description to decide WHEN
# to pick this tool. Its final destination is ToolSpec.description once the
# wrapper layer is built.
# The trailing backslashes inside the literal are line continuations, so each
# wrapped source paragraph becomes a single line in the resulting string.
# NOTE(review): the text advertises "duplicates" as a trigger, while the
# profiling code in this module reports per-column distinct counts only —
# confirm the wording matches what the tool can actually answer.
DESCRIPTION = """\
Summary: Per-column data-quality profile. For each column reports dtype, \
inferred type, completeness (null count/rate), cardinality (distinct count/rate, \
constant flag), and — for numeric columns — min/max/mean plus an IQR-based \
outlier count; for non-numeric columns the most frequent value.
USE WHEN the question is about the HEALTH of the data, not its statistics: \
missing values, duplicates, data types, outliers, "is this clean enough to \
analyze". Trigger words: "quality" (kualitas), "missing/nulls" (data kosong), \
"data type" (tipe data), "duplicates/unique" (duplikat/unik), "outliers".
DON'T USE WHEN:
- the user wants statistics like mean/median/std/skew -> analyze_descriptive
- it groups or compares -> analyze_aggregate / analyze_comparison
Example questions:
- "is this dataset clean enough to analyze?"
- "which columns have a lot of missing values?"
- "what are the data types and unique counts per column?"
- "are there outliers in the amount column?"
"""
def analyze_profile(
df: pd.DataFrame,
column_ids: list[str] | None = None,
) -> dict[str, dict[str, object]]:
"""Per-column data-quality profile.
Args:
df: already-materialized data (in the real system the wrapper fetches
this from a source_id).
column_ids: columns to profile. If None, every column is profiled.
Returns:
dict: { column_id: { profile fields, ... }, ... }
Raises:
ColumnNotFoundError: if any column_id is absent from df.
"""
cols = list(column_ids) if column_ids is not None else list(df.columns)
missing = [c for c in cols if c not in df.columns]
if missing:
raise ColumnNotFoundError(f"columns not found: {missing}")
return {col: _profile_one(df[col]) for col in cols}