| """analyze_profile — per-column data-quality profile (KM-608). |
| |
| An analytical "family" tool: in ONE call it profiles each column's health — |
| dtype, inferred type, completeness (null count/rate), cardinality (distinct |
| count/rate, constant flag), and — for numeric columns — min/max/mean plus an |
| IQR-based outlier count; for non-numeric columns the most frequent value. |
| Answers "is this data clean enough to analyze?" and surfaces issues (lots of |
| nulls, a constant column, outliers) before deeper analysis. |
| |
| STATUS: compute layer only — the function takes an already-materialized |
| DataFrame. The wrapper layer (fetching data from the catalog via source_id, |
| the ToolOutput envelope, ToolSpec registration) is added once the Planner |
| seam (KM-418) is settled. Keeping compute separate from data-fetching makes |
| this function easy to unit-test in isolation and stable when wrapped. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import pandas as pd |
|
|
| from src.tools.analytics.descriptive import ColumnNotFoundError |
|
|
|
|
def _clean(value: object) -> object:
    """Convert numpy/pandas/datetime scalars to plain, JSON-safe Python values.

    Profile fields such as ``min``/``max``/``mean`` arrive as numpy scalars,
    and ``top_value`` (the most frequent value) can be a ``pandas.Timestamp``,
    a ``pandas.Timedelta`` or a ``datetime.date``/``datetime``/``time`` when
    a temporal or object column is profiled.  None of these survive
    ``json.dumps`` unchanged.

    Args:
        value: any scalar taken from a Series or its index.

    Returns:
        * an ISO-8601 string for anything exposing ``isoformat()``;
        * the result of ``.item()`` for numpy-style scalars (itself turned
          into an ISO string when the unwrapped value is a date/datetime);
        * ``value`` untouched otherwise.
    """
    # Temporal scalars first: Timestamp, Timedelta, date, datetime and time
    # all provide isoformat(), so duck-typing covers them without new imports.
    if hasattr(value, "isoformat"):
        return value.isoformat()

    # numpy scalars (int64, float64, bool_, datetime64, ...) unwrap via .item().
    if hasattr(value, "item"):
        unwrapped = value.item()
        # e.g. np.datetime64 at second resolution unwraps to datetime.datetime,
        # which is still not JSON-safe, so apply the same conversion again.
        if hasattr(unwrapped, "isoformat"):
            return unwrapped.isoformat()
        return unwrapped

    return value
|
|
|
|
def _profile_one(series: pd.Series) -> dict[str, object]:
    """Build the quality profile for a single column.

    Args:
        series: the column to profile; may be empty or entirely null.

    Returns:
        A dict that always contains ``dtype``, ``inferred_type``, ``count``,
        ``null_count``, ``null_rate``, ``distinct_count``, ``distinct_rate``
        and ``is_constant``.  When the column has at least one non-null value:

        * numeric (non-boolean) columns add ``min``, ``max``, ``mean`` and
          ``outlier_count`` (``None`` when fewer than 4 non-null values);
        * every other column adds ``top_value`` and ``top_freq``.
    """
    total = len(series)
    non_null = series.dropna()
    nn = len(non_null)
    # dropna() removes exactly the rows isna() flags, so the null count falls
    # out of the two lengths without scanning the column again.
    null_count = total - nn

    # Object columns may hold unhashable cells (lists, dicts, ...), for which
    # nunique()/value_counts() raise TypeError.  Count those by their repr()
    # instead of aborting the profile.  repr-equality is an approximation of
    # value equality, used only on this fallback path.
    keys = non_null
    counted_by_repr = False
    try:
        distinct = int(keys.nunique())
    except TypeError:
        keys = non_null.map(repr)
        counted_by_repr = True
        distinct = int(keys.nunique())

    is_bool = pd.api.types.is_bool_dtype(series)
    is_datetime = pd.api.types.is_datetime64_any_dtype(series)
    # pandas counts bool as numeric; min/mean/outliers are meaningless for it.
    is_numeric = pd.api.types.is_numeric_dtype(series) and not is_bool

    if is_bool:
        inferred = "boolean"
    elif is_datetime:
        inferred = "datetime"
    elif is_numeric:
        inferred = "numeric"
    else:
        inferred = "categorical"

    out: dict[str, object] = {
        "dtype": str(series.dtype),
        "inferred_type": inferred,
        "count": int(total),
        "null_count": int(null_count),
        "null_rate": (null_count / total) if total else 0.0,
        "distinct_count": distinct,
        "distinct_rate": (distinct / nn) if nn else 0.0,
        # An empty or all-null column (distinct == 0) is also flagged constant.
        "is_constant": distinct <= 1,
    }

    if is_numeric and nn > 0:
        out["min"] = _clean(non_null.min())
        out["max"] = _clean(non_null.max())
        out["mean"] = _clean(non_null.mean())

        # Quartiles need a handful of points to mean anything; below 4 the
        # outlier count is reported as None (unknown) rather than 0.
        if nn >= 4:
            q1 = non_null.quantile(0.25)
            q3 = non_null.quantile(0.75)
            iqr = q3 - q1
            # Tukey fences: anything beyond 1.5 * IQR from the quartiles.
            lower, upper = q1 - 1.5 * iqr, q3 + 1.5 * iqr
            out["outlier_count"] = int(((non_null < lower) | (non_null > upper)).sum())
        else:
            out["outlier_count"] = None
    elif not is_numeric and nn > 0:
        counts = keys.value_counts()
        top_key = counts.index[0]
        if counted_by_repr:
            # Recover the original object behind the winning repr string.
            first_pos = int((keys == top_key).to_numpy().argmax())
            top_value = non_null.iloc[first_pos]
        else:
            top_value = top_key
        out["top_value"] = _clean(top_value)
        out["top_freq"] = int(counts.iloc[0])

    return out
|
|
|
|
| |
| |
# Natural-language description of the analyze_profile tool: what it reports,
# when to use it, when NOT to use it (with the sibling tools to prefer), and
# sample questions.
# NOTE(review): presumably attached to the tool's registration by the wrapper
# layer that the module docstring says is still to come — confirm once that
# wrapper exists.
# The trailing backslashes join the wrapped source lines into single
# paragraphs; the parenthesised Indonesian trigger words are part of the
# runtime text and must be kept as-is.
DESCRIPTION = """\
Summary: Per-column data-quality profile. For each column reports dtype, \
inferred type, completeness (null count/rate), cardinality (distinct count/rate, \
constant flag), and — for numeric columns — min/max/mean plus an IQR-based \
outlier count; for non-numeric columns the most frequent value.

USE WHEN the question is about the HEALTH of the data, not its statistics: \
missing values, duplicates, data types, outliers, "is this clean enough to \
analyze". Trigger words: "quality" (kualitas), "missing/nulls" (data kosong), \
"data type" (tipe data), "duplicates/unique" (duplikat/unik), "outliers".

DON'T USE WHEN:
- the user wants statistics like mean/median/std/skew -> analyze_descriptive
- it groups or compares -> analyze_aggregate / analyze_comparison

Example questions:
- "is this dataset clean enough to analyze?"
- "which columns have a lot of missing values?"
- "what are the data types and unique counts per column?"
- "are there outliers in the amount column?"
"""
|
|
|
|
| def analyze_profile( |
| df: pd.DataFrame, |
| column_ids: list[str] | None = None, |
| ) -> dict[str, dict[str, object]]: |
| """Per-column data-quality profile. |
| |
| Args: |
| df: already-materialized data (in the real system the wrapper fetches |
| this from a source_id). |
| column_ids: columns to profile. If None, every column is profiled. |
| |
| Returns: |
| dict: { column_id: { profile fields, ... }, ... } |
| |
| Raises: |
| ColumnNotFoundError: if any column_id is absent from df. |
| """ |
| cols = list(column_ids) if column_ids is not None else list(df.columns) |
|
|
| missing = [c for c in cols if c not in df.columns] |
| if missing: |
| raise ColumnNotFoundError(f"columns not found: {missing}") |
|
|
| return {col: _profile_one(df[col]) for col in cols} |
|
|