|
|
from __future__ import annotations |
|
|
|
|
|
from typing import Any, Dict, List, Optional, Tuple |
|
|
|
|
|
import numpy as np |
|
|
import pandas as pd |
|
|
from scipy.sparse import issparse |
|
|
from sklearn.linear_model import LogisticRegression, SGDRegressor |
|
|
from tqdm.auto import tqdm |
|
|
|
|
|
from .utils_cool import PhaseProgress, _ensure_dense32, _infer_task |
|
|
|
|
|
|
|
|
def find_issues(
    df: pd.DataFrame,
    *,
    label: str,
    task: Optional[str] = None,
    model: Optional[Any] = None,
    progress: Optional[Any] = None,
    verbose: bool = False,
) -> Tuple[Optional[pd.DataFrame], Dict]:
    """
    Detect label issues using Cleanlab's CleanLearning.

    Parameters
    ----------
    df : DataFrame
        Input table containing features and a label column.
    label : str
        Name of the label column. Rows with NaN in label are dropped.
    task : {"classification","regression"}, optional
        If not provided, inferred from y.
    model : sklearn estimator, optional
        If not provided, defaults to LogisticRegression or SGDRegressor.
    progress : tqdm, optional
        Phase-aware progress bar. If None, a local bar is created and closed.
    verbose : bool, default False
        Print warnings/timings in addition to returning them in stats.

    Returns
    -------
    (df_out, stats)
        df_out : DataFrame with rows flagged as label issues removed
            (the NaN-label-filtered input if no issue mask was produced).
        stats : dict with minimal metadata and counts.

    Raises
    ------
    KeyError
        If `label` is not a column of `df`.
    RuntimeError
        If Cleanlab fails on both the sparse features and the dense fallback.
    """
    phase_weights = {"clean": .15, "cleanlab": .75, "finalize": .10}
    local_bar = None
    pp = None
    if progress is None:
        # No caller-supplied bar: create our own and close it before returning
        # (and before every raise below, so the terminal is left clean).
        local_bar = tqdm(total=100, leave=True)
        pp = PhaseProgress(local_bar, weights=phase_weights)
    elif hasattr(progress, "set_description") and hasattr(progress, "update"):
        if not hasattr(progress, "_last_val"):
            progress._last_val = 0
        pp = PhaseProgress(progress, weights=phase_weights)
    # else: progress object we don't understand -> pp stays None and every
    # progress call below is skipped via the `pp and ...` guard.

    warnings: List[str] = []
    n_before = len(df)

    # --- Phase 1: clean the input table --------------------------------
    pp and pp.start("clean", extra={"N": n_before})
    if label not in df.columns:
        if local_bar is not None:
            pp.close()
        raise KeyError(f"Label column '{label}' not found in DataFrame.")
    df_in = df.dropna(subset=[label]).reset_index(drop=True)
    # BUG FIX: the label column must NOT be part of the feature matrix,
    # otherwise the model trains on the target itself (perfect leakage) and
    # cleanlab's label-quality scores become meaningless.
    X = df_in.drop(columns=[label])
    y_raw = df_in[label].to_numpy()
    pp and pp.tick_abs("clean", 1.0, extra={"N": len(df_in)})
    pp and pp.end("clean")

    # --- Choose task and default model ---------------------------------
    task_applied = _infer_task(y_raw, task)
    if model is None:
        if task_applied == "classification":
            model = LogisticRegression(solver="saga", n_jobs=-1)
        else:
            model = SGDRegressor()

    # Encode labels: cleanlab's classification API expects integer codes.
    if task_applied == "classification":
        classes, y = np.unique(y_raw, return_inverse=True)
    else:
        classes = None
        y = y_raw.astype(np.float64, copy=False)

    # --- Phase 2: run cleanlab -----------------------------------------
    pp and pp.start("cleanlab", extra={"task": task_applied})
    used_dense_fallback = False
    try:
        if task_applied == "classification":
            from cleanlab.classification import CleanLearning as _CL
        else:
            from cleanlab.regression.learn import CleanLearning as _CL
        cl = _CL(model)
        issues = cl.find_label_issues(X, y)
    except Exception as e:
        if issparse(X):
            # Some estimators reject sparse input; retry with dense float32.
            Xd = _ensure_dense32(X)
            try:
                # BUG FIX: re-import here — `_CL` is unbound if the import in
                # the outer try was what raised, and referencing it would turn
                # the intended fallback into a NameError.
                if task_applied == "classification":
                    from cleanlab.classification import CleanLearning as _CL
                else:
                    from cleanlab.regression.learn import CleanLearning as _CL
                cl = _CL(model)
                issues = cl.find_label_issues(Xd, y)
                used_dense_fallback = True
            except Exception as e2:
                if local_bar is not None:
                    pp.close()
                raise RuntimeError(f"Cleanlab failed on sparse and dense features: {e2}") from e
        else:
            if local_bar is not None:
                pp.close()
            raise

    # --- Summarize the issue report ------------------------------------
    # cleanlab returns a DataFrame with (at least) "is_label_issue" and
    # "label_quality" columns; anything else degrades to "no issues found".
    if isinstance(issues, pd.DataFrame):
        is_issue = issues.get("is_label_issue", None)
        label_quality = issues.get("label_quality", None)
    else:
        is_issue = None
        label_quality = None

    n = len(issues) if hasattr(issues, "__len__") else len(y)
    n_issues = int(is_issue.sum()) if isinstance(is_issue, (pd.Series, np.ndarray)) else 0
    pct = round((n_issues / n) * 100.0, 3) if n else 0.0
    avg_quality = float(np.nanmean(label_quality.values)) if isinstance(label_quality, pd.Series) else float("nan")

    pp and pp.tick_abs("cleanlab", 1.0, extra={"issues": n_issues})
    pp and pp.end("cleanlab", extra={"issues": n_issues})

    # --- Phase 3: drop flagged rows and assemble stats -----------------
    pp and pp.start("finalize")
    df_out = df_in
    if isinstance(is_issue, (pd.Series, np.ndarray)):
        # BUG FIX: np.ndarray has no `.values`; normalize Series and ndarray
        # alike through np.asarray before inverting the keep-mask.
        mask_keep = ~np.asarray(is_issue, dtype=bool)
        df_out = df_in.loc[mask_keep].copy()

    stats: Dict[str, Any] = {
        "n_rows_before_cleanlab": int(len(df_in)),
        "n_label_issues": int(n_issues),
        "pct_label_issues": float(pct),
        "avg_label_quality": float(avg_quality),
        "n_rows_after_cleanlab": int(len(df_out)),
        "task_applied": task_applied,
        "model_name": type(model).__name__,
        "used_dense_fallback": bool(used_dense_fallback),
        "warnings": warnings,
    }
    pp and pp.tick_abs("finalize", 1.0)
    pp and pp.end("finalize")

    if verbose and warnings:
        print("\n".join(warnings))
    if local_bar is not None:
        pp.close()

    return df_out, stats
|
|
|