|
|
import inspect |
|
|
import math |
|
|
import re |
|
|
from dataclasses import dataclass |
|
|
from time import perf_counter |
|
|
from typing import Any, Dict, Optional, Tuple |
|
|
|
|
|
import numpy as np |
|
|
import pandas as pd |
|
|
from scipy.sparse import issparse |
|
|
from tqdm import tqdm |
|
|
|
|
|
|
|
|
@dataclass |
|
|
class PhaseProgress: |
|
|
bar: "tqdm" |
|
|
weights: Dict[str, float] |
|
|
total: int = 100 |
|
|
|
|
|
def __post_init__(self): |
|
|
self._norm = sum(self.weights.values()) or 1.0 |
|
|
self._done = 0.0 |
|
|
self._phase = None |
|
|
self._phase_t0 = None |
|
|
|
|
|
if not hasattr(self.bar, "_last_val"): |
|
|
self.bar._last_val = 0 |
|
|
|
|
|
def start(self, phase: str, extra: Optional[Dict] = None): |
|
|
self._phase = phase |
|
|
self._phase_t0 = perf_counter() |
|
|
self.bar.set_description_str(phase) |
|
|
if extra: |
|
|
self.bar.set_postfix(extra, refresh=False) |
|
|
|
|
|
def tick_abs(self, phase: str, p01: float, extra: Optional[Dict] = None): |
|
|
"""Update absolute progress based on within-phase progress p01 ∈ [0,1].""" |
|
|
p01 = max(0.0, min(1.0, float(p01))) |
|
|
w = self.weights.get(phase, 0.0) / self._norm |
|
|
target = int(round(self.total * (self._done + w * p01))) |
|
|
delta = target - self.bar._last_val |
|
|
if delta > 0: |
|
|
self.bar.update(delta) |
|
|
self.bar._last_val = target |
|
|
self.bar.set_description_str(f"{phase} {int(100*p01)}%") |
|
|
if extra: |
|
|
self.bar.set_postfix(extra, refresh=False) |
|
|
|
|
|
def end(self, phase: str, extra: Optional[Dict] = None): |
|
|
w = self.weights.get(phase, 0.0) / self._norm |
|
|
self._done += w |
|
|
elapsed_ms = (perf_counter() - (self._phase_t0 or perf_counter())) * 1000 |
|
|
post = dict(extra or {}) |
|
|
post["t"] = f"{elapsed_ms:.0f}ms" |
|
|
self.bar.set_postfix(post, refresh=False) |
|
|
|
|
|
def close(self): |
|
|
try: |
|
|
if self.bar._last_val < self.total: |
|
|
self.bar.update(self.total - self.bar._last_val) |
|
|
finally: |
|
|
self.bar.close() |
|
|
|
|
|
def choose_k(N: int, k_min: int = 5, k_max: int = 50) -> int: |
|
|
"""sqrt(N) clipped to [k_min, k_max] and ≤ N-1.""" |
|
|
if N <= 1: |
|
|
return 1 |
|
|
k = int(math.sqrt(N)) |
|
|
k = max(k_min, min(k, k_max)) |
|
|
return min(k, N - 1) |
|
|
|
|
|
def _ensure_dense32(X) -> np.ndarray: |
|
|
"""Convert to contiguous float32 ndarray (densify only if needed).""" |
|
|
if issparse(X): |
|
|
X = X.toarray() |
|
|
return np.asarray(X, dtype=np.float32, order="C") |
|
|
|
|
|
def decide_task_and_model( |
|
|
y: np.ndarray, |
|
|
series: pd.Series, |
|
|
*, |
|
|
is_categorical: bool = False, |
|
|
few_class_floor: int = 20, |
|
|
few_class_frac: float = 0.05, |
|
|
): |
|
|
N = len(y) |
|
|
|
|
|
|
|
|
is_bool = pd.api.types.is_bool_dtype(series) |
|
|
is_numeric = pd.api.types.is_numeric_dtype(series) |
|
|
|
|
|
|
|
|
y_nonnull = y[~pd.isnull(y)] |
|
|
n_unique = len(pd.unique(y_nonnull)) |
|
|
|
|
|
|
|
|
few_classes_threshold = max(few_class_floor, int(np.ceil(few_class_frac * max(N, 1)))) |
|
|
numeric_few_classes = is_numeric and (n_unique <= few_classes_threshold) |
|
|
|
|
|
use_classification = ( |
|
|
is_categorical |
|
|
or is_bool |
|
|
or (not is_numeric) |
|
|
or numeric_few_classes |
|
|
) |
|
|
|
|
|
if use_classification: |
|
|
return "classification" |
|
|
else: |
|
|
return "regression" |
|
|
|
|
|
def _infer_task(y: np.ndarray, task: Optional[str]) -> str: |
|
|
"""Decide task if not provided: numeric with many uniques -> regression, else classification.""" |
|
|
if task in {"classification", "regression"}: |
|
|
return task |
|
|
|
|
|
if np.issubdtype(y.dtype, np.number): |
|
|
nunq = len(np.unique(y[~pd.isna(y)])) |
|
|
is_categorical = nunq <= max(2, int(0.02 * max(1, len(y)))) |
|
|
else: |
|
|
is_categorical = True |
|
|
return "classification" if is_categorical else "regression" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def df_to_payload(df: pd.DataFrame) -> Dict[str, Any]: |
|
|
return {"orient": "split", "data": df.to_dict(orient="split")} |
|
|
|
|
|
|
|
|
def df_from_payload(p: Dict[str, Any]) -> pd.DataFrame: |
|
|
d = p["data"] |
|
|
return pd.DataFrame(d["data"], columns=d["columns"]) |
|
|
|
|
|
|
|
|
|
|
|
def guess_task_and_label(df: pd.DataFrame) -> Dict[str, Any]: |
|
|
cols = list(df.columns) |
|
|
label_candidates = [c for c in cols if c.lower() in {"label","target","y","class","outcome"}] |
|
|
label = label_candidates[0] if label_candidates else None |
|
|
|
|
|
|
|
|
task = None |
|
|
if label and (pd.api.types.is_integer_dtype(df[label]) or pd.api.types.is_bool_dtype(df[label])): |
|
|
nuniq = df[label].nunique(dropna=True) |
|
|
task = "classification" if nuniq <= max(20, int(0.05*len(df))) else "regression" |
|
|
elif label and pd.api.types.is_float_dtype(df[label]): |
|
|
task = "regression" |
|
|
else: |
|
|
task = "unsupervised" |
|
|
|
|
|
|
|
|
issues = [] |
|
|
if label and df[label].isna().any(): |
|
|
issues.append(f"Missing values in label `{label}`") |
|
|
if label and df[label].nunique() == 1: |
|
|
issues.append(f"Label `{label}` has a single class") |
|
|
|
|
|
|
|
|
return { |
|
|
"columns": cols, |
|
|
"dtypes": {c: str(df[c].dtype) for c in cols}, |
|
|
"label_guess": label, |
|
|
"task_guess": task, |
|
|
"issues": issues, |
|
|
"shape": df.shape, |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_signature_dict(fn) -> Dict[str, Any]: |
|
|
sig = inspect.signature(fn) |
|
|
doc = (fn.__doc__ or "").strip() |
|
|
params = [] |
|
|
for p in sig.parameters.values(): |
|
|
if p.name == "df": |
|
|
continue |
|
|
default = None if (p.default is inspect._empty) else p.default |
|
|
annotation = None if (p.annotation is inspect._empty) else str(p.annotation) |
|
|
params.append({"name": p.name, "default": default, "annotation": annotation, "kind": str(p.kind)}) |
|
|
return {"params": params, "doc": doc} |
|
|
|
|
|
|
|
|
STEP_ALIASES = { |
|
|
"dedup": {"dedup","de-dup","duplicates","near-dup"}, |
|
|
"featurize": {"featurize","features","featureize","engineering"}, |
|
|
"find_label_issues": {"find_label_issues","label issues","cleanlab","label noise"}, |
|
|
} |
|
|
|
|
|
|
|
|
def parse_user_choice(text: str) -> Tuple[Optional[str], Dict[str, Any]]: |
|
|
t = text.lower() |
|
|
chosen = None |
|
|
for step, aliases in STEP_ALIASES.items(): |
|
|
if any(a in t for a in aliases): |
|
|
chosen = step |
|
|
break |
|
|
|
|
|
params: Dict[str, Any] = {} |
|
|
for m in re.finditer(r"(\w+)\s*=\s*([\-\w\.]+)", text): |
|
|
k, v = m.group(1), m.group(2) |
|
|
if v.replace('.', '', 1).isdigit(): |
|
|
v = float(v) if '.' in v else int(v) |
|
|
elif v.lower() in {"true","false"}: |
|
|
v = (v.lower() == "true") |
|
|
params[k] = v |
|
|
return chosen, params |