from __future__ import annotations import numpy as np import pandas as pd from typing import Dict, Optional, Tuple from sklearn.impute import SimpleImputer from sklearn.preprocessing import RobustScaler, StandardScaler, FunctionTransformer from sklearn.pipeline import Pipeline from sklearn.neighbors import NearestNeighbors # NEW: medoids (robust, nearest-exemplar clustering) from sklearn_extra.cluster import KMedoids ARCH_FEATURES = [ "velo", "ivb_in", "hb_as_in", "rel_height", "rel_side", "spin", "csw", "whiff_rate", "gb_rate", "zone_pct", ] # ---------- existing helpers (unchanged API) ---------- def winsorize_df(df: pd.DataFrame, cols, lower=0.01, upper=0.99): q_low = df[cols].quantile(lower) q_hi = df[cols].quantile(upper) return df.assign(**{c: df[c].clip(q_low[c], q_hi[c]) for c in cols}) def groupwise_z(df: pd.DataFrame, cols, group_col="pitch_type"): df = df.copy() def _z(g): return (g - g.mean()) / (g.std(ddof=0) + 1e-8) gz_cols = [] for c in cols: gz = f"{c}_gz" df[gz] = df.groupby(group_col)[c].transform(_z) gz_cols.append(gz) return df, gz_cols def _preprocessor( gz_feats: list[str], weights: Optional[Dict[str, float]] = None ) -> Pipeline: """ Consistent preprocessing for clustering and neighbor search. Applies impute -> robust scale -> standardize -> optional weights. """ steps = [ ("imputer", SimpleImputer(strategy="median")), ("robust", RobustScaler()), ("std", StandardScaler(with_mean=True, with_std=True)), ] if weights: w = np.array( [weights.get(f.replace("_gz", ""), 1.0) for f in gz_feats], dtype=float ) steps.append( ( "weights", FunctionTransformer(lambda X: X * w, feature_names_out="one-to-one"), ) ) return Pipeline(steps) # ---------- local, neighbor-aware label smoothing (kept) ---------- def _contextual_smooth_labels( Xs: np.ndarray, labels: np.ndarray, n_neighbors: int = 15, vote_thresh: float = 0.6, margin: float = 0.0, max_iters: int = 2, ) -> np.ndarray: """ Reassign labels by local kNN majority with a confidence threshold. - vote_thresh: minimum fraction of neighbors that must agree to flip (e.g., 0.6) - margin: require the neighbor-majority centroid to be at least 'margin' closer than the current cluster center (0.0 = no distance guard) """ n = len(labels) labels = labels.copy() knn = NearestNeighbors(n_neighbors=min(n, n_neighbors + 1), metric="manhattan").fit( Xs ) dists, idxs = knn.kneighbors(Xs) def centroids(lbls): Cs = [] for k in np.unique(lbls): Cs.append(Xs[lbls == k].mean(axis=0)) return {k: c for k, c in zip(np.unique(lbls), Cs)} for _ in range(max_iters): C = centroids(labels) changed = 0 for i in range(n): neigh = idxs[i][1:] # drop self neigh_lbls = labels[neigh] vals, counts = np.unique(neigh_lbls, return_counts=True) j = np.argmax(counts) maj_label, maj_frac = vals[j], counts[j] / len(neigh_lbls) if maj_frac < vote_thresh or maj_label == labels[i]: continue if margin > 0.0: cur_c = C[labels[i]] maj_c = C[maj_label] di_cur = np.linalg.norm(Xs[i] - cur_c) di_maj = np.linalg.norm(Xs[i] - maj_c) if di_maj >= di_cur - margin: continue labels[i] = maj_label changed += 1 if changed == 0: break return labels # ---------- API: fit + comps (drop-in) ---------- def fit_kmeans(df_feat: pd.DataFrame, k: int = 20, random_state: int = 42): """ DROP-IN REPLACEMENT: - Uses K-MEDOIDS with MANHATTAN distance (closest-neighbor–friendly). - Returns (df_with_clusters, scaler_pipeline, kmedoids_model, knn_index). """ df = df_feat.dropna(subset=ARCH_FEATURES).copy() # Light winsorization: dampen outliers without warping scale df[ARCH_FEATURES] = df[ARCH_FEATURES].clip( df[ARCH_FEATURES].quantile(0.01), df[ARCH_FEATURES].quantile(0.99), axis=1, ) # Consistent preprocessing for clustering and neighbors scaler = _preprocessor(ARCH_FEATURES, weights=None) Xs = scaler.fit_transform(df[ARCH_FEATURES].values) # K-Medoids with Manhattan distance -> emphasizes true nearest relationships km = KMedoids( n_clusters=k, metric="manhattan", init="k-medoids++", max_iter=500, random_state=random_state, ) labels = km.fit_predict(Xs) df["cluster"] = labels # NN index in the SAME space & metric nn = NearestNeighbors(n_neighbors=8, metric="manhattan").fit(Xs) return df, scaler, km, nn def nearest_comps( row: pd.Series, df_fit: pd.DataFrame, scaler: Pipeline, nn: NearestNeighbors, within_pitch_type: bool = True, k: int = 6, ): """ Nearest comps in the SAME preprocessed space and metric (Manhattan). If within_pitch_type=True, restricts candidates to the same pitch_type. """ # Ensure all required features exist missing = [c for c in ARCH_FEATURES if c not in df_fit.columns] if missing: raise KeyError(f"nearest_comps: df_fit is missing required features: {missing}") # Query vector in the exact same space as clustering xq = scaler.transform(row[ARCH_FEATURES].values.reshape(1, -1)) # Columns to return cols = [ "player_name", "pitch_type", "p_throws", "velo", "ivb_in", "hb_as_in", "whiff_rate", "gb_rate", "cluster", ] # Per-pitch-type neighborhood (preferred) if within_pitch_type and "pitch_type" in df_fit.columns: ptype = row.get("pitch_type") if isinstance(ptype, str): sub = df_fit[df_fit["pitch_type"] == ptype].copy() if not sub.empty: Xsub = scaler.transform(sub[ARCH_FEATURES].values) k_loc = min(len(sub), max(2, k + 1)) # +1 to allow excluding self knn_local = NearestNeighbors(n_neighbors=k_loc, metric="manhattan").fit( Xsub ) dists, inds = knn_local.kneighbors(xq, n_neighbors=k_loc) cand = sub.iloc[inds[0]].copy() cand["_dist"] = dists[0] # Prefer excluding the same player if present pname = row.get("player_name", None) if pname is not None and "player_name" in cand.columns: cand = cand[cand["player_name"] != pname] return ( cand.sort_values("_dist") .drop(columns=["_dist"], errors="ignore")[cols] .head(k) ) # Global fallback: use provided NN (already fit in Manhattan space) k_glob = min(len(df_fit), max(2, k + 1)) dists, inds = nn.kneighbors(xq, n_neighbors=k_glob) cand = df_fit.iloc[inds[0]].copy() if within_pitch_type and "pitch_type" in df_fit.columns: ptype = row.get("pitch_type") if isinstance(ptype, str): cand = cand[cand["pitch_type"] == ptype] pname = row.get("player_name", None) if pname is not None and "player_name" in cand.columns: cand = cand[cand["player_name"] != pname] cand["_dist"] = dists[0][: len(cand)] if len(dists[0]) >= len(cand) else 0.0 return ( cand.sort_values("_dist").drop(columns=["_dist"], errors="ignore")[cols].head(k) ) # Make public API explicit (unchanged) __all__ = ["ARCH_FEATURES", "fit_kmeans", "nearest_comps"]