# DataLine/pipeline/issues.py
# Author: Suraj Prasai
# Initial commit (458c8e2)
from __future__ import annotations
from typing import Any, Dict, List, Optional, Tuple
import numpy as np
import pandas as pd
from scipy.sparse import issparse
from sklearn.linear_model import LogisticRegression, SGDRegressor
from tqdm.auto import tqdm
from .utils_cool import PhaseProgress, _ensure_dense32, _infer_task
def find_issues(
    df: pd.DataFrame,
    *,
    label: str,
    task: Optional[str] = None,      # "classification" | "regression"; if None we infer
    model: Optional[Any] = None,     # sklearn estimator; default chosen by task
    progress: Optional[Any] = None,  # tqdm bar; None -> auto-create
    verbose: bool = False,
) -> Tuple[Optional[pd.DataFrame], Dict]:
    """
    Detect label issues using Cleanlab's CleanLearning and drop flagged rows.

    Parameters
    ----------
    df : DataFrame
        Input table containing features and a label column.
    label : str
        Name of the label column. Rows with NaN in label are dropped.
    task : {"classification", "regression"}, optional
        If not provided, inferred from y.
    model : sklearn estimator, optional
        If not provided, defaults to LogisticRegression or SGDRegressor.
    progress : tqdm, optional
        Phase-aware progress bar. If None, a local bar is created and closed.
    verbose : bool, default False
        Print warnings in addition to returning them in stats.

    Returns
    -------
    (df_out, stats)
        df_out : DataFrame with rows flagged as label issues removed
            (the NaN-label-cleaned frame is returned unchanged when Cleanlab
            yields no usable issue mask).
        stats : dict with minimal metadata and counts.

    Raises
    ------
    KeyError
        If `label` is not a column of `df`.
    RuntimeError
        If Cleanlab fails on both sparse and dense feature representations.
    """
    # ---- progress setup ----
    local_bar = None
    pp = None
    if progress is None:
        local_bar = tqdm(total=100, leave=True)
        pp = PhaseProgress(local_bar, weights={"clean": .15, "cleanlab": .75, "finalize": .10})
    elif hasattr(progress, "set_description") and hasattr(progress, "update"):
        # Seed the marker attribute PhaseProgress relies on for externally
        # supplied bars.
        if not hasattr(progress, "_last_val"):
            progress._last_val = 0
        pp = PhaseProgress(progress, weights={"clean": .15, "cleanlab": .75, "finalize": .10})

    warnings: List[str] = []
    n_before = len(df)

    # ---- clean: ensure label exists, drop NaNs in label ----
    pp and pp.start("clean", extra={"N": n_before})
    if label not in df.columns:
        if local_bar is not None:
            pp.close()
        raise KeyError(f"Label column '{label}' not found in DataFrame.")
    df_in = df.dropna(subset=[label]).reset_index(drop=True)
    # BUGFIX: exclude the label column from the feature matrix. Previously the
    # full frame (label included) was passed to Cleanlab, so the model trained
    # on the target itself (label leakage) and flagged almost nothing.
    X = df_in.drop(columns=[label])
    y_raw = df_in[label].to_numpy()
    pp and pp.tick_abs("clean", 1.0, extra={"N": len(df_in)})
    pp and pp.end("clean")

    # ---- task/model selection ----
    task_applied = _infer_task(y_raw, task)
    if model is None:
        if task_applied == "classification":
            model = LogisticRegression(solver="saga", n_jobs=-1)
        else:
            model = SGDRegressor()

    # y encoding for classification (Cleanlab expects numeric labels)
    if task_applied == "classification":
        y = np.unique(y_raw, return_inverse=True)[1]
    else:
        y = y_raw.astype(np.float64, copy=False)

    # ---- cleanlab: find label issues (auto-dense fallback for sparse edge cases) ----
    pp and pp.start("cleanlab", extra={"task": task_applied})
    used_dense_fallback = False
    try:
        if task_applied == "classification":
            from cleanlab.classification import CleanLearning as _CL
        else:
            from cleanlab.regression.learn import CleanLearning as _CL
        cl = _CL(model)
        issues = cl.find_label_issues(X, y)
    except Exception as e:
        if issparse(X):
            # retry dense (cleanlab/small-N often prefers dense)
            Xd = _ensure_dense32(X)
            try:
                cl = _CL(model)
                issues = cl.find_label_issues(Xd, y)
                used_dense_fallback = True
            except Exception as e2:
                if local_bar is not None:
                    pp.close()
                raise RuntimeError(f"Cleanlab failed on sparse and dense features: {e2}") from e
        else:
            if local_bar is not None:
                pp.close()
            raise

    # Parse outputs robustly: Cleanlab normally returns a DataFrame with
    # "is_label_issue" / "label_quality" columns, but guard against other shapes.
    if isinstance(issues, pd.DataFrame):
        is_issue = issues.get("is_label_issue", None)
        label_quality = issues.get("label_quality", None)
    else:
        is_issue = None
        label_quality = None
    n = len(issues) if hasattr(issues, "__len__") else len(y)
    n_issues = int(is_issue.sum()) if isinstance(is_issue, (pd.Series, np.ndarray)) else 0
    pct = round((n_issues / n) * 100.0, 3) if n else 0.0
    avg_quality = float(np.nanmean(label_quality.values)) if isinstance(label_quality, pd.Series) else float("nan")
    pp and pp.tick_abs("cleanlab", 1.0, extra={"issues": n_issues})
    pp and pp.end("cleanlab", extra={"issues": n_issues})

    # ---- finalize: drop issue rows ----
    pp and pp.start("finalize")
    df_out = df_in
    if isinstance(is_issue, (pd.Series, np.ndarray)):
        # BUGFIX: np.ndarray has no `.values` attribute, which the old
        # `is_issue.astype(bool).values` assumed; np.asarray handles both.
        mask_keep = ~np.asarray(is_issue, dtype=bool)
        df_out = df_in.loc[mask_keep].copy()
    stats: Dict[str, Any] = {
        "n_rows_before_cleanlab": int(len(df_in)),
        "n_label_issues": int(n_issues),
        "pct_label_issues": float(pct),
        "avg_label_quality": float(avg_quality),
        "n_rows_after_cleanlab": int(len(df_out)),
        "task_applied": task_applied,
        "model_name": type(model).__name__,
        "used_dense_fallback": bool(used_dense_fallback),
        "warnings": warnings,
    }
    pp and pp.tick_abs("finalize", 1.0)
    pp and pp.end("finalize")
    if verbose and warnings:
        print("\n".join(warnings))
    if local_bar is not None:
        pp.close()
    return df_out, stats