"""Evaluation harness for assessment recommenders.

Loads the catalog and labeled training queries, splits them deterministically
into train/val, runs the selected recommender, and writes aggregate metrics
plus per-query diagnostics to a run directory.
"""
from __future__ import annotations

import argparse
import json
import random
from datetime import datetime, timezone
from pathlib import Path
from typing import List

import pandas as pd

from data.catalog_loader import load_catalog
from data.train_loader import load_train, save_label_resolution_report
from eval.metrics import recall_at_k, mrr_at_k
from recommenders.dummy_random import DummyRandomRecommender
from recommenders.bm25 import BM25Recommender
from recommenders.vector_recommender import VectorRecommender
from recommenders.hybrid_rrf import HybridRRFRecommender, HybridRerankRecommender
from recommenders.hybrid_rrf_lgbm import HybridRRFLGBMRecommender
from retrieval.vector_index import VectorIndex
from models.embedding_model import EmbeddingModel
from rerankers.cross_encoder import CrossEncoderReranker
from retrieval.query_rewriter import rewrite_query


def split_examples(examples, val_ratio=0.2, seed=42):
    """Shuffle deterministically and split into (train, val) lists."""
    rnd = random.Random(seed)
    shuffled = examples[:]
    rnd.shuffle(shuffled)
    cut = int(len(shuffled) * (1 - val_ratio))
    return shuffled[:cut], shuffled[cut:]


def run_eval(catalog_path: str, train_path: str, recommender_name: str, out_dir: str, seed: int = 42):
    # Create the run directory up front so the label-resolution report written
    # just below does not fail on a missing directory.
    Path(out_dir).mkdir(parents=True, exist_ok=True)

    df_catalog, catalog_by_id, id_by_url = load_catalog(catalog_path)
    examples, label_report = load_train(train_path, id_by_url)
    save_label_resolution_report(label_report, Path(out_dir) / "label_resolution_report.json")
    train_split, val_split = split_examples(examples, val_ratio=0.2, seed=seed)

    def make_recommender():
        if recommender_name == "dummy_random":
            return DummyRandomRecommender(df_catalog["assessment_id"].tolist(), seed=seed)
        if recommender_name == "bm25":
            return BM25Recommender(df_catalog)
        if recommender_name == "vector":
            # Vector/hybrid recommenders need an index, aligned ids, and an
            # embedding model, all of which are wired up in main().
            raise RuntimeError("Vector recommender should be constructed in main with index and ids.")
        raise ValueError(f"Unknown recommender: {recommender_name}")

    recommender = make_recommender()

    def eval_split(split, split_name):
        preds_list: List[List[str]] = []
        gt_list: List[set] = []
        rows = []
        for ex in split:
            preds_raw = recommender.recommend(ex.query, k=10)
            # Recommenders may return plain ids or result dicts; normalize to ids.
            preds = []
            for pr in preds_raw:
                if isinstance(pr, str):
                    preds.append(pr)
                elif isinstance(pr, dict) and "assessment_id" in pr:
                    preds.append(pr["assessment_id"])
            preds = preds[:10]
            preds_list.append(preds)
            gt_list.append(ex.relevant_ids)
            hits = len(set(preds).intersection(ex.relevant_ids))
            rows.append(
                {
                    "query": ex.query,
                    "relevant_ids": list(ex.relevant_ids),
                    "predicted_ids": preds,
                    "hits": hits,
                }
            )
        recall10 = sum(recall_at_k(g, p, 10) for g, p in zip(gt_list, preds_list)) / len(gt_list) if gt_list else 0.0
        recall5 = sum(recall_at_k(g, p, 5) for g, p in zip(gt_list, preds_list)) / len(gt_list) if gt_list else 0.0
        mrr10 = sum(mrr_at_k(g, p, 10) for g, p in zip(gt_list, preds_list)) / len(gt_list) if gt_list else 0.0
        return recall10, recall5, mrr10, rows

    train_r10, train_r5, train_mrr10, train_rows = eval_split(train_split, "train")
    val_r10, val_r5, val_mrr10, val_rows = eval_split(val_split, "val")

    metrics = {
        "recommender": recommender_name,
        "label_match_pct": label_report.get("matched_pct"),
        "train": {"recall@10": train_r10, "recall@5": train_r5, "mrr@10": train_mrr10, "n": len(train_split)},
        "val": {"recall@10": val_r10, "recall@5": val_r5, "mrr@10": val_mrr10, "n": len(val_split)},
    }
    with open(Path(out_dir) / "metrics.json", "w") as f:
        json.dump(metrics, f, indent=2)
    pd.DataFrame(train_rows + val_rows).to_json(Path(out_dir) / "per_query_results.jsonl", orient="records", lines=True)
    worst = sorted(val_rows, key=lambda r: r["hits"])[:10]
    pd.DataFrame(worst).to_csv(Path(out_dir) / "worst_queries.csv", index=False)
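
# The hybrid recommenders fuse the BM25 and vector rankings with reciprocal-
# rank fusion; --rrf-k is the smoothing constant k in the standard RRF score
# score(d) = sum over rankings r of 1 / (k + rank_r(d)). The helper below is
# an illustrative sketch of that formula only -- the fusion actually used
# lives in recommenders.hybrid_rrf, whose internals are not shown here.
def _rrf_fuse_sketch(rankings: list[list[str]], rrf_k: int = 60) -> list[str]:
    """Fuse ranked id lists by summed reciprocal ranks, best first."""
    scores: dict[str, float] = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (rrf_k + rank)
    return sorted(scores, key=scores.get, reverse=True)
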

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--catalog", default="data/catalog.jsonl")
    parser.add_argument("--train", required=True)
    parser.add_argument("--recommender", default="dummy_random")
    parser.add_argument("--out-dir", default=None)
    parser.add_argument("--seed", type=int, default=42)
    parser.add_argument("--vector-index", type=str,
                        help="Path to FAISS index (for recommender=vector/hybrid_rrf)")
    parser.add_argument("--assessment-ids", type=str,
                        help="Path to assessment_ids.json aligned with embeddings/index")
    parser.add_argument("--model", type=str, default="sentence-transformers/all-MiniLM-L6-v2",
                        help="Embedding model for vector recommender")
    parser.add_argument("--topn-candidates", type=int, default=200,
                        help="Top-N candidates to retrieve before fusion/rerank")
    parser.add_argument("--rrf-k", type=int, default=60, help="RRF smoothing constant")
    parser.add_argument("--reranker-model", type=str, default="cross-encoder/ms-marco-MiniLM-L-6-v2",
                        help="Cross-encoder model for reranking")
    parser.add_argument("--lgbm-model", type=str, help="Path to trained LGBM model (for hybrid_rrf_lgbm)")
    parser.add_argument("--lgbm-features", type=str, help="Path to feature_schema.json for LGBM reranker")
    parser.add_argument("--use-rewriter", action="store_true", help="Rewrite queries before retrieval/rerank.")
    parser.add_argument("--vocab", type=str, help="Optional vocab JSON for rewriter boosts.")
    args = parser.parse_args()

    run_id = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    out_dir = args.out_dir or f"runs/{run_id}_{args.recommender}"

    if args.recommender in {"vector", "hybrid_rrf", "hybrid_rrf_rerank", "hybrid_rrf_lgbm"}:
        if not args.vector_index or not args.assessment_ids:
            raise ValueError("Vector/hybrid recommender requires --vector-index and --assessment-ids")
        df_catalog, _, id_by_url = load_catalog(args.catalog)
        with open(args.assessment_ids) as f:
            ids = json.load(f)
        index = VectorIndex.load(args.vector_index)
        embed_model = EmbeddingModel(args.model)
        examples, label_report = load_train(args.train, id_by_url)
        Path(out_dir).mkdir(parents=True, exist_ok=True)
        save_label_resolution_report(label_report, Path(out_dir) / "label_resolution_report.json")

        vocab = {}
        if args.use_rewriter and args.vocab:
            with open(args.vocab) as f:
                vocab = json.load(f)

        train_split, val_split = split_examples(examples, val_ratio=0.2, seed=args.seed)
        vector_rec = VectorRecommender(embed_model, index, df_catalog, ids, k_candidates=args.topn_candidates)

        if args.recommender == "vector":
            recommender = vector_rec
        elif args.recommender == "hybrid_rrf":
            bm25_rec = BM25Recommender(df_catalog)
            recommender = HybridRRFRecommender(bm25_rec, vector_rec,
                                               topn_candidates=args.topn_candidates, rrf_k=args.rrf_k)
        elif args.recommender == "hybrid_rrf_rerank":
            bm25_rec = BM25Recommender(df_catalog)
            reranker = CrossEncoderReranker(model_name=args.reranker_model)
            recommender = HybridRerankRecommender(
                bm25_rec,
                vector_rec,
                reranker,
                df_catalog,
                topn_candidates=args.topn_candidates,
                rrf_k=args.rrf_k,
            )
        else:  # hybrid_rrf_lgbm
            if not args.lgbm_model or not args.lgbm_features:
                raise ValueError("hybrid_rrf_lgbm requires --lgbm-model and --lgbm-features")
            bm25_rec = BM25Recommender(df_catalog)
            # Close the schema file promptly rather than leaking the handle.
            with open(args.lgbm_features) as f:
                feature_cols = json.load(f)
            if isinstance(feature_cols, dict) and "features" in feature_cols:
                feature_cols = feature_cols["features"]
            recommender = HybridRRFLGBMRecommender(
                bm25_rec,
                vector_rec,
                lgbm_model_path=args.lgbm_model,
                feature_cols=feature_cols,
                catalog_df=df_catalog,
                topn_candidates=args.topn_candidates,
                rrf_k=args.rrf_k,
            )
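
        # eval_split below reports the mean of per-query metrics over each
        # split. Assuming eval.metrics implements the standard IR definitions
        # (consistent with how the values are averaged here):
        #   recall@k: |relevant ∩ top-k predictions| / |relevant|, per query
        #   mrr@k:    1 / rank of the first relevant prediction in the top k,
        #             or 0.0 when none of the top k is relevant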
"features" in feature_cols: feature_cols = feature_cols["features"] recommender = HybridRRFLGBMRecommender( bm25_rec, vector_rec, lgbm_model_path=args.lgbm_model, feature_cols=feature_cols, catalog_df=df_catalog, topn_candidates=args.topn_candidates, rrf_k=args.rrf_k, ) def eval_split(split, split_name): preds_list = [] gt_list = [] rows = [] for ex in split: retrieval_query = ex.query rerank_query = ex.query if args.use_rewriter: rw = rewrite_query(ex.query, catalog_vocab=vocab) retrieval_query = rw.retrieval_query rerank_query = rw.rerank_query if args.recommender == "hybrid_rrf_rerank": preds_raw = recommender.recommend(retrieval_query, k=10, rerank_query=rerank_query) else: preds_raw = recommender.recommend(retrieval_query, k=10) preds = [] for pr in preds_raw: if isinstance(pr, str): preds.append(pr) elif isinstance(pr, dict) and "assessment_id" in pr: preds.append(pr["assessment_id"]) preds = preds[:10] preds_list.append(preds) gt_list.append(ex.relevant_ids) hits = len(set(preds).intersection(ex.relevant_ids)) rows.append( { "query": ex.query, "relevant_ids": list(ex.relevant_ids), "predicted_ids": preds, "hits": hits, } ) recall10 = sum(recall_at_k(g, p, 10) for g, p in zip(gt_list, preds_list)) / len(gt_list) if gt_list else 0.0 recall5 = sum(recall_at_k(g, p, 5) for g, p in zip(gt_list, preds_list)) / len(gt_list) if gt_list else 0.0 mrr10 = sum(mrr_at_k(g, p, 10) for g, p in zip(gt_list, preds_list)) / len(gt_list) if gt_list else 0.0 return recall10, recall5, mrr10, rows train_r10, train_r5, train_mrr10, train_rows = eval_split(train_split, "train") val_r10, val_r5, val_mrr10, val_rows = eval_split(val_split, "val") metrics = { "recommender": args.recommender, "label_match_pct": label_report.get("matched_pct"), "train": {"recall@10": train_r10, "recall@5": train_r5, "mrr@10": train_mrr10, "n": len(train_split)}, "val": {"recall@10": val_r10, "recall@5": val_r5, "mrr@10": val_mrr10, "n": len(val_split)}, "config": { "topn_candidates": args.topn_candidates, "rrf_k": args.rrf_k, "model": args.model, "index": args.vector_index, }, } with open(Path(out_dir) / "metrics.json", "w") as f: json.dump(metrics, f, indent=2) pd.DataFrame(train_rows + val_rows).to_json(Path(out_dir) / "per_query_results.jsonl", orient="records", lines=True) worst = sorted(val_rows, key=lambda r: r["hits"])[:10] pd.DataFrame(worst).to_csv(Path(out_dir) / "worst_queries.csv", index=False) print(f"Run saved to {out_dir}") else: run_eval(args.catalog, args.train, args.recommender, out_dir, seed=args.seed) print(f"Run saved to {out_dir}") if __name__ == "__main__": main()