from __future__ import annotations

import numpy as np
import pandas as pd
from typing import Dict, Optional

from sklearn.impute import SimpleImputer
from sklearn.preprocessing import RobustScaler, StandardScaler, FunctionTransformer
from sklearn.pipeline import Pipeline
from sklearn.neighbors import NearestNeighbors

# NEW: medoids (robust, nearest-exemplar clustering)
from sklearn_extra.cluster import KMedoids

ARCH_FEATURES = [
    "velo",
    "ivb_in",
    "hb_as_in",
    "rel_height",
    "rel_side",
    "spin",
    "csw",
    "whiff_rate",
    "gb_rate",
    "zone_pct",
]
# ---------- existing helpers (unchanged API) ----------
def winsorize_df(df: pd.DataFrame, cols, lower=0.01, upper=0.99):
    """Clip each column in `cols` to its [lower, upper] quantile range."""
    q_low = df[cols].quantile(lower)
    q_hi = df[cols].quantile(upper)
    return df.assign(**{c: df[c].clip(q_low[c], q_hi[c]) for c in cols})


def groupwise_z(df: pd.DataFrame, cols, group_col="pitch_type"):
    """Z-score each column within its group; returns (df, list of new *_gz columns)."""
    df = df.copy()

    def _z(g):
        return (g - g.mean()) / (g.std(ddof=0) + 1e-8)

    gz_cols = []
    for c in cols:
        gz = f"{c}_gz"
        df[gz] = df.groupby(group_col)[c].transform(_z)
        gz_cols.append(gz)
    return df, gz_cols
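

# Minimal usage sketch for the two helpers above, on synthetic data: winsorize
# a couple of columns, then z-score them within pitch type. The column names
# mirror a subset of ARCH_FEATURES, but every value here is made up.
def _demo_helpers() -> None:
    rng = np.random.default_rng(0)
    toy = pd.DataFrame(
        {
            "pitch_type": ["FF"] * 50 + ["SL"] * 50,
            "velo": np.r_[rng.normal(94, 2, 50), rng.normal(85, 2, 50)],
            "spin": np.r_[rng.normal(2300, 150, 50), rng.normal(2500, 150, 50)],
        }
    )
    toy = winsorize_df(toy, ["velo", "spin"])
    toy, gz_cols = groupwise_z(toy, ["velo", "spin"])
    # Each *_gz column should be ~zero-mean within its pitch_type group.
    print(toy.groupby("pitch_type")[gz_cols].mean().round(3))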
def _preprocessor(
    gz_feats: list[str], weights: Optional[Dict[str, float]] = None
) -> Pipeline:
    """
    Consistent preprocessing for clustering and neighbor search.
    Applies impute -> robust scale -> standardize -> optional weights.
    """
    steps = [
        ("imputer", SimpleImputer(strategy="median")),
        ("robust", RobustScaler()),
        ("std", StandardScaler(with_mean=True, with_std=True)),
    ]
    if weights:
        w = np.array(
            [weights.get(f.replace("_gz", ""), 1.0) for f in gz_feats], dtype=float
        )
        steps.append(
            (
                "weights",
                FunctionTransformer(lambda X: X * w, feature_names_out="one-to-one"),
            )
        )
    return Pipeline(steps)
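

# Sketch of what the optional weights step does (hypothetical feature names and
# weight value): after standardization every column has unit variance, so
# multiplying by w makes a feature count w-times as much in any distance.
def _demo_preprocessor() -> None:
    rng = np.random.default_rng(1)
    X = rng.normal(loc=[94.0, 2300.0], scale=[2.0, 150.0], size=(200, 2))
    pre = _preprocessor(["velo_gz", "spin_gz"], weights={"velo": 2.0})
    Xw = pre.fit_transform(X)
    print(Xw.std(axis=0).round(2))  # roughly [2.0, 1.0]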
# ---------- local, neighbor-aware label smoothing (kept) ----------
def _contextual_smooth_labels(
    Xs: np.ndarray,
    labels: np.ndarray,
    n_neighbors: int = 15,
    vote_thresh: float = 0.6,
    margin: float = 0.0,
    max_iters: int = 2,
) -> np.ndarray:
    """
    Reassign labels by local kNN majority with a confidence threshold.
    - vote_thresh: minimum fraction of neighbors that must agree to flip (e.g., 0.6)
    - margin: require the neighbor-majority centroid to be at least 'margin' closer
      than the current cluster center (0.0 = no distance guard)
    """
    n = len(labels)
    labels = labels.copy()
    knn = NearestNeighbors(n_neighbors=min(n, n_neighbors + 1), metric="manhattan").fit(Xs)
    _, idxs = knn.kneighbors(Xs)

    def centroids(lbls):
        return {k: Xs[lbls == k].mean(axis=0) for k in np.unique(lbls)}

    for _ in range(max_iters):
        C = centroids(labels)
        changed = 0
        for i in range(n):
            neigh = idxs[i][1:]  # drop self
            neigh_lbls = labels[neigh]
            vals, counts = np.unique(neigh_lbls, return_counts=True)
            j = np.argmax(counts)
            maj_label, maj_frac = vals[j], counts[j] / len(neigh_lbls)
            if maj_frac < vote_thresh or maj_label == labels[i]:
                continue
            if margin > 0.0:
                cur_c = C[labels[i]]
                maj_c = C[maj_label]
                # L1 distances, for consistency with the Manhattan kNN metric
                di_cur = np.abs(Xs[i] - cur_c).sum()
                di_maj = np.abs(Xs[i] - maj_c).sum()
                if di_maj >= di_cur - margin:
                    continue
            labels[i] = maj_label
            changed += 1
        if changed == 0:
            break
    return labels
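

# Sketch: the smoother flips isolated mislabels toward the local kNN majority.
# Two synthetic blobs with three labels deliberately corrupted; the corrupted
# points sit deep inside blob 0, so their neighbors vote them back.
def _demo_smoothing() -> None:
    rng = np.random.default_rng(2)
    Xs = np.vstack([rng.normal(0.0, 0.3, (50, 2)), rng.normal(3.0, 0.3, (50, 2))])
    labels = np.r_[np.zeros(50, dtype=int), np.ones(50, dtype=int)]
    labels[:3] = 1  # corrupt three blob-0 points
    smoothed = _contextual_smooth_labels(Xs, labels, n_neighbors=10)
    print(int((smoothed[:50] == 0).sum()), "of 50 blob-0 points carry label 0 after smoothing")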
# ---------- API: fit + comps (drop-in) ----------
def fit_kmeans(df_feat: pd.DataFrame, k: int = 20, random_state: int = 42):
    """
    DROP-IN REPLACEMENT (name kept for API compatibility):
    - Uses K-MEDOIDS with MANHATTAN distance (closest-neighbor-friendly).
    - Returns (df_with_clusters, scaler_pipeline, kmedoids_model, knn_index).
    """
    df = df_feat.dropna(subset=ARCH_FEATURES).copy()
    # Light winsorization: dampen outliers without warping scale
    df[ARCH_FEATURES] = df[ARCH_FEATURES].clip(
        df[ARCH_FEATURES].quantile(0.01),
        df[ARCH_FEATURES].quantile(0.99),
        axis=1,
    )
    # Consistent preprocessing for clustering and neighbors
    scaler = _preprocessor(ARCH_FEATURES, weights=None)
    Xs = scaler.fit_transform(df[ARCH_FEATURES].values)
    # K-Medoids with Manhattan distance -> emphasizes true nearest relationships
    km = KMedoids(
        n_clusters=k,
        metric="manhattan",
        init="k-medoids++",
        max_iter=500,
        random_state=random_state,
    )
    labels = km.fit_predict(Xs)
    df["cluster"] = labels
    # NN index in the SAME space & metric
    nn = NearestNeighbors(n_neighbors=8, metric="manhattan").fit(Xs)
    return df, scaler, km, nn
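

# Usage sketch for fit_kmeans on a synthetic frame. Every value is random and
# purely illustrative; a real df_feat would carry actual per-pitch aggregates.
# Requires scikit-learn-extra for KMedoids.
def _toy_frame(n: int = 200, seed: int = 3) -> pd.DataFrame:
    rng = np.random.default_rng(seed)
    toy = pd.DataFrame({f: rng.normal(size=n) for f in ARCH_FEATURES})
    toy["pitch_type"] = rng.choice(["FF", "SL"], size=n)
    toy["player_name"] = [f"p{i}" for i in range(n)]
    toy["p_throws"] = "R"
    return toy


def _demo_fit() -> None:
    df_fit, _, _, _ = fit_kmeans(_toy_frame(), k=5)
    print(df_fit["cluster"].value_counts().sort_index())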
def nearest_comps(
    row: pd.Series,
    df_fit: pd.DataFrame,
    scaler: Pipeline,
    nn: NearestNeighbors,
    within_pitch_type: bool = True,
    k: int = 6,
):
    """
    Nearest comps in the SAME preprocessed space and metric (Manhattan).
    If within_pitch_type=True, restricts candidates to the same pitch_type.
    """
    # Ensure all required features exist
    missing = [c for c in ARCH_FEATURES if c not in df_fit.columns]
    if missing:
        raise KeyError(f"nearest_comps: df_fit is missing required features: {missing}")
    # Query vector in the exact same space as clustering; cast to float in case
    # the row comes from a mixed-dtype frame and carries object dtype
    xq = scaler.transform(row[ARCH_FEATURES].to_numpy(dtype=float).reshape(1, -1))
    # Columns to return
    cols = [
        "player_name",
        "pitch_type",
        "p_throws",
        "velo",
        "ivb_in",
        "hb_as_in",
        "whiff_rate",
        "gb_rate",
        "cluster",
    ]
    # Per-pitch-type neighborhood (preferred)
    if within_pitch_type and "pitch_type" in df_fit.columns:
        ptype = row.get("pitch_type")
        if isinstance(ptype, str):
            sub = df_fit[df_fit["pitch_type"] == ptype].copy()
            if not sub.empty:
                Xsub = scaler.transform(sub[ARCH_FEATURES].values)
                k_loc = min(len(sub), max(2, k + 1))  # +1 to allow excluding self
                knn_local = NearestNeighbors(n_neighbors=k_loc, metric="manhattan").fit(Xsub)
                dists, inds = knn_local.kneighbors(xq, n_neighbors=k_loc)
                cand = sub.iloc[inds[0]].copy()
                cand["_dist"] = dists[0]
                # Prefer excluding the same player if present
                pname = row.get("player_name", None)
                if pname is not None and "player_name" in cand.columns:
                    cand = cand[cand["player_name"] != pname]
                return (
                    cand.sort_values("_dist")
                    .drop(columns=["_dist"], errors="ignore")[cols]
                    .head(k)
                )
    # Global fallback: use provided NN (already fit in Manhattan space)
    k_glob = min(len(df_fit), max(2, k + 1))
    dists, inds = nn.kneighbors(xq, n_neighbors=k_glob)
    cand = df_fit.iloc[inds[0]].copy()
    # Attach distances BEFORE filtering so each surviving row keeps its own
    # distance (assigning a truncated slice after filtering misaligns them)
    cand["_dist"] = dists[0]
    if within_pitch_type and "pitch_type" in df_fit.columns:
        ptype = row.get("pitch_type")
        if isinstance(ptype, str):
            cand = cand[cand["pitch_type"] == ptype]
    pname = row.get("player_name", None)
    if pname is not None and "player_name" in cand.columns:
        cand = cand[cand["player_name"] != pname]
    return (
        cand.sort_values("_dist").drop(columns=["_dist"], errors="ignore")[cols].head(k)
    )
# Make public API explicit (unchanged)
__all__ = ["ARCH_FEATURES", "fit_kmeans", "nearest_comps"]
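

# End-to-end sketch: fit on the synthetic frame from _toy_frame, then pull
# comps for its first row. Names like "p0" are placeholders, not real players.
def _demo_comps() -> None:
    df_fit, scaler, _, nn = fit_kmeans(_toy_frame(seed=4), k=5)
    comps = nearest_comps(df_fit.iloc[0], df_fit, scaler, nn, k=3)
    print(comps)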