File size: 5,831 Bytes
458c8e2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
from __future__ import annotations

from typing import Any, Dict, List, Optional, Tuple

import numpy as np
import pandas as pd
from scipy.sparse import issparse
from sklearn.linear_model import LogisticRegression, SGDRegressor
from tqdm.auto import tqdm

from .utils_cool import PhaseProgress, _ensure_dense32, _infer_task


def find_issues(
    df: pd.DataFrame,
    *,
    label: str,
    task: Optional[str] = None,                 # "classification" | "regression"; if None we infer
    model: Optional[Any] = None,                # sklearn estimator; default chosen by task
    progress: Optional[Any] = None,             # tqdm bar; None -> auto-create
    verbose: bool = False,
) -> Tuple[Optional[pd.DataFrame], Dict]:
    """
    Detect label issues with Cleanlab's CleanLearning and drop the flagged rows.

    Parameters
    ----------
    df : DataFrame
        Input table containing features and a label column. It is not
        modified; all work happens on derived frames.
    label : str
        Name of the label column. Rows with NaN in label are dropped. The
        label column itself is excluded from the features handed to Cleanlab.
    task : {"classification","regression"}, optional
        If not provided, inferred from y via ``_infer_task``.
    model : sklearn estimator, optional
        If not provided, defaults to ``LogisticRegression(solver="saga",
        n_jobs=-1)`` for classification or ``SGDRegressor()`` for regression.
    progress : tqdm, optional
        Progress bar to report phases on. If None, a local bar is created and
        always closed before returning or raising. An object lacking
        ``set_description``/``update`` disables progress reporting.
    verbose : bool, default False
        Print collected warnings in addition to returning them in stats.

    Returns
    -------
    (df_out, stats)
        df_out : copy of ``df`` without NaN-label rows (index reset) and
                 without the rows Cleanlab flagged as label issues. When
                 Cleanlab's output has no ``is_label_issue`` column, the
                 NaN-filtered frame is returned unchanged.
        stats  : dict with row counts, issue count/percentage, average label
                 quality, applied task, model class name, dense-fallback flag
                 and the list of warnings.

    Raises
    ------
    KeyError
        If ``label`` is not a column of ``df``.
    ValueError
        If ``df`` has no columns other than ``label``.
    RuntimeError
        If Cleanlab fails on sparse features and again on the dense retry.
    """
    # ---- progress setup ----
    phase_weights = {"clean": .15, "cleanlab": .75, "finalize": .10}
    local_bar = None
    pp = None
    if progress is None:
        local_bar = tqdm(total=100, leave=True)
        pp = PhaseProgress(local_bar, weights=phase_weights)
    elif hasattr(progress, "set_description") and hasattr(progress, "update"):
        if not hasattr(progress, "_last_val"):
            progress._last_val = 0
        pp = PhaseProgress(progress, weights=phase_weights)

    messages: List[str] = []
    n_before = len(df)

    # A single try/finally guarantees the locally created bar is closed on
    # every exit path, including failures in task inference or label encoding.
    try:
        # ---- clean: ensure label exists, drop NaNs in label ----
        if pp is not None:
            pp.start("clean", extra={"N": n_before})
        if label not in df.columns:
            raise KeyError(f"Label column '{label}' not found in DataFrame.")
        df_in = df.dropna(subset=[label]).reset_index(drop=True)
        # The label must not be part of the features: leaving it in lets the
        # model read the target directly and defeats label-issue detection.
        X = df_in.drop(columns=[label])
        if X.shape[1] == 0:
            raise ValueError(
                f"DataFrame has no feature columns besides label '{label}'."
            )
        y_raw = df_in[label].to_numpy()
        if pp is not None:
            pp.tick_abs("clean", 1.0, extra={"N": len(X)})
            pp.end("clean")

        # ---- task/model selection ----
        task_applied = _infer_task(y_raw, task)
        if model is None:
            if task_applied == "classification":
                model = LogisticRegression(solver="saga", n_jobs=-1)
            else:
                model = SGDRegressor()

        # y encoding for classification (Cleanlab expects numeric labels)
        if task_applied == "classification":
            _, y = np.unique(y_raw, return_inverse=True)
        else:
            y = y_raw.astype(np.float64, copy=False)

        # ---- cleanlab: find label issues ----
        if pp is not None:
            pp.start("cleanlab", extra={"task": task_applied})
        # Imported lazily so the module can be imported without cleanlab.
        if task_applied == "classification":
            from cleanlab.classification import CleanLearning as _CL
        else:
            from cleanlab.regression.learn import CleanLearning as _CL

        used_dense_fallback = False
        try:
            issues = _CL(model).find_label_issues(X, y)
        except Exception as e:
            # NOTE(review): X is a DataFrame here, so issparse(X) is False and
            # this retry is currently unreachable; kept for sparse inputs.
            if not issparse(X):
                raise
            Xd = _ensure_dense32(X)
            try:
                issues = _CL(model).find_label_issues(Xd, y)
                used_dense_fallback = True
            except Exception as e2:
                raise RuntimeError(
                    f"Cleanlab failed on sparse and dense features: {e2}"
                ) from e

        # Parse outputs robustly
        if isinstance(issues, pd.DataFrame):
            is_issue = issues.get("is_label_issue", None)
            label_quality = issues.get("label_quality", None)
        else:
            is_issue = None
            label_quality = None
        has_flags = isinstance(is_issue, (pd.Series, np.ndarray))

        n = len(issues) if hasattr(issues, "__len__") else len(y)
        n_issues = int(is_issue.sum()) if has_flags else 0
        pct = round((n_issues / n) * 100.0, 3) if n else 0.0
        if isinstance(label_quality, pd.Series):
            avg_quality = float(np.nanmean(label_quality.values))
        else:
            avg_quality = float("nan")

        if pp is not None:
            pp.tick_abs("cleanlab", 1.0, extra={"issues": n_issues})
            pp.end("cleanlab", extra={"issues": n_issues})

        # ---- finalize: drop issue rows ----
        if pp is not None:
            pp.start("finalize")
        df_out = df_in
        if has_flags:
            # np.asarray handles both Series and ndarray (ndarray has no .values).
            mask_keep = ~np.asarray(is_issue).astype(bool)
            df_out = df_in.loc[mask_keep].copy()

        stats: Dict[str, Any] = {
            "n_rows_before_cleanlab": int(len(df_in)),
            "n_label_issues": int(n_issues),
            "pct_label_issues": float(pct),
            "avg_label_quality": float(avg_quality),
            "n_rows_after_cleanlab": int(len(df_out)),
            "task_applied": task_applied,
            "model_name": type(model).__name__,
            "used_dense_fallback": bool(used_dense_fallback),
            "warnings": messages,
        }
        if pp is not None:
            pp.tick_abs("finalize", 1.0)
            pp.end("finalize")

        if verbose and messages:
            print("\n".join(messages))

        return df_out, stats
    finally:
        # Only close a bar this function created; caller-owned bars stay open.
        if local_bar is not None:
            pp.close()