"""Full evaluation harness — measures everything the rubric measures. Runs Task A and Task B (warm, cold-start, cross-domain) across N users, computes all automated metrics (RMSE, ROUGE-L, BERTScore, Hit Rate, NDCG) plus LLM-as-judge proxies for the human-eval dimensions (behavioral fidelity, contextual relevance, bridge quality), and prints a structured report. This is the baseline. Run it before any experimental change to lock the current numbers; re-run after to see what moved. Usage: # Default — 30/30/10/15 users python -m scripts.run_eval # Quick smoke test — 5/5/3/5 users (~3 min) python -m scripts.run_eval --smoke # Custom sample sizes python -m scripts.run_eval --n-task-a 50 --n-warm 50 --n-cold 20 --n-cross 25 # Skip BERTScore (slow first-time, downloads ~400MB) python -m scripts.run_eval --no-bertscore # Skip LLM-judge calls (faster, automated metrics only) python -m scripts.run_eval --no-judges # Save the report to a file (markdown) python -m scripts.run_eval --out reports/baseline_stage2b.md """ from __future__ import annotations import argparse import json import logging import time from dataclasses import dataclass, field from datetime import datetime from pathlib import Path from typing import Optional import pandas as pd from core.config import settings from core.llm import LLMClient from core.nigerian import naija_persona_examples from core.persona import PersonaEngine, UserPersona from eval.metrics import ( rmse, mae, rouge_l, bertscore_f1, ndcg_at_k, hit_rate_at_k, mean_skipping_nan, ) from eval.judges import ( judge_behavioral_fidelity, judge_contextual_relevance, judge_bridge_quality, title_quality_rate, domain_coverage, ) from task_a_user_modeling.agent import ImpersonationAgent, ItemInput from task_b_recommender.agent import RecommendationAgent logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s") log = logging.getLogger(__name__) # 
# ──────────────────────────────────────────────────────────────────────────────
# Result containers
# ──────────────────────────────────────────────────────────────────────────────


@dataclass
class TaskAResult:
    """Aggregated Task A metrics plus the per-user rows they were built from.

    Metric fields start as NaN so that a metric that was skipped or could not
    be computed is rendered as "n/a" by the report formatter rather than as 0.
    """

    n_users: int = 0  # users that produced a scored prediction
    rmse: float = float("nan")
    mae: float = float("nan")
    rouge_l: float = float("nan")
    bertscore_f1: float = float("nan")
    behavioral_fidelity: float = float("nan")  # mean LLM-judge score
    n_failed: int = 0  # users skipped (persona build, no test row, generation)
    raw: list[dict] = field(default_factory=list)  # one dict per scored user


@dataclass
class TaskBModeResult:
    """Aggregated Task B metrics for one mode (warm / cold_start / cross_domain)."""

    mode: str = ""
    n_users: int = 0  # users (or synthetic personas) that produced recommendations
    hit_rate_at_10: float = float("nan")
    ndcg_at_10: float = float("nan")
    title_quality: float = float("nan")
    domain_coverage_avg: float = float("nan")
    contextual_relevance: float = float("nan")
    bridge_quality: float = float("nan")  # cross-domain only
    n_failed: int = 0
    raw: list[dict] = field(default_factory=list)  # one dict per scored user


@dataclass
class FullEvalReport:
    """Everything one harness run produces: run metadata plus the four results."""

    provider: str = ""
    started_at: str = ""
    completed_at: str = ""
    elapsed_seconds: float = 0.0
    task_a: TaskAResult = field(default_factory=TaskAResult)
    task_b_warm: TaskBModeResult = field(default_factory=TaskBModeResult)
    task_b_cold_start: TaskBModeResult = field(default_factory=TaskBModeResult)
    task_b_cross_domain: TaskBModeResult = field(default_factory=TaskBModeResult)


# ──────────────────────────────────────────────────────────────────────────────
# Small shared helpers
# ──────────────────────────────────────────────────────────────────────────────

# The three catalogue domains the cold-start and cross-domain modes score against.
_ALL_DOMAINS = ("Books", "Kindle_Store", "Movies_and_TV")


def _mean(values: list) -> float:
    """Return the arithmetic mean of a non-empty list as a plain float."""
    return float(sum(values) / len(values))


def _is_nan(value: object) -> bool:
    """Return True only when *value* is a float NaN."""
    import math  # function-scope import, as the original formatter did

    return isinstance(value, float) and math.isnan(value)


def _text_or(value: object, fallback: str = "") -> str:
    """Return ``str(value)``, or *fallback* when the value is missing.

    "Missing" means None, ``pd.NA``, float NaN, an empty string/container, or a
    falsy scalar. A plain ``value or fallback`` is not enough for data read
    from a table: NaN is truthy, so it would be stringified to "nan".
    """
    if value is None or value is pd.NA or _is_nan(value):
        return fallback
    try:
        is_empty = len(value) == 0
    except TypeError:
        # Scalars have no len(); keep ordinary truthiness for them.
        is_empty = not value
    return fallback if is_empty else str(value)


def _load_items_table() -> Optional[pd.DataFrame]:
    """Read ``items.parquet`` from the processed dir, or return None if absent."""
    items_path = settings.processed_dir / "items.parquet"
    if not items_path.exists():
        return None
    return pd.read_parquet(items_path)


def _lookup_item_metadata(items_df: Optional[pd.DataFrame], item_id,
                          default_title) -> tuple:
    """Return ``(title, description, categories)`` for *item_id*.

    Uses the first row of *items_df* whose ``parent_asin`` equals *item_id*.
    When there is no table or no matching row, *default_title* is returned
    unchanged together with empty description and categories.
    """
    if items_df is None:
        return default_title, "", ""
    match = items_df[items_df["parent_asin"] == item_id]
    if match.empty:
        return default_title, "", ""
    row = match.iloc[0]
    return (
        _text_or(row.get("title"), str(default_title)),
        _text_or(row.get("description")),
        _text_or(row.get("categories")),
    )


# ──────────────────────────────────────────────────────────────────────────────
# User sampling
# ──────────────────────────────────────────────────────────────────────────────


def _load_reviews() -> pd.DataFrame:
    """Load the processed reviews table; exit with a hint if it is missing."""
    path = settings.processed_dir / "reviews.parquet"
    if not path.exists():
        raise SystemExit(f"Reviews file not found at {path}. Run prepare_data.py first.")
    df = pd.read_parquet(path)
    log.info(f"Loaded {len(df):,} reviews")
    return df


def _train_stats_for_tested_users(reviews: pd.DataFrame) -> pd.DataFrame:
    """Per-user training counts, restricted to users with at least one test row.

    Returns a frame with columns ``user_id``, ``n_train`` (number of training
    reviews) and ``n_domains`` (distinct training domains).
    """
    train = reviews[reviews["split"] == "train"]
    test = reviews[reviews["split"] == "test"]
    users_with_test = set(test["user_id"].unique())
    counts = (
        train.groupby("user_id")
        .agg(n_train=("rating", "size"), n_domains=("domain", "nunique"))
        .reset_index()
    )
    return counts[counts["user_id"].isin(users_with_test)]


def _sample_users_with_test_reviews(reviews: pd.DataFrame, n: int,
                                    require_min_train: int = 3,
                                    require_min_domains: int = 1,
                                    seed: int = 42) -> list[str]:
    """Sample N users who have both train history and test items.

    Constraints applied per user:
      - At least `require_min_train` training reviews (enough for a persona)
      - At least `require_min_domains` distinct domains in training
      - At least 1 held-out test item

    If fewer than N users qualify, all of them are used (with a warning).

    Raises:
        SystemExit: when no user satisfies the constraints.
    """
    stats = _train_stats_for_tested_users(reviews)
    eligible = stats[
        (stats["n_train"] >= require_min_train)
        & (stats["n_domains"] >= require_min_domains)
    ]
    if eligible.empty:
        raise SystemExit("No eligible users found.")
    if len(eligible) < n:
        log.warning(f"Only {len(eligible)} eligible users (requested {n}); using all")
        n = len(eligible)
    return eligible.sample(n=n, random_state=seed)["user_id"].tolist()


def _sample_cross_domain_users(reviews: pd.DataFrame, n: int,
                               seed: int = 43) -> list[str]:
    """Sample N users with single-domain training history (cross-domain test targets).

    For cross-domain mode we want users whose training history is in 1 domain
    so we can recommend in the OTHER domains and measure if anything they
    engaged with in those other domains shows up.

    Eligibility: exactly one training domain, at least 3 training reviews and
    at least one test item. The test item is not required to be in a different
    domain; such users simply contribute no hit-rate measurement later on.

    Raises:
        SystemExit: when no user satisfies the constraints.
    """
    stats = _train_stats_for_tested_users(reviews)
    eligible = stats[(stats["n_train"] >= 3) & (stats["n_domains"] == 1)]
    if eligible.empty:
        raise SystemExit("No eligible cross-domain users found.")
    if len(eligible) < n:
        log.warning(f"Only {len(eligible)} cross-domain users (requested {n})")
        n = len(eligible)
    return eligible.sample(n=n, random_state=seed)["user_id"].tolist()


# ──────────────────────────────────────────────────────────────────────────────
# Task A evaluation
# ──────────────────────────────────────────────────────────────────────────────


def run_task_a(reviews: pd.DataFrame, n_users: int, *,
               with_bertscore: bool, with_judges: bool,
               persona_engine: PersonaEngine,
               agent: ImpersonationAgent,
               judge_llm: LLMClient) -> TaskAResult:
    """Evaluate rating/review impersonation on each sampled user's first test item.

    For every sampled user a persona is built from the training split, the
    agent predicts a rating and a review for the user's first test row, and the
    prediction is compared with the real rating/review.

    Args:
        reviews: Full reviews table with a ``split`` column ("train"/"test").
        n_users: Number of users to sample.
        with_bertscore: Compute BERTScore over all generated reviews.
        with_judges: Ask the LLM judge for a behavioral-fidelity score per user.
        persona_engine: Builds and enriches personas from training rows.
        agent: Produces the predicted rating and review.
        judge_llm: Client handed to the judge function.

    Returns:
        A TaskAResult; metrics that could not be computed stay NaN.
    """
    log.info(f"════ Task A evaluation: {n_users} users ════")
    user_ids = _sample_users_with_test_reviews(reviews, n_users)
    train = reviews[reviews["split"] == "train"]
    test = reviews[reviews["split"] == "test"]

    # Item metadata is the same for every user: read it once, not per iteration.
    items_df = _load_items_table()

    result = TaskAResult(n_users=0)
    predicted_ratings, actual_ratings = [], []
    predicted_reviews, actual_reviews = [], []
    rouge_scores = []
    fidelity_scores = []

    for i, user_id in enumerate(user_ids, 1):
        log.info(f" [{i}/{len(user_ids)}] Task A user={user_id[:12]}...")
        try:
            persona = persona_engine.from_dataframe(user_id, train)
            persona = persona_engine.enrich(persona)
        except Exception as e:
            log.warning(f" Persona build failed: {e}; skipping user")
            result.n_failed += 1
            continue

        user_test = test[test["user_id"] == user_id]
        if user_test.empty:
            result.n_failed += 1
            continue

        # Pick the first test review as the target
        target_row = user_test.iloc[0]
        target_item_id = target_row["parent_asin"]
        target_domain = target_row["domain"]
        actual_rating = float(target_row["rating"])
        actual_review = str(target_row["text"])
        fallback_title = target_row["title"] if "title" in target_row else "(unknown)"

        # Prefer real catalogue metadata (title/description/categories) if present
        target_title, item_description, item_categories = _lookup_item_metadata(
            items_df, target_item_id, fallback_title,
        )

        item_input = ItemInput(
            parent_asin=target_item_id,
            title=target_title,
            description=item_description,
            categories=item_categories,
            domain=target_domain,
        )

        try:
            output = agent.run(persona, item=item_input)
            pred_rating = float(output.rating)
            pred_review = str(output.review)
        except Exception as e:
            log.warning(f" Generation failed: {e}; skipping user")
            result.n_failed += 1
            continue

        # Auto metrics
        user_rouge = rouge_l(pred_review, actual_review)
        predicted_ratings.append(pred_rating)
        actual_ratings.append(actual_rating)
        predicted_reviews.append(pred_review)
        actual_reviews.append(actual_review)
        rouge_scores.append(user_rouge)

        # LLM-judge (best effort: a judge failure does not drop the user)
        fidelity = None
        if with_judges:
            try:
                score = judge_behavioral_fidelity(
                    judge_llm, persona.to_prompt_block(),
                    target_title, target_domain,
                    pred_rating, pred_review,
                    actual_rating, actual_review,
                )
                fidelity_scores.append(score.score)
                fidelity = score.score
            except Exception as e:
                log.warning(f" Judge failed: {e}")

        result.raw.append({
            "user_id": user_id,
            "item_id": target_item_id,
            "domain": target_domain,
            "actual_rating": actual_rating,
            "predicted_rating": pred_rating,
            "rouge_l": user_rouge,
            "fidelity": fidelity,
        })
        result.n_users += 1

    # Aggregate
    if predicted_ratings:
        result.rmse = rmse(predicted_ratings, actual_ratings)
        result.mae = mae(predicted_ratings, actual_ratings)
        result.rouge_l = _mean(rouge_scores)

    if with_bertscore and predicted_reviews:
        log.info(" Computing BERTScore (may download model on first use)...")
        try:
            result.bertscore_f1 = bertscore_f1(predicted_reviews, actual_reviews)
        except Exception as e:
            log.warning(f" BERTScore failed: {e}")

    if fidelity_scores:
        result.behavioral_fidelity = _mean(fidelity_scores)

    return result


# ──────────────────────────────────────────────────────────────────────────────
# Task B evaluation
# ──────────────────────────────────────────────────────────────────────────────


def run_task_b_warm(reviews: pd.DataFrame, n_users: int, *,
                    with_judges: bool,
                    persona_engine: PersonaEngine,
                    agent: RecommendationAgent,
                    judge_llm: LLMClient) -> TaskBModeResult:
    """Evaluate in-domain recommendations for users with training history.

    Each sampled user gets 10 recommendations (``cross_domain=False``), scored
    against all of that user's test items with Hit Rate@10 and NDCG@10, plus
    title quality, normalized domain coverage and an optional judge score.

    Returns:
        A TaskBModeResult with ``mode="warm"``; uncomputed metrics stay NaN.
    """
    log.info(f"════ Task B warm evaluation: {n_users} users ════")
    user_ids = _sample_users_with_test_reviews(reviews, n_users, require_min_domains=1)
    train = reviews[reviews["split"] == "train"]
    test = reviews[reviews["split"] == "test"]

    result = TaskBModeResult(mode="warm", n_users=0)
    hit_rates, ndcgs, title_qualities = [], [], []
    coverages, relevance_scores = [], []

    for i, user_id in enumerate(user_ids, 1):
        log.info(f" [{i}/{len(user_ids)}] Task B warm user={user_id[:12]}...")
        try:
            persona = persona_engine.from_dataframe(user_id, train)
            persona = persona_engine.enrich(persona)
            recs = agent.run(persona, k=10, cross_domain=False)
        except Exception as e:
            log.warning(f" Failed: {e}")
            result.n_failed += 1
            continue

        if not recs:
            result.n_failed += 1
            continue

        recs_dicts = [r.as_dict() for r in recs]
        user_test = test[test["user_id"] == user_id]
        gt_ids = list(user_test["parent_asin"].unique())
        pred_ids = [r.item_id for r in recs]

        hr = hit_rate_at_k(pred_ids, gt_ids, k=10)
        nd = ndcg_at_k(pred_ids, gt_ids, k=10)
        tq = title_quality_rate(recs_dicts)

        # Domain coverage: did we span the user's known domains?
        known_domains = list(persona.domains) if persona.domains else []
        dc = domain_coverage(recs_dicts, known_domains) if known_domains else 0.0
        normalized_dc = dc / max(1, len(known_domains))

        hit_rates.append(hr)
        ndcgs.append(nd)
        title_qualities.append(tq)
        coverages.append(normalized_dc)

        relevance = None
        if with_judges:
            try:
                score = judge_contextual_relevance(
                    judge_llm, persona.to_prompt_block(), recs_dicts, mode="warm",
                )
                relevance_scores.append(score.score)
                relevance = score.score
            except Exception as e:
                log.warning(f" Judge failed: {e}")

        result.raw.append({
            "user_id": user_id,
            "hit_rate": hr,
            "ndcg": nd,
            "title_quality": tq,
            "domain_coverage": normalized_dc,
            "relevance": relevance,
        })
        result.n_users += 1

    if hit_rates:
        result.hit_rate_at_10 = _mean(hit_rates)
        result.ndcg_at_10 = _mean(ndcgs)
        result.title_quality = _mean(title_qualities)
        result.domain_coverage_avg = _mean(coverages)
    if relevance_scores:
        result.contextual_relevance = _mean(relevance_scores)

    return result


def run_task_b_cold_start(n_personas: int, *,
                          with_judges: bool,
                          agent: RecommendationAgent,
                          judge_llm: LLMClient) -> TaskBModeResult:
    """Cold-start eval uses synthetic Naija personas (no history).

    No held-out test items exist for synthetic personas, so we can't compute
    Hit Rate or NDCG — only title quality, domain coverage, and contextual
    relevance (LLM-judge).

    The example personas are cycled when ``n_personas`` exceeds how many exist.
    If no example personas are available the mode is skipped and an empty
    result is returned.
    """
    log.info(f"════ Task B cold-start evaluation: {n_personas} personas ════")
    result = TaskBModeResult(mode="cold_start", n_users=0)

    naija = naija_persona_examples()
    if not naija:
        # Nothing to cycle through; the modulo below would divide by zero.
        log.warning(" No cold-start personas available; skipping")
        return result

    # Cycle through naija personas if n_personas > len
    personas_to_test = [naija[idx % len(naija)] for idx in range(n_personas)]

    title_qualities, coverages, relevance_scores = [], [], []

    for i, demo in enumerate(personas_to_test, 1):
        log.info(f" [{i}/{len(personas_to_test)}] Cold-start persona={demo['name']}")
        # Fixed placeholder statistics: a synthetic persona has no review history.
        persona = UserPersona(
            user_id=f"cold_start_{i}",
            n_reviews=0,
            avg_rating=4.0,
            std_rating=0.5,
            avg_review_length=80.0,
            std_review_length=20.0,
            verified_rate=1.0,
            domains=["Books"],
            n_domains=1,
            rating_distribution={4: 0.6, 5: 0.3, 3: 0.1},
            top_terms=[],
            tone="",
            preferred_themes=demo["stated_preferences"],
            common_complaints=demo["deal_breakers"],
            voice_one_liner=demo["description"],
            history_samples=[],
        )

        try:
            recs = agent.run(persona, k=10, cross_domain=False)
        except Exception as e:
            log.warning(f" Generation failed: {e}")
            result.n_failed += 1
            continue

        if not recs:
            result.n_failed += 1
            continue

        recs_dicts = [r.as_dict() for r in recs]
        tq = title_quality_rate(recs_dicts)
        # Expect coverage across the 3 domains since cold-start often spans interests
        # NOTE(review): unlike the warm and cross-domain modes this value is not
        # divided by the number of domains — confirm against domain_coverage().
        dc = domain_coverage(recs_dicts, list(_ALL_DOMAINS))

        title_qualities.append(tq)
        coverages.append(dc)

        relevance = None
        if with_judges:
            try:
                score = judge_contextual_relevance(
                    judge_llm, persona.to_prompt_block(), recs_dicts, mode="cold_start",
                )
                relevance_scores.append(score.score)
                relevance = score.score
            except Exception as e:
                log.warning(f" Judge failed: {e}")

        result.raw.append({
            "persona_name": demo["name"],
            "title_quality": tq,
            "domain_coverage": dc,
            "relevance": relevance,
        })
        result.n_users += 1

    if title_qualities:
        result.title_quality = _mean(title_qualities)
        result.domain_coverage_avg = _mean(coverages)
    if relevance_scores:
        result.contextual_relevance = _mean(relevance_scores)

    return result


def run_task_b_cross_domain(reviews: pd.DataFrame, n_users: int, *,
                            with_judges: bool,
                            persona_engine: PersonaEngine,
                            agent: RecommendationAgent,
                            judge_llm: LLMClient) -> TaskBModeResult:
    """Evaluate recommendations outside each user's single training domain.

    Hit Rate@10 is measured only against test items whose domain is not in the
    persona's domains; users without such items record NaN, which is skipped
    when averaging. Coverage is measured over the domains the user has not
    been seen in, and two judge scores (relevance, bridge quality) are optional.

    Returns:
        A TaskBModeResult with ``mode="cross_domain"``; uncomputed metrics stay NaN.
    """
    log.info(f"════ Task B cross-domain evaluation: {n_users} users ════")
    user_ids = _sample_cross_domain_users(reviews, n_users)
    train = reviews[reviews["split"] == "train"]
    test = reviews[reviews["split"] == "test"]

    result = TaskBModeResult(mode="cross_domain", n_users=0)
    cross_hit_rates, title_qualities, coverages = [], [], []
    relevance_scores, bridge_scores = [], []

    for i, user_id in enumerate(user_ids, 1):
        log.info(f" [{i}/{len(user_ids)}] Task B cross-domain user={user_id[:12]}...")
        try:
            persona = persona_engine.from_dataframe(user_id, train)
            persona = persona_engine.enrich(persona)
            recs = agent.run(persona, k=10, cross_domain=True)
        except Exception as e:
            log.warning(f" Failed: {e}")
            result.n_failed += 1
            continue

        if not recs:
            result.n_failed += 1
            continue

        recs_dicts = [r.as_dict() for r in recs]

        # Cross-domain hit rate: test items in NEW domains (not in user's training)
        user_test = test[test["user_id"] == user_id]
        known = set(persona.domains)
        cross_gt = user_test[~user_test["domain"].isin(known)]
        if cross_gt.empty:
            # User has no test items in unknown domains; can't measure HR
            hr = float("nan")
        else:
            gt_ids = list(cross_gt["parent_asin"].unique())
            pred_ids = [r.item_id for r in recs]
            hr = hit_rate_at_k(pred_ids, gt_ids, k=10)

        tq = title_quality_rate(recs_dicts)
        # Built in a fixed order so the argument does not depend on set ordering.
        expected_unknown = [d for d in _ALL_DOMAINS if d not in known]
        dc = domain_coverage(recs_dicts, expected_unknown)
        normalized_dc = dc / max(1, len(expected_unknown))

        cross_hit_rates.append(hr)
        title_qualities.append(tq)
        coverages.append(normalized_dc)

        relevance = None
        bridge = None
        if with_judges:
            # The two judges are independent: one failing must not skip the other.
            try:
                rscore = judge_contextual_relevance(
                    judge_llm, persona.to_prompt_block(), recs_dicts, mode="cross_domain",
                )
                relevance_scores.append(rscore.score)
                relevance = rscore.score
            except Exception as e:
                log.warning(f" Relevance judge failed: {e}")
            try:
                bscore = judge_bridge_quality(
                    judge_llm, persona.to_prompt_block(),
                    list(persona.domains), recs_dicts,
                )
                bridge_scores.append(bscore.score)
                bridge = bscore.score
            except Exception as e:
                log.warning(f" Bridge judge failed: {e}")

        result.raw.append({
            "user_id": user_id,
            "known_domains": list(known),
            "cross_hit_rate": hr,
            "title_quality": tq,
            "domain_coverage": normalized_dc,
            "relevance": relevance,
            "bridge_quality": bridge,
        })
        result.n_users += 1

    if cross_hit_rates:
        result.hit_rate_at_10 = mean_skipping_nan(cross_hit_rates)
    if title_qualities:
        result.title_quality = _mean(title_qualities)
        result.domain_coverage_avg = _mean(coverages)
    if relevance_scores:
        result.contextual_relevance = _mean(relevance_scores)
    if bridge_scores:
        result.bridge_quality = _mean(bridge_scores)

    return result


# ──────────────────────────────────────────────────────────────────────────────
# Report formatting
# ──────────────────────────────────────────────────────────────────────────────


def _f(v: float, fmt: str = ".3f") -> str:
    """Format a metric — return 'n/a' for NaN."""
    if v is None or _is_nan(v):
        return " n/a"
    return f"{v:{fmt}}"


def format_report(report: FullEvalReport) -> str:
    """Render *report* as the multi-line plain-text summary that gets printed/saved."""
    rule = "═" * 65
    a = report.task_a
    warm = report.task_b_warm
    cold = report.task_b_cold_start
    cross = report.task_b_cross_domain

    lines = [
        rule,
        "NaijaTaste AI — Full Evaluation Report",
        f"Provider: {report.provider}",
        f"Started: {report.started_at}",
        f"Completed: {report.completed_at}",
        f"Elapsed: {report.elapsed_seconds:.1f}s ({report.elapsed_seconds/60:.1f} min)",
        rule,
        "",
        f"TASK A — User Modeling (N={a.n_users}, failed={a.n_failed})",
        f" Rating accuracy (RMSE): {_f(a.rmse)}",
        f" Rating accuracy (MAE): {_f(a.mae)}",
        f" Review text (ROUGE-L F1): {_f(a.rouge_l)}",
        f" Review text (BERTScore F1): {_f(a.bertscore_f1)}",
        f" Behavioral fidelity (judge 1-5): {_f(a.behavioral_fidelity, '.2f')}",
        "",
        "TASK B — Recommendation",
        "",
        f" Warm mode (N={warm.n_users}, failed={warm.n_failed})",
        f" Hit Rate@10: {_f(warm.hit_rate_at_10)}",
        f" NDCG@10: {_f(warm.ndcg_at_10)}",
        f" Title quality (real titles %): {_f(warm.title_quality)}",
        f" Domain coverage (known): {_f(warm.domain_coverage_avg)}",
        f" Contextual relevance (judge): {_f(warm.contextual_relevance, '.2f')}",
        "",
        f" Cold-start (N={cold.n_users}, failed={cold.n_failed})",
        f" Title quality (real titles %): {_f(cold.title_quality)}",
        f" Domain coverage (of 3): {_f(cold.domain_coverage_avg, '.1f')}",
        f" Contextual relevance (judge): {_f(cold.contextual_relevance, '.2f')}",
        "",
        f" Cross-domain (N={cross.n_users}, failed={cross.n_failed})",
        f" Cross-domain Hit Rate@10: {_f(cross.hit_rate_at_10)}",
        f" Title quality (real titles %): {_f(cross.title_quality)}",
        f" Domain coverage (unknown): {_f(cross.domain_coverage_avg)}",
        f" Contextual relevance (judge): {_f(cross.contextual_relevance, '.2f')}",
        f" Bridge quality (judge): {_f(cross.bridge_quality, '.2f')}",
        "",
        rule,
        "Higher is better for: ROUGE-L, BERTScore, Hit Rate, NDCG,",
        " Title quality, Domain coverage, all judge scores.",
        "Lower is better for: RMSE, MAE.",
        rule,
    ]
    return "\n".join(lines)


# ──────────────────────────────────────────────────────────────────────────────
# Main
# ──────────────────────────────────────────────────────────────────────────────


def main():
    """CLI entry point: parse flags, run the enabled tasks, print and save the report."""
    ap = argparse.ArgumentParser()
    ap.add_argument("--smoke", action="store_true",
                    help="Quick smoke test: 5/5/3/5 users (~3 min)")
    ap.add_argument("--n-task-a", type=int, default=30)
    ap.add_argument("--n-warm", type=int, default=30)
    ap.add_argument("--n-cold", type=int, default=10)
    ap.add_argument("--n-cross", type=int, default=15)
    ap.add_argument("--no-bertscore", action="store_true",
                    help="Skip BERTScore (first run downloads ~400MB)")
    ap.add_argument("--no-judges", action="store_true",
                    help="Skip LLM-judge calls (faster, automated metrics only)")
    ap.add_argument("--out", type=str, default=None,
                    help="Save report markdown to this file path")
    ap.add_argument("--json-out", type=str, default=None,
                    help="Save raw per-user results as JSON to this path")
    args = ap.parse_args()

    # --smoke wins over any explicitly passed sample sizes.
    if args.smoke:
        args.n_task_a, args.n_warm, args.n_cold, args.n_cross = 5, 5, 3, 5

    started = time.time()
    started_dt = datetime.now()

    reviews = _load_reviews()
    log.info(f"Provider: {settings.llm_provider}")
    log.info(f"Sample sizes: Task A={args.n_task_a}, "
             f"Warm={args.n_warm}, Cold={args.n_cold}, Cross={args.n_cross}")
    log.info(f"BERTScore: {'OFF' if args.no_bertscore else 'ON'}, "
             f"Judges: {'OFF' if args.no_judges else 'ON'}")

    # Build shared resources (load once, reuse across tasks)
    persona_engine = PersonaEngine()
    task_a_agent = ImpersonationAgent()
    task_b_agent = RecommendationAgent()
    judge_llm = LLMClient()
    use_judges = not args.no_judges

    report = FullEvalReport(
        provider=settings.llm_provider,
        started_at=started_dt.strftime("%Y-%m-%d %H:%M:%S"),
    )

    # ── Task A ───────────────────────────────────────────────────────────
    if args.n_task_a > 0:
        report.task_a = run_task_a(
            reviews, args.n_task_a,
            with_bertscore=not args.no_bertscore,
            with_judges=use_judges,
            persona_engine=persona_engine,
            agent=task_a_agent,
            judge_llm=judge_llm,
        )

    # ── Task B warm ──────────────────────────────────────────────────────
    if args.n_warm > 0:
        report.task_b_warm = run_task_b_warm(
            reviews, args.n_warm,
            with_judges=use_judges,
            persona_engine=persona_engine,
            agent=task_b_agent,
            judge_llm=judge_llm,
        )

    # ── Task B cold-start ────────────────────────────────────────────────
    if args.n_cold > 0:
        report.task_b_cold_start = run_task_b_cold_start(
            args.n_cold,
            with_judges=use_judges,
            agent=task_b_agent,
            judge_llm=judge_llm,
        )

    # ── Task B cross-domain ──────────────────────────────────────────────
    if args.n_cross > 0:
        report.task_b_cross_domain = run_task_b_cross_domain(
            reviews, args.n_cross,
            with_judges=use_judges,
            persona_engine=persona_engine,
            agent=task_b_agent,
            judge_llm=judge_llm,
        )

    completed = time.time()
    report.completed_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    report.elapsed_seconds = completed - started

    # ── Print + save ─────────────────────────────────────────────────────
    text = format_report(report)
    print("\n" + text + "\n")

    if args.out:
        out_path = Path(args.out)
        out_path.parent.mkdir(parents=True, exist_ok=True)
        # The report contains non-ASCII characters ("═", "—"); the platform
        # default encoding cannot always represent them, so pin UTF-8.
        out_path.write_text(text, encoding="utf-8")
        log.info(f"Report saved to {out_path}")

    if args.json_out:
        json_path = Path(args.json_out)
        json_path.parent.mkdir(parents=True, exist_ok=True)
        json_blob = {
            "provider": report.provider,
            "started_at": report.started_at,
            "completed_at": report.completed_at,
            "elapsed_seconds": report.elapsed_seconds,
            "task_a": dict(vars(report.task_a)),
            "task_b_warm": dict(vars(report.task_b_warm)),
            "task_b_cold_start": dict(vars(report.task_b_cold_start)),
            "task_b_cross_domain": dict(vars(report.task_b_cross_domain)),
        }
        # default=str stringifies anything json cannot encode natively.
        # Metrics left at NaN are written as the bare token NaN.
        json_path.write_text(
            json.dumps(json_blob, indent=2, default=str), encoding="utf-8",
        )
        log.info(f"Raw results saved to {json_path}")


if __name__ == "__main__":
    main()