File size: 6,784 Bytes
458c8e2 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 |
import inspect
import math
import re
from dataclasses import dataclass
from time import perf_counter
from typing import Any, Dict, Optional, Tuple
import numpy as np
import pandas as pd
from scipy.sparse import issparse
from tqdm import tqdm
@dataclass
class PhaseProgress:
    """Map the progress of several weighted phases onto a single progress bar.

    Every phase owns a share of the bar equal to its weight divided by the
    sum of all weights.  Intended call order per phase is ``start`` ->
    ``tick_abs`` (any number of times) -> ``end``; ``close`` fills whatever
    is left of the bar and closes it.
    """

    # Progress-bar object; the methods below call its update(),
    # set_description_str(), set_postfix() and close().
    bar: "tqdm"
    # Phase name -> relative weight; weights are normalised by their sum.
    weights: Dict[str, float]
    # Number of bar units that correspond to all phases being complete.
    total: int = 100

    def __post_init__(self):
        # Fall back to 1.0 so an empty or all-zero weight table never
        # causes a division by zero.
        self._norm = sum(self.weights.values()) or 1.0
        # Normalised weight of the phases that have already ended.
        self._done = 0.0
        self._phase = None
        self._phase_t0 = None
        # The last position pushed to the bar is stored on the bar itself;
        # a value that is already present there is kept.
        if not hasattr(self.bar, "_last_val"):
            self.bar._last_val = 0

    def start(self, phase: str, extra: Optional[Dict] = None):
        """Begin ``phase``: remember its start time and show its name.

        ``extra``, when non-empty, is shown as the bar's postfix.
        """
        self._phase = phase
        self._phase_t0 = perf_counter()
        self.bar.set_description_str(phase)
        if extra:
            self.bar.set_postfix(extra, refresh=False)

    def tick_abs(self, phase: str, p01: float, extra: Optional[Dict] = None):
        """Update absolute progress based on within-phase progress p01 ∈ [0,1].

        ``p01`` is clamped to [0, 1].  The bar only ever moves forward: a
        target position behind the current one leaves the bar where it is.
        """
        p01 = max(0.0, min(1.0, float(p01)))
        # Phases missing from the weight table contribute nothing.
        w = self.weights.get(phase, 0.0) / self._norm
        target = int(round(self.total * (self._done + w * p01)))
        delta = target - self.bar._last_val
        if delta > 0:
            self.bar.update(delta)
            self.bar._last_val = target
        self.bar.set_description_str(f"{phase} {int(100*p01)}%")
        if extra:
            self.bar.set_postfix(extra, refresh=False)

    def end(self, phase: str, extra: Optional[Dict] = None):
        """Mark ``phase`` as finished and show its elapsed time as ``t``.

        The phase's normalised weight is added to the completed share.  The
        postfix is ``extra`` plus ``t`` (milliseconds since the most recent
        ``start``).  When no phase was ever started the elapsed time is
        reported as ``0ms``.
        """
        w = self.weights.get(phase, 0.0) / self._norm
        self._done += w
        # Read the clock exactly once: subtracting a second, later reading
        # (the old fallback when no start time existed) produced a tiny
        # negative duration that was rendered as "-0ms".
        now = perf_counter()
        started = self._phase_t0
        elapsed_ms = 0.0 if started is None else (now - started) * 1000
        post = dict(extra or {})
        post["t"] = f"{elapsed_ms:.0f}ms"
        self.bar.set_postfix(post, refresh=False)

    def close(self):
        """Advance the bar to ``total`` if it is short of it, then close it.

        The bar is closed even if the final update raises.
        """
        try:
            if self.bar._last_val < self.total:
                self.bar.update(self.total - self.bar._last_val)
        finally:
            self.bar.close()
def choose_k(N: int, k_min: int = 5, k_max: int = 50) -> int:
    """Return floor(sqrt(N)) clamped to [k_min, k_max], then capped at N - 1.

    Any N of 1 or less yields 1.
    """
    if N <= 1:
        return 1

    root = int(math.sqrt(N))
    capped = min(root, k_max)
    floored = max(k_min, capped)
    # The N - 1 cap is applied last, so it wins over k_min for small N.
    return min(floored, N - 1)
def _ensure_dense32(X) -> np.ndarray:
    """Return X as a C-ordered float32 ndarray.

    Sparse matrices (as reported by ``issparse``) are densified with
    ``toarray()`` first; everything else is handed to ``np.asarray`` as is.
    """
    dense = X.toarray() if issparse(X) else X
    return np.asarray(dense, dtype=np.float32, order="C")
def decide_task_and_model(
    y: np.ndarray,
    series: pd.Series,
    *,
    is_categorical: bool = False,
    few_class_floor: int = 20,
    few_class_frac: float = 0.05,
):
    """Return "classification" or "regression" for a target column.

    ``series`` is used only for dtype checks; ``y`` supplies the length and
    the distinct non-null values.  The answer is "regression" only when the
    dtype is numeric (and not boolean), the caller did not set
    ``is_categorical``, and the number of distinct non-null values exceeds
    ``max(few_class_floor, ceil(few_class_frac * len(y)))``.
    """
    n_rows = len(y)

    bool_target = pd.api.types.is_bool_dtype(series)
    numeric_target = pd.api.types.is_numeric_dtype(series)

    # Distinct values, with nulls left out of the count.
    observed = y[~pd.isnull(y)]
    n_distinct = len(pd.unique(observed))

    # A numeric column with this few distinct values is treated as class labels.
    frac_cutoff = int(np.ceil(few_class_frac * max(n_rows, 1)))
    cutoff = max(few_class_floor, frac_cutoff)
    few_distinct = n_distinct <= cutoff

    is_regression = numeric_target and not (
        is_categorical or bool_target or few_distinct
    )
    return "regression" if is_regression else "classification"
def _infer_task(y: np.ndarray, task: Optional[str]) -> str:
    """Return ``task`` if it is a recognised name, otherwise infer it from ``y``.

    Non-numeric arrays are classification.  Numeric arrays are classification
    when their distinct non-NaN values number at most
    ``max(2, int(0.02 * len(y)))``, and regression otherwise.
    """
    if task in {"classification", "regression"}:
        return task

    if not np.issubdtype(y.dtype, np.number):
        return "classification"

    observed = y[~pd.isna(y)]
    n_distinct = len(np.unique(observed))
    cutoff = max(2, int(0.02 * max(1, len(y))))
    if n_distinct <= cutoff:
        return "classification"
    return "regression"
# --------- DataFrame payload helpers (for tool IO) ---------
def df_to_payload(df: pd.DataFrame) -> Dict[str, Any]:
    """Wrap ``df`` in a payload dict using pandas' "split" orientation.

    The result has two keys: ``"orient"`` (always ``"split"``) and ``"data"``
    (the output of ``df.to_dict(orient="split")``).
    """
    payload: Dict[str, Any] = {"orient": "split"}
    payload["data"] = df.to_dict(orient="split")
    return payload
def df_from_payload(p: Dict[str, Any], *, keep_index: bool = False) -> pd.DataFrame:
    """Rebuild a DataFrame from a "split"-oriented payload dict.

    Args:
        p: Payload whose ``"data"`` entry holds at least ``"data"`` (rows)
            and ``"columns"`` (column labels).
        keep_index: When true and the payload carries an ``"index"`` entry,
            use it as the frame's index.  The default (false) ignores any
            stored index, so the result gets a fresh default index.

    Returns:
        The reconstructed DataFrame.

    Raises:
        KeyError: If ``"data"`` or ``"columns"`` is missing from the payload.
    """
    d = p["data"]
    # index=None makes pandas build its default index, which is exactly what
    # happens when the argument is omitted.
    index = d.get("index") if keep_index else None
    return pd.DataFrame(d["data"], columns=d["columns"], index=index)
# --------- Light heuristics for task/label guess ---------
def guess_task_and_label(df: pd.DataFrame) -> Dict[str, Any]:
    """Guess the label column and learning task of ``df`` from names and dtypes.

    The label is the first column whose name, compared case-insensitively,
    is one of ``label``, ``target``, ``y``, ``class`` or ``outcome``.  The
    task is then guessed as follows:

    * no label column                -> ``"unsupervised"``
    * integer or boolean label       -> ``"classification"`` when it has at
      most ``max(20, int(0.05 * len(df)))`` distinct values, otherwise
      ``"regression"``
    * float label                    -> ``"regression"``
    * any other label dtype (strings, categoricals, ...) -> ``"classification"``

    Returns:
        A dict with keys ``columns``, ``dtypes`` (column -> dtype string),
        ``label_guess`` (column name or ``None``), ``task_guess``, ``issues``
        (list of human-readable warnings about the label) and ``shape``.
    """
    cols = list(df.columns)
    label_names = {"label", "target", "y", "class", "outcome"}
    # str() keeps non-string column names (e.g. default integer columns)
    # from raising on .lower().
    label = next((c for c in cols if str(c).lower() in label_names), None)

    issues = []
    if label is None:
        task = "unsupervised"
    else:
        target = df[label]
        is_discrete = (
            pd.api.types.is_integer_dtype(target)
            or pd.api.types.is_bool_dtype(target)
        )
        if is_discrete:
            nuniq = target.nunique(dropna=True)
            few = nuniq <= max(20, int(0.05 * len(df)))
            task = "classification" if few else "regression"
        elif pd.api.types.is_float_dtype(target):
            task = "regression"
        else:
            # A label column that is present but non-numeric holds class
            # names; reporting "unsupervised" here would contradict the
            # label guess returned alongside it.
            task = "classification"

        if target.isna().any():
            issues.append(f"Missing values in label `{label}`")
        if target.nunique() == 1:
            issues.append(f"Label `{label}` has a single class")

    return {
        "columns": cols,
        "dtypes": {c: str(df[c].dtype) for c in cols},
        "label_guess": label,
        "task_guess": task,
        "issues": issues,
        "shape": df.shape,
    }
# --------- Signature extraction for asking params ---------
def get_signature_dict(fn) -> Dict[str, Any]:
    """Describe the parameters and docstring of ``fn`` as plain data.

    Returns a dict with ``"params"`` (one entry per parameter except any
    named ``df``, each holding ``name``, ``default``, ``annotation`` and
    ``kind``) and ``"doc"`` (the stripped docstring, or ``""``).  A missing
    default or annotation is reported as ``None``; annotations and kinds are
    rendered with ``str()``.
    """
    empty = inspect.Parameter.empty
    signature = inspect.signature(fn)
    docstring = (fn.__doc__ or "").strip()

    def describe(param):
        has_default = param.default is not empty
        has_annotation = param.annotation is not empty
        return {
            "name": param.name,
            "default": param.default if has_default else None,
            "annotation": str(param.annotation) if has_annotation else None,
            "kind": str(param.kind),
        }

    described = [
        describe(param)
        for param in signature.parameters.values()
        if param.name != "df"
    ]
    return {"params": described, "doc": docstring}
# --------- Parse free-text confirmation like "Run dedup threshold=0.93 metric=cosine" ---------
# Step name -> lowercase phrases that select it when found in the user's text.
STEP_ALIASES = {
    "dedup": {"dedup", "de-dup", "duplicates", "near-dup"},
    "featurize": {"featurize", "features", "featureize", "engineering"},
    "find_label_issues": {"find_label_issues", "label issues", "cleanlab", "label noise"},
}

# key=value pairs; values may contain word characters, '-' and '.'.
_PARAM_RE = re.compile(r"(\w+)\s*=\s*([\-\w\.]+)")


def _coerce_param_value(raw: str) -> Any:
    """Turn a raw ``key=value`` value into bool, int or float where it looks like one."""
    lowered = raw.lower()
    if lowered in {"true", "false"}:
        return lowered == "true"

    # Accept one leading minus sign so negative numbers are recognised too.
    unsigned = raw[1:] if raw.startswith("-") else raw
    if unsigned.replace(".", "", 1).isdigit():
        try:
            return float(raw) if "." in raw else int(raw)
        except ValueError:
            # str.isdigit() also accepts characters such as superscripts
            # that int()/float() reject; keep those values as text.
            return raw
    return raw


def parse_user_choice(text: str) -> Tuple[Optional[str], Dict[str, Any]]:
    """Extract the chosen step and its ``key=value`` parameters from free text.

    The step is the first entry of ``STEP_ALIASES`` (in definition order)
    with an alias occurring as a substring of the lower-cased text, or
    ``None`` when nothing matches.  Parameters are read from the original
    (case-preserving) text; values that look like numbers (optionally
    negative) become ``int``/``float``, ``true``/``false`` in any case
    become ``bool``, and everything else stays a string.  When a key is
    repeated, the last occurrence wins.

    Returns:
        ``(step_or_None, params)``.
    """
    lowered = text.lower()
    chosen = next(
        (
            step
            for step, aliases in STEP_ALIASES.items()
            if any(alias in lowered for alias in aliases)
        ),
        None,
    )
    params: Dict[str, Any] = {
        m.group(1): _coerce_param_value(m.group(2))
        for m in _PARAM_RE.finditer(text)
    }
    return chosen, params