DataLine / pipeline /utils_cool.py
Suraj Prasai
added initial
458c8e2
import inspect
import math
import re
from dataclasses import dataclass
from time import perf_counter
from typing import Any, Dict, Optional, Tuple
import numpy as np
import pandas as pd
from scipy.sparse import issparse
from tqdm import tqdm
@dataclass
class PhaseProgress:
    """Drive a single tqdm-style progress bar across several weighted phases.

    Each phase owns a share of the bar proportional to its entry in
    ``weights`` (normalised by the sum of all weights).  Callers bracket a
    phase with :meth:`start` / :meth:`end` and may report within-phase
    progress through :meth:`tick_abs`.  The bar only ever moves forward and
    never past ``total``.
    """

    # Progress bar object; must provide update(), set_description_str(),
    # set_postfix() and close().  The last drawn position is stored on the
    # bar itself as ``_last_val``.
    bar: "tqdm"
    # Relative weight of each phase, keyed by phase name.
    weights: Dict[str, float]
    # Number of bar units that corresponds to "all phases finished".
    total: int = 100

    def __post_init__(self):
        # Fall back to 1.0 so an empty/all-zero weight table cannot divide by zero.
        self._norm = sum(self.weights.values()) or 1.0
        self._done = 0.0  # fraction of the bar owned by already-ended phases
        self._phase = None
        self._phase_t0 = None
        # for smooth updates
        if not hasattr(self.bar, "_last_val"):
            self.bar._last_val = 0

    def _share(self, phase: str) -> float:
        """Return the normalised fraction of the bar owned by ``phase``."""
        return self.weights.get(phase, 0.0) / self._norm

    def _advance_to(self, fraction: float) -> None:
        """Move the bar forward to ``fraction`` of ``total`` (never backwards)."""
        # Clamp so repeated end()/tick calls cannot push the bar past total.
        target = min(int(round(self.total * fraction)), self.total)
        delta = target - self.bar._last_val
        if delta > 0:
            self.bar.update(delta)
            self.bar._last_val = target

    def start(self, phase: str, extra: Optional[Dict] = None):
        """Mark ``phase`` as running, start its timer and show its name."""
        self._phase = phase
        self._phase_t0 = perf_counter()
        self.bar.set_description_str(phase)
        if extra:
            self.bar.set_postfix(extra, refresh=False)

    def tick_abs(self, phase: str, p01: float, extra: Optional[Dict] = None):
        """Update absolute progress based on within-phase progress p01 ∈ [0,1]."""
        p01 = max(0.0, min(1.0, float(p01)))
        self._advance_to(self._done + self._share(phase) * p01)
        self.bar.set_description_str(f"{phase} {int(100*p01)}%")
        if extra:
            self.bar.set_postfix(extra, refresh=False)

    def end(self, phase: str, extra: Optional[Dict] = None):
        """Finish ``phase``: credit its full share and report the elapsed time.

        The bar is snapped to the end of the phase, so phases that never
        called :meth:`tick_abs` still show up as completed.  The elapsed time
        is added to the postfix under the key ``"t"``.
        """
        self._done += self._share(phase)
        self._advance_to(self._done)
        started = self._phase_t0
        # Without a prior start() there is nothing to time; report 0 rather
        # than a tiny negative difference between two clock reads.
        elapsed_ms = 0.0 if started is None else (perf_counter() - started) * 1000
        post = dict(extra or {})
        post["t"] = f"{elapsed_ms:.0f}ms"
        self.bar.set_postfix(post, refresh=False)

    def close(self):
        """Fill any remaining distance to ``total`` and close the bar."""
        try:
            if self.bar._last_val < self.total:
                self.bar.update(self.total - self.bar._last_val)
        finally:
            self.bar.close()
def choose_k(N: int, k_min: int = 5, k_max: int = 50) -> int:
    """Pick a neighbour count: floor(sqrt(N)) clipped to [k_min, k_max], capped at N-1.

    For ``N <= 1`` the result is always 1.
    """
    if N <= 1:
        return 1

    # Apply the upper clip first, then the lower one (the lower bound wins
    # when the two bounds conflict).
    clipped = min(int(math.sqrt(N)), k_max)
    if clipped < k_min:
        clipped = k_min

    # Never ask for more neighbours than there are other points.
    ceiling = N - 1
    return clipped if clipped <= ceiling else ceiling
def _ensure_dense32(X) -> np.ndarray:
"""Convert to contiguous float32 ndarray (densify only if needed)."""
if issparse(X):
X = X.toarray()
return np.asarray(X, dtype=np.float32, order="C")
def decide_task_and_model(
    y: np.ndarray,
    series: pd.Series,
    *,
    is_categorical: bool = False,
    few_class_floor: int = 20,
    few_class_frac: float = 0.05,
):
    """Return ``"classification"`` or ``"regression"`` for a target column.

    The target is treated as a classification target when the caller flags
    it as categorical, when ``series`` has a boolean or non-numeric dtype,
    or when it is numeric but has at most
    ``max(few_class_floor, ceil(few_class_frac * len(y)))`` distinct
    non-null values in ``y``.  Otherwise it is a regression target.
    """
    row_count = len(y)

    # dtype checks are made on the Series, value counts on the raw array
    bool_dtype = pd.api.types.is_bool_dtype(series)
    numeric_dtype = pd.api.types.is_numeric_dtype(series)

    # distinct values, nulls excluded
    observed = y[~pd.isnull(y)]
    distinct = len(pd.unique(observed))

    # a numeric column with this many distinct values or fewer counts as "few classes"
    frac_cap = int(np.ceil(few_class_frac * max(row_count, 1)))
    class_cap = max(few_class_floor, frac_cap)

    looks_continuous = numeric_dtype and not (distinct <= class_cap)
    if is_categorical or bool_dtype or not looks_continuous:
        return "classification"
    return "regression"
def _infer_task(y: np.ndarray, task: Optional[str]) -> str:
"""Decide task if not provided: numeric with many uniques -> regression, else classification."""
if task in {"classification", "regression"}:
return task
if np.issubdtype(y.dtype, np.number):
nunq = len(np.unique(y[~pd.isna(y)]))
is_categorical = nunq <= max(2, int(0.02 * max(1, len(y))))
else:
is_categorical = True
return "classification" if is_categorical else "regression"
# --------- DataFrame payload helpers (for tool IO) ---------
def df_to_payload(df: pd.DataFrame) -> Dict[str, Any]:
    """Wrap a DataFrame's ``split``-oriented dict in a payload envelope.

    The result has the keys ``"orient"`` (always ``"split"``) and ``"data"``
    (the output of ``df.to_dict(orient="split")``).
    """
    split_form = df.to_dict(orient="split")
    return {"orient": "split", "data": split_form}
def df_from_payload(p: Dict[str, Any]) -> pd.DataFrame:
    """Rebuild a DataFrame from a payload's ``"data"`` entry.

    Only the ``"data"`` rows and ``"columns"`` names of the split dict are
    used; any ``"index"`` entry is ignored, so the result gets a fresh
    default index.
    """
    split_form = p["data"]
    rows, header = split_form["data"], split_form["columns"]
    return pd.DataFrame(rows, columns=header)
# --------- Light heuristics for task/label guess ---------
def guess_task_and_label(df: pd.DataFrame) -> Dict[str, Any]:
    """Guess the label column and learning task of ``df`` from light heuristics.

    The label is the first column whose name (case-insensitively) is one of
    ``label``, ``target``, ``y``, ``class`` or ``outcome``.  The task is:

    * ``"unsupervised"`` when no label column is found;
    * ``"regression"`` for a float label;
    * for an integer/bool label, ``"classification"`` when it has at most
      ``max(20, int(0.05 * len(df)))`` distinct non-null values, otherwise
      ``"regression"``;
    * ``"classification"`` for any non-numeric label (strings, categoricals, ...).

    Returns a dict with the keys ``columns``, ``dtypes``, ``label_guess``,
    ``task_guess``, ``issues`` (human-readable warnings about the label) and
    ``shape``.
    """
    label_names = {"label", "target", "y", "class", "outcome"}
    cols = list(df.columns)
    # str() so that non-string column names (e.g. integers) cannot crash the scan.
    label = next((c for c in cols if str(c).lower() in label_names), None)

    issues = []
    if label is None:
        task = "unsupervised"
    else:
        target = df[label]
        if pd.api.types.is_integer_dtype(target) or pd.api.types.is_bool_dtype(target):
            nuniq = target.nunique(dropna=True)
            task = "classification" if nuniq <= max(20, int(0.05 * len(df))) else "regression"
        elif pd.api.types.is_float_dtype(target):
            task = "regression"
        elif not pd.api.types.is_numeric_dtype(target):
            # A named label holding strings/categories is a classification target.
            task = "classification"
        else:
            # Other numeric dtypes (e.g. complex) have no sensible supervised guess.
            task = "unsupervised"

        if target.isna().any():
            issues.append(f"Missing values in label `{label}`")
        if target.nunique() == 1:
            issues.append(f"Label `{label}` has a single class")

    return {
        "columns": cols,
        "dtypes": {c: str(df[c].dtype) for c in cols},
        "label_guess": label,
        "task_guess": task,
        "issues": issues,
        "shape": df.shape,
    }
# --------- Signature extraction for asking params ---------
def get_signature_dict(fn) -> Dict[str, Any]:
    """Describe ``fn``'s parameters and docstring as plain data.

    Returns ``{"params": [...], "doc": ...}`` where every parameter except
    one named ``df`` is listed with its ``name``, ``default`` (``None`` when
    it has no default), ``annotation`` (stringified, ``None`` when absent)
    and ``kind`` (stringified parameter kind).  ``doc`` is the stripped
    docstring, or an empty string when there is none.
    """
    missing = inspect.Parameter.empty
    described = [
        {
            "name": param.name,
            "default": None if param.default is missing else param.default,
            "annotation": None if param.annotation is missing else str(param.annotation),
            "kind": str(param.kind),
        }
        for param in inspect.signature(fn).parameters.values()
        if param.name != "df"
    ]
    return {"params": described, "doc": (fn.__doc__ or "").strip()}
# --------- Parse free-text confirmation like "Run dedup threshold=0.93 metric=cosine" ---------
# Canonical step name -> phrases that select it in free text.
STEP_ALIASES = {
    "dedup": {"dedup", "de-dup", "duplicates", "near-dup"},
    "featurize": {"featurize", "features", "featureize", "engineering"},
    "find_label_issues": {"find_label_issues", "label issues", "cleanlab", "label noise"},
}

# Compiled once: ``key=value`` pairs, and the two numeric value shapes.
_PARAM_RE = re.compile(r"(\w+)\s*=\s*([\-\w\.]+)")
_INT_RE = re.compile(r"-?\d+")
_FLOAT_RE = re.compile(r"-?(?:\d+\.?\d*|\.\d+)(?:[eE]-?\d+)?")


def _coerce_param(raw: str) -> Any:
    """Convert a raw ``key=value`` value to int, float or bool when it looks like one."""
    try:
        if _INT_RE.fullmatch(raw):
            return int(raw)
        if _FLOAT_RE.fullmatch(raw):
            return float(raw)
    except ValueError:
        # Anything int()/float() still refuses is passed through as text.
        return raw
    lowered = raw.lower()
    if lowered in {"true", "false"}:
        return lowered == "true"
    return raw


def parse_user_choice(text: str) -> Tuple[Optional[str], Dict[str, Any]]:
    """Parse a free-text confirmation such as ``"Run dedup threshold=0.93 metric=cosine"``.

    Returns ``(step, params)``:

    * ``step`` is the first key of ``STEP_ALIASES`` one of whose aliases
      occurs (case-insensitively, as a substring) in ``text``, or ``None``.
    * ``params`` maps every ``key=value`` pair found in ``text`` to its
      value.  Integers and floats (including negative values and exponent
      notation like ``1e-3``) are converted to numbers, ``true``/``false``
      to booleans; everything else stays a string.  Keys keep their
      original case and later duplicates overwrite earlier ones.
    """
    lowered = text.lower()
    chosen = next(
        (step for step, aliases in STEP_ALIASES.items() if any(a in lowered for a in aliases)),
        None,
    )
    params: Dict[str, Any] = {
        m.group(1): _coerce_param(m.group(2)) for m in _PARAM_RE.finditer(text)
    }
    return chosen, params