import os
import joblib
import warnings
from datetime import datetime
from typing import List, Tuple, Sequence, Optional

import numpy as np
import pandas as pd
from pandas.api.types import is_numeric_dtype
from sklearn.metrics import ndcg_score
from sklearn.model_selection import train_test_split
from xgboost import XGBRanker

from .feature import FEATURE_COLS

# ----------------------------- Helpers -----------------------------


def _pick_feature_cols(df: pd.DataFrame, drop_cols: Sequence[str]) -> List[str]:
    """
    Pick numeric feature columns robustly, excluding drop_cols.
    Uses pandas is_numeric_dtype to correctly include nullable ints/floats/bools.
    (Currently unused by train_model_ranker, which relies on the fixed
    FEATURE_COLS schema; kept as a fallback for schema-free inputs.)
    """
    cols = []
    for c in df.columns:
        if c in drop_cols:
            continue
        if is_numeric_dtype(df[c]):
            cols.append(c)
    return cols


def _sort_and_pack_by_qid(
    X: pd.DataFrame,
    y: pd.Series,
    qid: pd.Series,
    feature_cols: List[str],
) -> Tuple[pd.DataFrame, np.ndarray, List[int], np.ndarray]:
    """
    Sort rows by qid so that group sizes match the sample order.

    Returns:
        X_sorted, y_sorted, groups, qid_sorted (aligned with X_sorted/y_sorted)
    """
    packed = X.copy()
    packed["_label"] = y.values
    packed["_qid"] = qid.values
    packed = packed.sort_values("_qid").reset_index(drop=True)

    groups = packed.groupby("_qid").size().tolist()
    X_sorted = packed[feature_cols].copy()
    y_sorted = packed["_label"].astype(float).values
    qid_sorted = packed["_qid"].values
    return X_sorted, y_sorted, groups, qid_sorted


def _eval_mean_ndcg(
    model: XGBRanker,
    X_val: pd.DataFrame,
    y_val,    # can be np.ndarray or pd.Series
    qid_val,  # aligned with X_val/y_val
    ks: Sequence[int] = (5, 10),
) -> dict:
    """
    Compute mean NDCG@k for each k in ks over validation queries.
    Accepts numpy arrays or pandas Series.
    """
    # Respect the early-stopping best iteration when available
    # (predict's iteration_range is supported by recent xgboost versions).
    try:
        preds = model.predict(X_val, iteration_range=(0, model.best_iteration + 1))
    except Exception:
        preds = model.predict(X_val)

    y_arr = np.asarray(y_val)
    q_arr = np.asarray(qid_val)

    out = {}
    for k in ks:
        ndcgs = []
        for q in np.unique(q_arr):
            mask = (q_arr == q)
            if mask.sum() < 2:  # ranking quality is undefined for single-candidate queries
                continue
            ndcgs.append(ndcg_score([y_arr[mask]], [preds[mask]], k=k))
        out[f"NDCG@{k}"] = float(np.mean(ndcgs)) if ndcgs else 0.0
    return out
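# Worked example (illustrative) of the grouping invariant XGBRanker relies on:
# rows must be contiguous per query, and `group` must list the per-query sizes
# in the same order as the sorted rows.
#
#   X = pd.DataFrame({"f": [0.1, 0.2, 0.3, 0.4, 0.5]})
#   y = pd.Series([1, 0, 2, 0, 1])
#   q = pd.Series([7, 3, 7, 3, 3])
#   Xs, ys, groups, qs = _sort_and_pack_by_qid(X, y, q, ["f"])
#   # groups == [3, 2] (qid 3 first, then qid 7) and sum(groups) == len(Xs)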
# ----------------------------- Main Trainer -----------------------------


def train_model_ranker(
    user_id: str = "user_1",
    features_path: Optional[str] = None,
    save_model: bool = True,
    model_params: Optional[dict] = None,
    val_ratio: float = 0.2,
    random_state: int = 42,
    max_rows: Optional[int] = None,
):
    """
    Train an XGBoost Learning-to-Rank model (XGBRanker) on cold-start generated data.

    Expected input CSV (from cold_start.py):
        - qid: query id (one round of pantry sampling = one query)
        - relevance: graded relevance label (e.g., 3/2/1/0)
        - features: numeric columns produced by build_features (and any extra numeric signals)

    The function:
        1) Reads the CSV
        2) Aligns the feature matrix to the fixed FEATURE_COLS schema
        3) Splits train/val by qid to avoid leakage
        4) Sorts each split by qid and builds group sizes aligned to sample order
        5) Trains XGBRanker and reports NDCG@5/10
        6) Saves the model to user_data/<user_id>/ranker.pkl
    """
    base_dir = os.path.join("recipe_recommendation", "user_data", user_id)
    os.makedirs(base_dir, exist_ok=True)

    # Resolve features path
    if features_path is None:
        features_path = os.path.join(base_dir, "user_features_rank.csv")
    if not os.path.exists(features_path):
        raise FileNotFoundError(
            f"[train_model_ranker] Cold-start features not found at: {features_path}\n"
            f"Please run cold_start_ranker(user_id='{user_id}') first."
        )

    # Load data
    df = pd.read_csv(features_path)
    if max_rows is not None and len(df) > max_rows:
        df = df.sample(max_rows, random_state=random_state).reset_index(drop=True)

    # Basic validation
    if "qid" not in df.columns or "relevance" not in df.columns:
        raise ValueError("Input CSV must contain 'qid' and 'relevance' columns.")

    # Fill NaNs in label/qid (should not happen, but defensive)
    df["qid"] = pd.to_numeric(df["qid"], errors="coerce").fillna(-1).astype(int)
    df["relevance"] = pd.to_numeric(df["relevance"], errors="coerce").fillna(0).astype(float)

    # Align columns to the fixed FEATURE_COLS schema (missing columns -> 0);
    # see _pick_feature_cols for a dynamic alternative.
    feature_cols = FEATURE_COLS.copy()
    df = df.reindex(columns=["qid", "relevance"] + feature_cols, fill_value=0)

    # Ensure numeric + finite values only (replace inf/nan with 0)
    df[feature_cols] = df[feature_cols].apply(pd.to_numeric, errors="coerce")
    df[feature_cols] = df[feature_cols].replace([np.inf, -np.inf], np.nan).fillna(0.0)

    # Split by qid to avoid leakage across queries
    unique_qids = df["qid"].unique()
    if len(unique_qids) < 2:
        warnings.warn("Only one unique qid found — ranking training may be ineffective.")
        train_mask = np.ones(len(df), dtype=bool)
        val_mask = np.zeros(len(df), dtype=bool)
    else:
        train_qids, val_qids = train_test_split(
            unique_qids, test_size=val_ratio, random_state=random_state
        )
        train_mask = df["qid"].isin(train_qids)
        val_mask = df["qid"].isin(val_qids)

    # Split dataframes AFTER defining masks
    X_train_raw = df.loc[train_mask, feature_cols]
    y_train_raw = df.loc[train_mask, "relevance"]
    qid_train = df.loc[train_mask, "qid"]

    X_val_raw = df.loc[val_mask, feature_cols]
    y_val_raw = df.loc[val_mask, "relevance"]
    qid_val = df.loc[val_mask, "qid"]

    # Sort by qid and build group sizes aligned with sample order (CRITICAL for XGBRanker)
    X_train, y_train, group_train, _ = _sort_and_pack_by_qid(
        X_train_raw, y_train_raw, qid_train, feature_cols
    )
    X_val, y_val, group_val, qid_val_sorted = _sort_and_pack_by_qid(
        X_val_raw, y_val_raw, qid_val, feature_cols
    )

    print(f"[ranker] #Train groups: {len(group_train)} | #Val groups: {len(group_val)}")
    print(f"[ranker] Train rows: {len(X_train)} | Val rows: {len(X_val)} | #Features: {len(feature_cols)}")

    # Default model params
    default_params = dict(
        objective="rank:ndcg",
        eval_metric="ndcg",
        n_estimators=400,
        learning_rate=0.08,
        max_depth=6,
        subsample=0.8,
        colsample_bytree=0.8,
        random_state=random_state,
        tree_method="hist",
        reg_lambda=1.0,
        reg_alpha=0.0,
    )
    if model_params:
        default_params.update(model_params)

    model = XGBRanker(**default_params)

    # Fit model (XGBRanker needs `group` for the training set and `eval_group`
    # for every entry in eval_set). Skip the eval set entirely when the
    # validation split is empty (single-qid fallback above).
    fit_kwargs = dict(X=X_train, y=y_train, group=group_train, verbose=False)
    has_val = len(X_val) > 0
    if has_val:
        fit_kwargs["eval_set"] = [(X_val, y_val)]
        fit_kwargs["eval_group"] = [group_val]

    if has_val:
        try:
            # Newer xgboost versions (some builds) accept early_stopping_rounds
            # in fit(); maximize=True is inferred from 'ndcg'.
            model.fit(early_stopping_rounds=50, **fit_kwargs)
        except TypeError:
            # Fallback to the callback API (older versions)
            try:
                from xgboost.callback import EarlyStopping
                model.fit(
                    callbacks=[EarlyStopping(rounds=50, save_best=True, maximize=True)],
                    **fit_kwargs,
                )
            except Exception:
                # Last resort: train without early stopping
                model.fit(**fit_kwargs)
    else:
        # No validation queries, so early stopping is not possible
        model.fit(**fit_kwargs)

    # Evaluate mean NDCG@5/10
    metrics = _eval_mean_ndcg(model, X_val, y_val, qid_val_sorted, ks=(5, 10))
    print("[ranker] Validation metrics:", " ".join(f"{k}={v:.4f}" for k, v in metrics.items()))

    # === Save NDCG metrics to log ===
    log_path = os.path.join(base_dir, "training_log.txt")
    with open(log_path, "a", encoding="utf-8") as f:
        ndcg5 = metrics.get("NDCG@5", 0.0)
        ndcg10 = metrics.get("NDCG@10", 0.0)
        f.write(f"{datetime.now().isoformat()} | NDCG@5={ndcg5:.4f}, NDCG@10={ndcg10:.4f}\n")
    print(f"[ranker] Logged metrics to {log_path}")

    # Save model (only when requested)
    if save_model:
        model_path = os.path.join(base_dir, "ranker.pkl")
        joblib.dump(model, model_path)
        print(f"[ranker] Model saved to {model_path}")

    return model, metrics, feature_cols
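# Illustrative inference sketch (not wired into the pipeline): load the saved
# ranker and score a candidate frame. `candidates_df` is a hypothetical
# DataFrame assumed to carry the same FEATURE_COLS schema used at training time.


def rank_candidates_example(candidates_df: pd.DataFrame, user_id: str = "user_1") -> pd.DataFrame:
    """Sketch: score candidates with a saved ranker and sort best-first."""
    model_path = os.path.join("recipe_recommendation", "user_data", user_id, "ranker.pkl")
    model = joblib.load(model_path)
    scores = model.predict(candidates_df[FEATURE_COLS])
    return candidates_df.assign(score=scores).sort_values("score", ascending=False)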
if __name__ == "__main__":
    # Example run
    train_model_ranker(
        user_id="user_1",
        save_model=True,
        val_ratio=0.2,
        random_state=42,
        max_rows=None,      # or set an upper bound for quick iterations, e.g., 200_000
        model_params=None,  # override defaults if desired
    )
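# Quick-iteration variant (illustrative values): cap the training sample and
# shrink the ensemble while experimenting with features.
#
#   train_model_ranker(
#       user_id="user_1",
#       max_rows=50_000,
#       model_params={"n_estimators": 100, "learning_rate": 0.1},
#   )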