File size: 5,831 Bytes
458c8e2 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 |
from __future__ import annotations
from typing import Any, Dict, List, Optional, Tuple
import numpy as np
import pandas as pd
from scipy.sparse import issparse
from sklearn.linear_model import LogisticRegression, SGDRegressor
from tqdm.auto import tqdm
from .utils_cool import PhaseProgress, _ensure_dense32, _infer_task
def find_issues(
    df: pd.DataFrame,
    *,
    label: str,
    task: Optional[str] = None,  # "classification" | "regression"; if None we infer
    model: Optional[Any] = None,  # sklearn estimator; default chosen by task
    progress: Optional[Any] = None,  # tqdm bar; None -> auto-create
    verbose: bool = False,
) -> Tuple[Optional[pd.DataFrame], Dict]:
    """
    Detect label issues with Cleanlab's CleanLearning and drop the flagged rows.

    Parameters
    ----------
    df : DataFrame
        Input table containing features and a label column.
    label : str
        Name of the label column. Rows with NaN in label are dropped and the
        remaining rows are re-indexed from 0 before Cleanlab runs.
    task : {"classification","regression"}, optional
        Passed to ``_infer_task`` together with the label values; if None the
        task is inferred from y.
    model : sklearn estimator, optional
        If not provided, defaults to ``LogisticRegression(solver="saga",
        n_jobs=-1)`` for classification or ``SGDRegressor()`` otherwise.
    progress : tqdm, optional
        Phase-aware progress bar. If None, a local bar is created and closed
        before returning or raising. An object without ``set_description`` and
        ``update`` disables progress reporting.
    verbose : bool, default False
        Print the collected warnings in addition to returning them in stats.

    Returns
    -------
    (df_out, stats)
        df_out : the NaN-label-free table (label column included) with the
            rows flagged as label issues removed. If Cleanlab's result has no
            ``is_label_issue`` column, no rows are removed.
        stats : dict with keys ``n_rows_before_cleanlab``, ``n_label_issues``,
            ``pct_label_issues``, ``avg_label_quality``,
            ``n_rows_after_cleanlab``, ``task_applied``, ``model_name``,
            ``used_dense_fallback`` and ``warnings``.

    Raises
    ------
    KeyError
        If ``label`` is not a column of ``df``.
    RuntimeError
        If Cleanlab fails on a sparse feature matrix and again on its dense
        copy. Any other Cleanlab/estimator error is re-raised unchanged.
    """
    # ---- progress setup ----
    phase_weights = {"clean": .15, "cleanlab": .75, "finalize": .10}
    local_bar = None
    pp = None
    if progress is None:
        local_bar = tqdm(total=100, leave=True)
        pp = PhaseProgress(local_bar, weights=phase_weights)
    elif hasattr(progress, "set_description") and hasattr(progress, "update"):
        if not hasattr(progress, "_last_val"):
            progress._last_val = 0
        pp = PhaseProgress(progress, weights=phase_weights)

    # Nothing below currently appends to this list; it is still reported in
    # stats so the output schema stays stable.
    warn_msgs: List[str] = []
    n_before = len(df)

    # A single try/finally guarantees that a bar created here is closed on
    # every exit path, including unexpected errors from helpers or Cleanlab.
    try:
        # ---- clean: ensure label exists, drop NaNs in label ----
        if pp:
            pp.start("clean", extra={"N": n_before})
        if label not in df.columns:
            raise KeyError(f"Label column '{label}' not found in DataFrame.")
        df_in = df.dropna(subset=[label]).reset_index(drop=True)
        # The feature matrix must not contain the target itself; otherwise the
        # cross-validated predictions see the answer and no issue can surface.
        X = df_in.drop(columns=[label])
        y_raw = df_in[label].to_numpy()
        if pp:
            pp.tick_abs("clean", 1.0, extra={"N": len(X)})
            pp.end("clean")

        # ---- task/model selection ----
        task_applied = _infer_task(y_raw, task)
        if model is None:
            if task_applied == "classification":
                model = LogisticRegression(solver="saga", n_jobs=-1)
            else:
                model = SGDRegressor()

        # y encoding for classification (Cleanlab expects numeric labels)
        if task_applied == "classification":
            _, y = np.unique(y_raw, return_inverse=True)
        else:
            y = y_raw.astype(np.float64, copy=False)

        # ---- cleanlab: find label issues ----
        if pp:
            pp.start("cleanlab", extra={"task": task_applied})
        # Imported outside the retry block so a missing cleanlab install
        # surfaces as a plain ImportError.
        if task_applied == "classification":
            from cleanlab.classification import CleanLearning as _CL
        else:
            from cleanlab.regression.learn import CleanLearning as _CL

        used_dense_fallback = False
        try:
            issues = _CL(model).find_label_issues(X, y)
        except Exception as first_err:
            # NOTE(review): X is a DataFrame at this point, so issparse(X) is
            # False and the dense retry is not reached; kept for parity with
            # the sparse-input path the original code anticipated.
            if not issparse(X):
                raise
            dense_X = _ensure_dense32(X)
            try:
                issues = _CL(model).find_label_issues(dense_X, y)
                used_dense_fallback = True
            except Exception as second_err:
                raise RuntimeError(
                    f"Cleanlab failed on sparse and dense features: {second_err}"
                ) from first_err

        # Parse outputs robustly: only a DataFrame result carries the columns.
        is_issue = None
        label_quality = None
        if isinstance(issues, pd.DataFrame):
            is_issue = issues.get("is_label_issue", None)
            label_quality = issues.get("label_quality", None)

        # One boolean mask drives both the count and the row removal so the
        # reported number of issues always equals the number of dropped rows.
        issue_mask = None
        if isinstance(is_issue, (pd.Series, np.ndarray)):
            issue_mask = np.asarray(is_issue, dtype=bool)

        n = len(issues) if hasattr(issues, "__len__") else len(y)
        n_issues = int(issue_mask.sum()) if issue_mask is not None else 0
        pct = round((n_issues / n) * 100.0, 3) if n else 0.0
        if isinstance(label_quality, pd.Series):
            avg_quality = float(np.nanmean(label_quality.values))
        else:
            avg_quality = float("nan")
        if pp:
            pp.tick_abs("cleanlab", 1.0, extra={"issues": n_issues})
            pp.end("cleanlab", extra={"issues": n_issues})

        # ---- finalize: drop issue rows ----
        if pp:
            pp.start("finalize")
        df_out = df_in
        if issue_mask is not None:
            df_out = df_in.loc[~issue_mask].copy()

        stats: Dict[str, Any] = {
            "n_rows_before_cleanlab": int(len(df_in)),
            "n_label_issues": int(n_issues),
            "pct_label_issues": float(pct),
            "avg_label_quality": float(avg_quality),
            "n_rows_after_cleanlab": int(len(df_out)),
            "task_applied": task_applied,
            "model_name": type(model).__name__,
            "used_dense_fallback": bool(used_dense_fallback),
            "warnings": warn_msgs,
        }
        if pp:
            pp.tick_abs("finalize", 1.0)
            pp.end("finalize")

        if verbose and warn_msgs:
            print("\n".join(warn_msgs))
        return df_out, stats
    finally:
        # Only close a bar this function created; a caller-supplied bar stays
        # under the caller's control.
        if local_bar is not None:
            pp.close()
|