|
|
from __future__ import annotations |
|
|
|
|
|
from time import perf_counter |
|
|
from typing import Any, Dict, List, Optional, Tuple |
|
|
|
|
|
import numpy as np |
|
|
import pandas as pd |
|
|
from cleanlab import Datalab |
|
|
from cleanlab.datalab.internal.issue_manager.duplicate import NearDuplicateIssueManager |
|
|
from scipy.sparse import issparse |
|
|
from scipy.special import comb |
|
|
from sklearn.base import TransformerMixin |
|
|
from sklearn.feature_extraction.text import TfidfVectorizer |
|
|
from tqdm.auto import tqdm |
|
|
|
|
|
from .utils_cool import PhaseProgress, _ensure_dense32, choose_k |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def find_near_duplicates(
    df: pd.DataFrame,
    *,
    metric: str = "cosine",
    threshold: float = 0.13,
    k: Optional[int] = None,
    vectorizer: Optional[TransformerMixin] = None,
    force_dense: bool = False,
    verbose: bool = False,
    progress: Optional[Any] = None,
) -> Tuple[Optional[pd.DataFrame], Dict]:
    """
    Detect near-duplicates using Cleanlab's NearDuplicateIssueManager.

    Parameters
    ----------
    df : DataFrame
        Rows to analyze. If no vectorizer is passed, all columns are joined as strings for TF-IDF.
    metric : {"cosine", "euclidean", "manhattan"}
        Distance metric for kNN graph.
    threshold : float
        Near-duplicate radius is based on threshold × median NN distance (internal to Cleanlab).
    k : int or None
        Neighborhood size. If None, uses sqrt(N) clipped to [5, 50] and ≤ N-1.
    vectorizer : sklearn Transformer
        Any transformer with fit_transform/transform. If None, uses TF-IDF (float32).
    force_dense : bool
        If True, densify features before passing to Cleanlab.
    verbose : bool
        Print timing breakdown.
    progress : tqdm or Callable[[str, float], None]
        Phase-aware progress reporting.

    Returns
    -------
    (output_df, stats)
        output_df : DataFrame after deduplication
        stats : dict of counts/timings/params

    Raises
    ------
    TypeError
        If `vectorizer` implements neither fit_transform nor transform.
    """
    N = int(len(df))

    # Fast path: an empty frame would otherwise crash downstream
    # (TfidfVectorizer raises "empty vocabulary" on an empty corpus),
    # so short-circuit with a stats payload shaped like the normal one.
    if N == 0:
        empty_stats: Dict[str, Any] = {
            "n_rows_before_dedup": 0,
            "n_near_dupe_pairs": 0,
            "n_groups": 0,
            "avg_group_size": 0.0,
            "max_group_size": 0,
            "n_rows_flagged_duplicates": 0,
            "n_rows_after_dedup": 0,
            "metric": metric,
            "threshold": threshold,
            "k": int(k) if k is not None else 0,
            "timings": {},
            "total_time_sec": 0.0,
        }
        return df.copy(), empty_stats

    # --- Progress wiring -------------------------------------------------
    # Relative cost of each phase; used by PhaseProgress to map per-phase
    # fractions onto one 0-100 bar.
    phase_weights = {"vectorize": .2, "find_issues": .7, "grouping": .1}

    local_bar = None  # only set when we own the bar and must close it
    pp = None
    if progress is None:
        local_bar = tqdm(total=100, leave=True)
        pp = PhaseProgress(local_bar, weights=phase_weights)
    elif hasattr(progress, "set_description") and hasattr(progress, "update"):
        # Seed the attribute PhaseProgress reads to track bar deltas.
        # NOTE(review): this mutates the caller's object — confirm
        # PhaseProgress cannot initialize this itself.
        if not hasattr(progress, "_last_val"):
            progress._last_val = 0
        pp = PhaseProgress(progress, weights=phase_weights)
    # NOTE(review): the docstring advertises Callable[[str, float], None]
    # reporters, but objects without set_description/update are silently
    # ignored here — verify whether PhaseProgress accepts plain callables.

    timings: Dict[str, float] = {}
    t0 = perf_counter()

    # --- Phase 1: vectorize ----------------------------------------------
    pp and pp.start("vectorize", extra={"N": N})
    t_vec0 = perf_counter()
    # Join every column as text; this is the default TF-IDF input and is
    # also what a caller-supplied text vectorizer receives.
    text_series = df.astype(str).agg(" ".join, axis=1)

    if vectorizer is None:
        vectorizer = TfidfVectorizer(dtype=np.float32)

    # Prefer fit_transform (fits on this data); fall back to transform for
    # pre-fitted transformers.
    if hasattr(vectorizer, "fit_transform"):
        X = vectorizer.fit_transform(text_series.tolist())
    elif hasattr(vectorizer, "transform"):
        X = vectorizer.transform(text_series.tolist())
    else:
        raise TypeError("`vectorizer` must implement fit_transform or transform.")

    if force_dense:
        X = _ensure_dense32(X)

    timings["vectorize"] = perf_counter() - t_vec0
    pp and pp.tick_abs("vectorize", 1.0)
    pp and pp.end("vectorize")

    # --- Phase 2: Cleanlab kNN issue search ------------------------------
    pp and pp.start("find_issues", extra={"metric": metric})
    t_cl0 = perf_counter()

    if k is None:
        k = choose_k(N)

    # Datalab only needs row identities here; features are passed directly
    # to the issue manager below.
    lab = Datalab(data={"__row__": list(range(N))})
    ndm = NearDuplicateIssueManager(datalab=lab, metric=metric, threshold=threshold, k=k)

    pp and pp.tick_abs("find_issues", 0.0, extra={"k": k})

    try:
        ndm.find_issues(features=X)
    except Exception:
        # Some metric/backend combinations reject sparse input; retry once
        # with a dense float32 copy. Anything else is a real error.
        if issparse(X):
            X_dense = _ensure_dense32(X)
            ndm.find_issues(features=X_dense)
        else:
            raise

    pp and pp.tick_abs("find_issues", 1.0)
    timings["cleanlab_find_issues"] = perf_counter() - t_cl0
    pp and pp.end("find_issues", extra={"k": k})

    # Each entry is one group of mutually near-duplicate row indices.
    near_dup_sets: List[List[int]] = getattr(ndm, "near_duplicate_sets", []) or []

    # --- Phase 3: grouping & output --------------------------------------
    pp and pp.start("grouping")
    t_out0 = perf_counter()

    # Keep the lowest index of each duplicate group plus every row that is
    # in no group at all.
    reps = [int(np.min(g)) for g in near_dup_sets if np.size(g) > 0]
    in_any = {int(i) for g in near_dup_sets for i in np.asarray(g).ravel()}
    keep_set = set(reps) | (set(range(N)) - in_any)

    keep_mask = np.zeros(N, dtype=bool)
    if keep_set:
        keep_mask[list(keep_set)] = True
    out_df = df.iloc[np.where(keep_mask)[0]].copy()

    # Group-level summary counts. n_pairs counts all within-group pairs
    # (C(s, 2) per group of size s).
    n_groups = sum(1 for g in near_dup_sets if np.size(g) > 0)
    group_sizes = [int(np.size(g)) for g in near_dup_sets if np.size(g) > 0]
    n_flagged = sum(max(0, s - 1) for s in group_sizes)
    n_pairs = int(sum(comb(s, 2, exact=True) for s in group_sizes))

    timings["groups_and_output"] = perf_counter() - t_out0
    total_time = perf_counter() - t0

    stats = {
        "n_rows_before_dedup": N,
        "n_near_dupe_pairs": n_pairs,
        "n_groups": n_groups,
        "avg_group_size": float(np.mean(group_sizes)) if group_sizes else 0.0,
        "max_group_size": max(group_sizes) if group_sizes else 0,
        "n_rows_flagged_duplicates": n_flagged,
        "n_rows_after_dedup": int(len(out_df)),
        "metric": metric,
        "threshold": threshold,
        "k": int(k),
        "timings": timings,
        "total_time_sec": total_time,
    }

    if verbose:
        print(f"[timing] TOTAL: {total_time*1000:.1f} ms")
        for k_, v in timings.items():
            print(f"  - {k_}: {v*1000:.1f} ms")

    pp and pp.tick_abs("grouping", 1.0, extra={"groups": n_groups, "pairs": n_pairs})
    pp and pp.end("grouping")

    # Close only a bar we created; an externally supplied bar stays open
    # for the caller.
    if local_bar is not None:
        pp.close()

    return out_df, stats