Spaces:
Sleeping
Sleeping
| import os | |
| import joblib | |
| import warnings | |
| import numpy as np | |
| import pandas as pd | |
| from typing import List, Tuple, Sequence, Optional | |
| from xgboost import XGBRanker | |
| from sklearn.model_selection import train_test_split | |
| from sklearn.metrics import ndcg_score | |
| from pandas.api.types import is_numeric_dtype | |
| from .feature import FEATURE_COLS | |
| from datetime import datetime | |
| # ----------------------------- Helpers ----------------------------- | |
| def _pick_feature_cols(df: pd.DataFrame, drop_cols: Sequence[str]) -> List[str]: | |
| """ | |
| Pick numeric feature columns robustly, excluding drop_cols. | |
| Uses pandas is_numeric_dtype to correctly include nullable ints/floats/bools. | |
| """ | |
| cols = [] | |
| for c in df.columns: | |
| if c in drop_cols: | |
| continue | |
| if is_numeric_dtype(df[c]): | |
| cols.append(c) | |
| return cols | |
| def _sort_and_pack_by_qid( | |
| X: pd.DataFrame, y: pd.Series, qid: pd.Series, feature_cols: List[str] | |
| ) -> Tuple[pd.DataFrame, np.ndarray, List[int], np.ndarray]: | |
| """ | |
| Sort rows by qid so that group sizes match the sample order. | |
| Returns: | |
| X_sorted, y_sorted, groups, qid_sorted (aligned with X_sorted/y_sorted) | |
| """ | |
| packed = X.copy() | |
| packed["_label"] = y.values | |
| packed["_qid"] = qid.values | |
| packed = packed.sort_values("_qid").reset_index(drop=True) | |
| groups = packed.groupby("_qid").size().tolist() | |
| X_sorted = packed[feature_cols].copy() | |
| y_sorted = packed["_label"].astype(float).values | |
| qid_sorted = packed["_qid"].values | |
| return X_sorted, y_sorted, groups, qid_sorted | |
def _eval_mean_ndcg(
    model: XGBRanker,
    X_val: pd.DataFrame,
    y_val,  # np.ndarray or pd.Series
    qid_val,  # aligned with X_val/y_val
    ks: Sequence[int] = (5, 10),
) -> dict:
    """
    Compute the mean NDCG@k over validation queries for each k in *ks*.

    Queries with fewer than two rows are skipped (NDCG is not meaningful
    for a single document). Accepts numpy arrays or pandas Series.
    """
    # Prefer predictions truncated at the early-stopping best iteration
    # when the fitted model exposes one (xgboost >= 2.0); otherwise fall
    # back to a plain predict over all trees.
    try:
        scores = model.predict(X_val, iteration_range=(0, model.best_iteration + 1))
    except Exception:
        scores = model.predict(X_val)

    labels = np.asarray(y_val)
    queries = np.asarray(qid_val)

    results: dict = {}
    for k in ks:
        per_query = []
        for q in np.unique(queries):
            sel = queries == q
            if sel.sum() >= 2:
                per_query.append(ndcg_score([labels[sel]], [scores[sel]], k=k))
        results[f"NDCG@{k}"] = float(np.mean(per_query)) if per_query else 0.0
    return results
| # ----------------------------- Main Trainer ----------------------------- | |
def train_model_ranker(
    user_id: str = "user_1",
    features_path: Optional[str] = None,
    save_model: bool = True,
    model_params: Optional[dict] = None,
    val_ratio: float = 0.2,
    random_state: int = 42,
    max_rows: Optional[int] = None,
):
    """
    Train an XGBoost Learning-to-Rank model (XGBRanker) on cold-start generated data.

    Expected input CSV (from cold_start.py):
        - qid: query id (one round of pantry sampling = one query)
        - relevance: graded relevance label (e.g., 3/2/1/0)
        - feature columns matching FEATURE_COLS (missing ones are filled with 0)

    The function:
        1) Reads the CSV (optionally subsampled to max_rows)
        2) Aligns columns to FEATURE_COLS and sanitizes values (inf/NaN -> 0)
        3) Splits train/val by qid to avoid leakage across queries
        4) Sorts each split by qid and builds group sizes aligned to sample order
           (required by XGBRanker)
        5) Trains XGBRanker and reports mean NDCG@5/10 on the validation split
        6) Appends the metrics to user_data/<user_id>/training_log.txt
        7) Saves the model to user_data/<user_id>/ranker.pkl only when
           save_model is True

    Returns:
        (model, metrics, feature_cols)

    Raises:
        FileNotFoundError: if the cold-start feature CSV does not exist.
        ValueError: if the CSV lacks the 'qid' or 'relevance' column.
    """
    base_dir = os.path.join("recipe_recommendation", "user_data", user_id)
    os.makedirs(base_dir, exist_ok=True)

    # Resolve features path
    if features_path is None:
        features_path = os.path.join(base_dir, "user_features_rank.csv")
    if not os.path.exists(features_path):
        raise FileNotFoundError(
            f"[train_model_ranker] Cold-start features not found at: {features_path}\n"
            f"Please run cold_start_ranker(user_id='{user_id}') first."
        )

    # Load data (subsample for quick iterations if requested)
    df = pd.read_csv(features_path)
    if max_rows is not None and len(df) > max_rows:
        df = df.sample(max_rows, random_state=random_state).reset_index(drop=True)

    # Basic validation
    if "qid" not in df.columns or "relevance" not in df.columns:
        raise ValueError("Input CSV must contain 'qid' and 'relevance' columns.")

    # Coerce label/qid to numeric and fill NaNs (should not happen, but defensive)
    df["qid"] = pd.to_numeric(df["qid"], errors="coerce").fillna(-1).astype(int)
    df["relevance"] = pd.to_numeric(df["relevance"], errors="coerce").fillna(0).astype(float)

    # Align to the canonical feature set; absent columns become 0
    feature_cols = FEATURE_COLS.copy()
    df = df.reindex(columns=["qid", "relevance"] + feature_cols, fill_value=0)

    # Ensure numeric + finite values only (replace inf/nan with 0)
    df[feature_cols] = df[feature_cols].apply(pd.to_numeric, errors="coerce")
    df[feature_cols] = df[feature_cols].replace([np.inf, -np.inf], np.nan).fillna(0.0)

    # Split by qid (not by row) to avoid leakage across queries
    unique_qids = df["qid"].unique()
    if len(unique_qids) < 2:
        warnings.warn("Only one unique qid found — ranking training may be ineffective.")
        train_mask = np.ones(len(df), dtype=bool)
        val_mask = np.zeros(len(df), dtype=bool)
    else:
        train_qids, val_qids = train_test_split(
            unique_qids, test_size=val_ratio, random_state=random_state
        )
        train_mask = df["qid"].isin(train_qids)
        val_mask = df["qid"].isin(val_qids)

    # Split dataframes AFTER defining masks
    X_train_raw = df.loc[train_mask, feature_cols]
    y_train_raw = df.loc[train_mask, "relevance"]
    qid_train = df.loc[train_mask, "qid"]
    X_val_raw = df.loc[val_mask, feature_cols]
    y_val_raw = df.loc[val_mask, "relevance"]
    qid_val = df.loc[val_mask, "qid"]

    # Sort by qid and build group sizes aligned with sample order (CRITICAL for XGBRanker)
    X_train, y_train, group_train, _ = _sort_and_pack_by_qid(
        X_train_raw, y_train_raw, qid_train, feature_cols
    )
    X_val, y_val, group_val, qid_val_sorted = _sort_and_pack_by_qid(
        X_val_raw, y_val_raw, qid_val, feature_cols
    )

    print(f"[ranker] #Train groups: {len(group_train)} | #Val groups: {len(group_val)}")
    print(f"[ranker] Train rows: {len(X_train)} | Val rows: {len(X_val)} | #Features: {len(feature_cols)}")

    # Default model params (overridable via model_params)
    default_params = dict(
        objective="rank:ndcg",
        eval_metric="ndcg",
        n_estimators=400,
        learning_rate=0.08,
        max_depth=6,
        subsample=0.8,
        colsample_bytree=0.8,
        random_state=random_state,
        tree_method="hist",
        reg_lambda=1.0,
        reg_alpha=0.0,
    )
    if model_params:
        default_params.update(model_params)
    model = XGBRanker(**default_params)

    # Fit model (XGBRanker requires group sizes for eval_set as well)
    fit_kwargs = dict(
        X=X_train,
        y=y_train,
        group=group_train,
        eval_set=[(X_val, y_val)],
        eval_group=[group_val],
        verbose=False,
    )
    try:
        # Newer xgboost versions (some builds) support early_stopping_rounds on Ranker
        model.fit(early_stopping_rounds=50, **fit_kwargs)  # maximize=True is inferred by 'ndcg'
    except TypeError:
        # Fallback to callback API (older versions)
        try:
            from xgboost.callback import EarlyStopping
            model.fit(callbacks=[EarlyStopping(rounds=50, save_best=True, maximize=True)], **fit_kwargs)
        except Exception:
            # Last resort: train without early stopping
            model.fit(**fit_kwargs)

    # Evaluate mean NDCG@5/10 (once — the original computed and printed this twice)
    metrics = _eval_mean_ndcg(model, X_val, y_val, qid_val_sorted, ks=(5, 10))
    print("[ranker] Validation metrics:", " ".join(f"{k}={v:.4f}" for k, v in metrics.items()))

    # Append NDCG metrics to the training log (datetime is imported at module level)
    log_path = os.path.join(base_dir, "training_log.txt")
    with open(log_path, "a", encoding="utf-8") as f:
        ndcg5 = metrics.get("NDCG@5", 0.0)
        ndcg10 = metrics.get("NDCG@10", 0.0)
        f.write(f"{datetime.now().isoformat()} | NDCG@5={ndcg5:.4f}, NDCG@10={ndcg10:.4f}\n")
    print(f"[ranker] Logged metrics to {log_path}")

    # Save model only when requested (the original also saved unconditionally
    # beforehand, writing the file twice and ignoring save_model=False)
    if save_model:
        model_path = os.path.join(base_dir, "ranker.pkl")
        joblib.dump(model, model_path)
        print(f"[ranker] Model saved to {model_path}")

    return model, metrics, feature_cols
if __name__ == "__main__":
    # Example invocation with the default configuration.
    train_model_ranker(
        user_id="user_1",
        save_model=True,
        val_ratio=0.2,
        random_state=42,
        max_rows=None,       # cap row count for quick iterations, e.g. 200_000
        model_params=None,   # pass a dict here to override default hyper-params
    )