Spaces:
Running
Running
| """Full evaluation harness — measures everything the rubric measures. | |
| Runs Task A and Task B (warm, cold-start, cross-domain) across N users, | |
| computes all automated metrics (RMSE, ROUGE-L, BERTScore, Hit Rate, NDCG) | |
| plus LLM-as-judge proxies for the human-eval dimensions (behavioral | |
| fidelity, contextual relevance, bridge quality), and prints a structured | |
| report. | |
| This is the baseline. Run it before any experimental change to lock the | |
| current numbers; re-run after to see what moved. | |
| Usage: | |
| # Default — 30/30/10/15 users | |
| python -m scripts.run_eval | |
| # Quick smoke test — 5/5/3/5 users (~3 min) | |
| python -m scripts.run_eval --smoke | |
| # Custom sample sizes | |
| python -m scripts.run_eval --n-task-a 50 --n-warm 50 --n-cold 20 --n-cross 25 | |
| # Skip BERTScore (slow first-time, downloads ~400MB) | |
| python -m scripts.run_eval --no-bertscore | |
| # Skip LLM-judge calls (faster, automated metrics only) | |
| python -m scripts.run_eval --no-judges | |
| # Save the report to a file (markdown) | |
| python -m scripts.run_eval --out reports/baseline_stage2b.md | |
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import json | |
| import logging | |
| import time | |
| from dataclasses import dataclass, field | |
| from datetime import datetime | |
| from pathlib import Path | |
| from typing import Optional | |
| import pandas as pd | |
| from core.config import settings | |
| from core.llm import LLMClient | |
| from core.nigerian import naija_persona_examples | |
| from core.persona import PersonaEngine, UserPersona | |
| from eval.metrics import ( | |
| rmse, mae, rouge_l, bertscore_f1, | |
| ndcg_at_k, hit_rate_at_k, mean_skipping_nan, | |
| ) | |
| from eval.judges import ( | |
| judge_behavioral_fidelity, judge_contextual_relevance, | |
| judge_bridge_quality, title_quality_rate, domain_coverage, | |
| ) | |
| from task_a_user_modeling.agent import ImpersonationAgent, ItemInput | |
| from task_b_recommender.agent import RecommendationAgent | |
# Configure logging once at import time; `log` is the module-level logger
# used by every function in this file.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
)
log = logging.getLogger(__name__)
| # ────────────────────────────────────────────────────────────────────────────── | |
| # Result containers | |
| # ────────────────────────────────────────────────────────────────────────────── | |
@dataclass
class TaskAResult:
    """Aggregated Task A (user modeling) metrics plus per-user raw rows.

    Metric fields default to NaN so a metric that was never computed is
    distinguishable from a genuine 0.0. The class must be a dataclass:
    callers construct it with keyword arguments and `raw` relies on
    `field(default_factory=list)` to give each instance its own list.
    """

    n_users: int = 0  # users that produced a scored prediction
    rmse: float = float("nan")
    mae: float = float("nan")
    rouge_l: float = float("nan")
    bertscore_f1: float = float("nan")
    behavioral_fidelity: float = float("nan")  # mean LLM-judge score
    n_failed: int = 0  # users skipped (persona/generation failure, no test row)
    raw: list[dict] = field(default_factory=list)  # one dict per scored user
@dataclass
class TaskBModeResult:
    """Aggregated Task B (recommendation) metrics for one evaluation mode.

    Metric fields default to NaN so "not measured in this mode" is
    distinguishable from 0.0. The class must be a dataclass: callers
    construct it with keyword arguments (`mode=`, `n_users=`) and `raw`
    relies on `field(default_factory=list)` for a per-instance list.
    """

    mode: str = ""  # callers in this file pass "warm", "cold_start", "cross_domain"
    n_users: int = 0  # users/personas that produced scored recommendations
    hit_rate_at_10: float = float("nan")
    ndcg_at_10: float = float("nan")
    title_quality: float = float("nan")
    domain_coverage_avg: float = float("nan")
    contextual_relevance: float = float("nan")  # mean LLM-judge score
    bridge_quality: float = float("nan")  # cross-domain only
    n_failed: int = 0  # users/personas skipped due to errors or empty output
    raw: list[dict] = field(default_factory=list)  # one dict per scored user
@dataclass
class FullEvalReport:
    """Top-level container for one full evaluation run.

    Holds run metadata plus one result object per task/mode. The class
    must be a dataclass: it is constructed with keyword arguments and the
    nested results use `field(default_factory=...)` so every report starts
    with fresh, empty result objects.
    """

    provider: str = ""  # LLM provider name recorded for the run
    started_at: str = ""  # formatted timestamp string
    completed_at: str = ""  # formatted timestamp string
    elapsed_seconds: float = 0.0
    task_a: TaskAResult = field(default_factory=TaskAResult)
    task_b_warm: TaskBModeResult = field(default_factory=TaskBModeResult)
    task_b_cold_start: TaskBModeResult = field(default_factory=TaskBModeResult)
    task_b_cross_domain: TaskBModeResult = field(default_factory=TaskBModeResult)
| # ────────────────────────────────────────────────────────────────────────────── | |
| # User sampling | |
| # ────────────────────────────────────────────────────────────────────────────── | |
def _load_reviews() -> pd.DataFrame:
    """Read `reviews.parquet` from the processed-data directory.

    Returns:
        The reviews DataFrame as loaded by pandas.

    Raises:
        SystemExit: if the parquet file does not exist.
    """
    reviews_file = settings.processed_dir / "reviews.parquet"
    if reviews_file.exists():
        frame = pd.read_parquet(reviews_file)
        log.info(f"Loaded {len(frame):,} reviews")
        return frame
    raise SystemExit(f"Reviews file not found at {reviews_file}. Run prepare_data.py first.")
def _sample_users_with_test_reviews(reviews: pd.DataFrame, n: int,
                                    require_min_train: int = 3,
                                    require_min_domains: int = 1,
                                    seed: int = 42) -> list[str]:
    """Pick up to `n` user ids that have usable train history and test rows.

    A user qualifies when they have:
      - at least `require_min_train` rows in the "train" split,
      - at least `require_min_domains` distinct train domains,
      - at least one row in the "test" split.

    If fewer than `n` users qualify, a warning is logged and all of them
    are returned. Sampling is seeded, so repeated calls with the same
    inputs return the same users.

    Raises:
        SystemExit: if no user qualifies.
    """
    is_train = reviews["split"] == "train"
    is_test = reviews["split"] == "test"
    testers = set(reviews.loc[is_test, "user_id"].unique())

    # Per-user train statistics: review count and number of distinct domains.
    stats = reviews[is_train].groupby("user_id").agg(
        n_train=("rating", "size"),
        n_domains=("domain", "nunique"),
    ).reset_index()

    has_test = stats["user_id"].isin(testers)
    enough_history = stats["n_train"] >= require_min_train
    enough_domains = stats["n_domains"] >= require_min_domains
    pool = stats[has_test & enough_history & enough_domains]

    available = len(pool)
    if available == 0:
        raise SystemExit("No eligible users found.")
    if available < n:
        log.warning(f"Only {available} eligible users (requested {n}); using all")
        n = available

    chosen = pool.sample(n=n, random_state=seed)
    return chosen["user_id"].tolist()
def _sample_cross_domain_users(reviews: pd.DataFrame, n: int, seed: int = 43) -> list[str]:
    """Pick up to `n` user ids whose training history covers exactly one domain.

    These users are the cross-domain targets: their train rows sit in a
    single domain, so recommendations can be made in the other domains and
    checked against whatever they engaged with there. A user qualifies with
    at least 3 train rows, exactly 1 train domain, and at least one test row.

    If fewer than `n` users qualify, a warning is logged and all of them
    are returned.

    Raises:
        SystemExit: if no user qualifies.
    """
    is_train = reviews["split"] == "train"
    is_test = reviews["split"] == "test"
    testers = set(reviews.loc[is_test, "user_id"].unique())

    stats = reviews[is_train].groupby("user_id").agg(
        n_train=("rating", "size"),
        n_domains=("domain", "nunique"),
    ).reset_index()

    # Single-domain history, enough reviews for a persona, and held-out items.
    single_domain = stats["n_domains"] == 1
    enough_history = stats["n_train"] >= 3
    has_test = stats["user_id"].isin(testers)
    pool = stats[has_test & enough_history & single_domain]

    available = len(pool)
    if available == 0:
        raise SystemExit("No eligible cross-domain users found.")
    if available < n:
        log.warning(f"Only {available} cross-domain users (requested {n})")
        n = available

    picked = pool.sample(n=n, random_state=seed)
    return picked["user_id"].tolist()
| # ────────────────────────────────────────────────────────────────────────────── | |
| # Task A evaluation | |
| # ────────────────────────────────────────────────────────────────────────────── | |
def run_task_a(reviews: pd.DataFrame, n_users: int, *,
               with_bertscore: bool, with_judges: bool,
               persona_engine: PersonaEngine, agent: ImpersonationAgent,
               judge_llm: LLMClient) -> TaskAResult:
    """Evaluate Task A (rating + review impersonation) on sampled users.

    For each sampled user a persona is built from their train rows, the
    agent generates a rating and review for the user's first test row, and
    the prediction is compared with the real rating/review.

    Args:
        reviews: Reviews with at least `split`, `user_id`, `parent_asin`,
            `domain`, `rating` and `text` columns (`title` is optional).
        n_users: Number of users to sample.
        with_bertscore: Compute BERTScore F1 over all generated reviews.
        with_judges: Call the LLM judge for behavioral fidelity per user.
        persona_engine: Builds and enriches personas from train rows.
        agent: Impersonation agent producing `.rating` and `.review`.
        judge_llm: LLM client handed to the judge.

    Returns:
        A `TaskAResult`. Users whose persona build or generation fails are
        counted in `n_failed` and skipped; judge/BERTScore failures are
        logged and leave the corresponding metric unset (NaN).
    """
    log.info(f"━━━━ Task A evaluation: {n_users} users ━━━━")
    user_ids = _sample_users_with_test_reviews(reviews, n_users)
    train = reviews[reviews["split"] == "train"]
    test = reviews[reviews["split"] == "test"]

    # Item metadata is loop-invariant: read the parquet once up front
    # instead of re-reading the whole file for every user.
    items_path = settings.processed_dir / "items.parquet"
    items_df = pd.read_parquet(items_path) if items_path.exists() else None

    result = TaskAResult(n_users=0)
    predicted_ratings, actual_ratings = [], []
    predicted_reviews, actual_reviews = [], []
    rouge_scores = []
    fidelity_scores = []

    for i, user_id in enumerate(user_ids, 1):
        log.info(f" [{i}/{len(user_ids)}] Task A user={user_id[:12]}...")
        try:
            persona = persona_engine.from_dataframe(user_id, train)
            persona = persona_engine.enrich(persona)
        except Exception as e:
            log.warning(f" Persona build failed: {e}; skipping user")
            result.n_failed += 1
            continue

        user_test = test[test["user_id"] == user_id]
        if user_test.empty:
            result.n_failed += 1
            continue

        # Pick the first test review as the target
        target_row = user_test.iloc[0]
        target_item_id = target_row["parent_asin"]
        target_title = target_row["title"] if "title" in target_row else "(unknown)"
        target_domain = target_row["domain"]
        actual_rating = float(target_row["rating"])
        actual_review = str(target_row["text"])

        # Try to look up real item metadata (description) for the agent
        item_description = ""
        item_categories = ""
        if items_df is not None:
            match = items_df[items_df["parent_asin"] == target_item_id]
            if not match.empty:
                row = match.iloc[0]
                # NOTE(review): `x or fallback` assumes scalar cells; if these
                # columns hold list/array values the truth test may raise.
                # Confirm the items.parquet schema.
                target_title = str(row.get("title") or target_title)
                item_description = str(row.get("description") or "")
                item_categories = str(row.get("categories") or "")

        item_input = ItemInput(
            parent_asin=target_item_id,
            title=target_title,
            description=item_description,
            categories=item_categories,
            domain=target_domain,
        )
        try:
            output = agent.run(persona, item=item_input)
            pred_rating = float(output.rating)
            pred_review = str(output.review)
        except Exception as e:
            log.warning(f" Generation failed: {e}; skipping user")
            result.n_failed += 1
            continue

        # Auto metrics. ROUGE-L is computed once per user and reused for
        # both the raw row and the aggregate mean.
        user_rouge = rouge_l(pred_review, actual_review)
        predicted_ratings.append(pred_rating)
        actual_ratings.append(actual_rating)
        predicted_reviews.append(pred_review)
        actual_reviews.append(actual_review)
        rouge_scores.append(user_rouge)

        # LLM-judge (best effort: a judge failure does not drop the user)
        fidelity = None
        if with_judges:
            try:
                score = judge_behavioral_fidelity(
                    judge_llm, persona.to_prompt_block(),
                    target_title, target_domain,
                    pred_rating, pred_review,
                    actual_rating, actual_review,
                )
                fidelity_scores.append(score.score)
                fidelity = score.score
            except Exception as e:
                log.warning(f" Judge failed: {e}")

        result.raw.append({
            "user_id": user_id,
            "item_id": target_item_id,
            "domain": target_domain,
            "actual_rating": actual_rating,
            "predicted_rating": pred_rating,
            "rouge_l": user_rouge,
            "fidelity": fidelity,
        })
        result.n_users += 1

    # Aggregate
    if predicted_ratings:
        result.rmse = rmse(predicted_ratings, actual_ratings)
        result.mae = mae(predicted_ratings, actual_ratings)
        result.rouge_l = float(sum(rouge_scores) / len(rouge_scores))
    if with_bertscore and predicted_reviews:
        log.info(" Computing BERTScore (may download model on first use)...")
        try:
            result.bertscore_f1 = bertscore_f1(predicted_reviews, actual_reviews)
        except Exception as e:
            log.warning(f" BERTScore failed: {e}")
    if fidelity_scores:
        result.behavioral_fidelity = float(sum(fidelity_scores) / len(fidelity_scores))
    return result
| # ────────────────────────────────────────────────────────────────────────────── | |
| # Task B evaluation | |
| # ────────────────────────────────────────────────────────────────────────────── | |
def run_task_b_warm(reviews: pd.DataFrame, n_users: int, *,
                    with_judges: bool, persona_engine: PersonaEngine,
                    agent: RecommendationAgent, judge_llm: LLMClient) -> TaskBModeResult:
    """Evaluate Task B in warm mode (users with training history).

    For each sampled user a persona is built from their train rows, the
    agent returns 10 recommendations (`cross_domain=False`), and these are
    scored against the user's test items.

    Args:
        reviews: Reviews with `split`, `user_id`, `parent_asin`, `domain`
            and `rating` columns.
        n_users: Number of users to sample.
        with_judges: Call the LLM judge for contextual relevance per user.
        persona_engine: Builds and enriches personas from train rows.
        agent: Recommendation agent; each rec exposes `.item_id` and
            `.as_dict()`.
        judge_llm: LLM client handed to the judge.

    Returns:
        A `TaskBModeResult` with mode "warm". Users whose persona build or
        generation fails, or who get no recommendations, are counted in
        `n_failed` and skipped. Domain coverage is divided by the number of
        the persona's known domains before averaging.
    """
    log.info(f"━━━━ Task B warm evaluation: {n_users} users ━━━━")
    user_ids = _sample_users_with_test_reviews(reviews, n_users, require_min_domains=1)
    train = reviews[reviews["split"] == "train"]
    test = reviews[reviews["split"] == "test"]

    result = TaskBModeResult(mode="warm", n_users=0)
    hit_rates, ndcgs, title_qualities = [], [], []
    coverages, relevance_scores = [], []

    for i, user_id in enumerate(user_ids, 1):
        log.info(f" [{i}/{len(user_ids)}] Task B warm user={user_id[:12]}...")
        try:
            persona = persona_engine.from_dataframe(user_id, train)
            persona = persona_engine.enrich(persona)
            recs = agent.run(persona, k=10, cross_domain=False)
        except Exception as e:
            log.warning(f" Failed: {e}")
            result.n_failed += 1
            continue
        if not recs:
            result.n_failed += 1
            continue

        recs_dicts = [r.as_dict() for r in recs]
        user_test = test[test["user_id"] == user_id]
        gt_ids = list(user_test["parent_asin"].unique())
        pred_ids = [r.item_id for r in recs]

        hr = hit_rate_at_k(pred_ids, gt_ids, k=10)
        nd = ndcg_at_k(pred_ids, gt_ids, k=10)
        tq = title_quality_rate(recs_dicts)

        # Domain coverage: did we span the user's known domains?
        known_domains = list(persona.domains) if persona.domains else []
        dc = domain_coverage(recs_dicts, known_domains) if known_domains else 0.0
        # max(1, ...) avoids dividing by zero for a persona with no domains.
        normalized_dc = dc / max(1, len(known_domains))

        hit_rates.append(hr)
        ndcgs.append(nd)
        title_qualities.append(tq)
        coverages.append(normalized_dc)

        # LLM-judge (best effort: a judge failure does not drop the user)
        relevance = None
        if with_judges:
            try:
                score = judge_contextual_relevance(
                    judge_llm, persona.to_prompt_block(), recs_dicts, mode="warm",
                )
                relevance_scores.append(score.score)
                relevance = score.score
            except Exception as e:
                log.warning(f" Judge failed: {e}")

        result.raw.append({
            "user_id": user_id,
            "hit_rate": hr,
            "ndcg": nd,
            "title_quality": tq,
            "domain_coverage": normalized_dc,
            "relevance": relevance,
        })
        result.n_users += 1

    if hit_rates:
        result.hit_rate_at_10 = float(sum(hit_rates) / len(hit_rates))
        result.ndcg_at_10 = float(sum(ndcgs) / len(ndcgs))
        result.title_quality = float(sum(title_qualities) / len(title_qualities))
        result.domain_coverage_avg = float(sum(coverages) / len(coverages))
    if relevance_scores:
        result.contextual_relevance = float(sum(relevance_scores) / len(relevance_scores))
    return result
def run_task_b_cold_start(n_personas: int, *,
                          with_judges: bool, agent: RecommendationAgent,
                          judge_llm: LLMClient) -> TaskBModeResult:
    """Evaluate Task B cold-start using synthetic Naija personas (no history).

    No held-out test items exist for synthetic personas, so Hit Rate and
    NDCG cannot be computed — only title quality, domain coverage, and
    contextual relevance (LLM-judge).

    Args:
        n_personas: Number of personas to evaluate; the example personas
            are cycled when this exceeds the number available.
        with_judges: Call the LLM judge for contextual relevance.
        agent: Recommendation agent; each rec exposes `.as_dict()`.
        judge_llm: LLM client handed to the judge.

    Returns:
        A `TaskBModeResult` with mode "cold_start". Domain coverage is
        stored as returned by `domain_coverage` against the three domains
        (it is not divided by 3 here). If no example personas are
        available, an empty result is returned instead of raising.
    """
    log.info(f"━━━━ Task B cold-start evaluation: {n_personas} personas ━━━━")
    result = TaskBModeResult(mode="cold_start", n_users=0)

    naija = naija_persona_examples()
    if not naija:
        # Without this guard the cycling below would divide/modulo by zero.
        log.warning(" No persona examples available; skipping cold-start evaluation")
        return result

    # Cycle through naija personas if n_personas > len
    personas_to_test = [naija[idx % len(naija)] for idx in range(n_personas)]

    # Expect coverage across the 3 domains since cold-start often spans interests
    all_domains = ["Books", "Kindle_Store", "Movies_and_TV"]

    title_qualities, coverages, relevance_scores = [], [], []
    for i, demo in enumerate(personas_to_test, 1):
        log.info(f" [{i}/{len(personas_to_test)}] Cold-start persona={demo['name']}")
        # Placeholder statistics: the persona has no review history, so only
        # the stated preferences / deal-breakers / description carry signal.
        persona = UserPersona(
            user_id=f"cold_start_{i}",
            n_reviews=0, avg_rating=4.0, std_rating=0.5,
            avg_review_length=80.0, std_review_length=20.0,
            verified_rate=1.0, domains=["Books"], n_domains=1,
            rating_distribution={4: 0.6, 5: 0.3, 3: 0.1},
            top_terms=[],
            tone="", preferred_themes=demo["stated_preferences"],
            common_complaints=demo["deal_breakers"],
            voice_one_liner=demo["description"],
            history_samples=[],
        )
        try:
            recs = agent.run(persona, k=10, cross_domain=False)
        except Exception as e:
            log.warning(f" Generation failed: {e}")
            result.n_failed += 1
            continue
        if not recs:
            result.n_failed += 1
            continue

        recs_dicts = [r.as_dict() for r in recs]
        tq = title_quality_rate(recs_dicts)
        dc = domain_coverage(recs_dicts, all_domains)
        title_qualities.append(tq)
        coverages.append(dc)

        # LLM-judge (best effort: a judge failure does not drop the persona)
        relevance = None
        if with_judges:
            try:
                score = judge_contextual_relevance(
                    judge_llm, persona.to_prompt_block(), recs_dicts, mode="cold_start",
                )
                relevance_scores.append(score.score)
                relevance = score.score
            except Exception as e:
                log.warning(f" Judge failed: {e}")

        result.raw.append({
            "persona_name": demo["name"],
            "title_quality": tq,
            "domain_coverage": dc,
            "relevance": relevance,
        })
        result.n_users += 1

    if title_qualities:
        result.title_quality = float(sum(title_qualities) / len(title_qualities))
        result.domain_coverage_avg = float(sum(coverages) / len(coverages))
    if relevance_scores:
        result.contextual_relevance = float(sum(relevance_scores) / len(relevance_scores))
    return result
def run_task_b_cross_domain(reviews: pd.DataFrame, n_users: int, *,
                            with_judges: bool, persona_engine: PersonaEngine,
                            agent: RecommendationAgent,
                            judge_llm: LLMClient) -> TaskBModeResult:
    """Evaluate Task B in cross-domain mode.

    Users with single-domain training history are sampled, the agent
    returns 10 recommendations with `cross_domain=True`, and hits are
    counted only against test items in domains the persona does not
    already know.

    Args:
        reviews: Reviews with `split`, `user_id`, `parent_asin`, `domain`
            and `rating` columns.
        n_users: Number of users to sample.
        with_judges: Call the LLM judges (contextual relevance and bridge
            quality); each judge is attempted independently.
        persona_engine: Builds and enriches personas from train rows.
        agent: Recommendation agent; each rec exposes `.item_id` and
            `.as_dict()`.
        judge_llm: LLM client handed to the judges.

    Returns:
        A `TaskBModeResult` with mode "cross_domain". A user with no test
        items outside their known domains contributes NaN to the hit rate,
        which `mean_skipping_nan` aggregates. Domain coverage is divided by
        the number of unknown domains before averaging.
    """
    log.info(f"━━━━ Task B cross-domain evaluation: {n_users} users ━━━━")
    user_ids = _sample_cross_domain_users(reviews, n_users)
    train = reviews[reviews["split"] == "train"]
    test = reviews[reviews["split"] == "test"]

    all_domains = {"Books", "Kindle_Store", "Movies_and_TV"}

    result = TaskBModeResult(mode="cross_domain", n_users=0)
    cross_hit_rates, title_qualities, coverages = [], [], []
    relevance_scores, bridge_scores = [], []

    for i, user_id in enumerate(user_ids, 1):
        log.info(f" [{i}/{len(user_ids)}] Task B cross-domain user={user_id[:12]}...")
        try:
            persona = persona_engine.from_dataframe(user_id, train)
            persona = persona_engine.enrich(persona)
            recs = agent.run(persona, k=10, cross_domain=True)
        except Exception as e:
            log.warning(f" Failed: {e}")
            result.n_failed += 1
            continue
        if not recs:
            result.n_failed += 1
            continue

        recs_dicts = [r.as_dict() for r in recs]

        # Cross-domain hit rate: test items in NEW domains (not in user's training)
        user_test = test[test["user_id"] == user_id]
        known = set(persona.domains)
        cross_gt = user_test[~user_test["domain"].isin(known)]
        if cross_gt.empty:
            # User has no test items in unknown domains; can't measure HR
            hr = float("nan")
        else:
            gt_ids = list(cross_gt["parent_asin"].unique())
            pred_ids = [r.item_id for r in recs]
            hr = hit_rate_at_k(pred_ids, gt_ids, k=10)

        tq = title_quality_rate(recs_dicts)
        # sorted(): set iteration order varies between runs; a stable order
        # keeps the judge inputs and the saved raw rows reproducible.
        expected_unknown = sorted(all_domains - known)
        dc = domain_coverage(recs_dicts, expected_unknown)
        normalized_dc = dc / max(1, len(expected_unknown))

        cross_hit_rates.append(hr)
        title_qualities.append(tq)
        coverages.append(normalized_dc)

        # LLM-judges (best effort: each judge may fail independently)
        relevance = None
        bridge = None
        if with_judges:
            try:
                rscore = judge_contextual_relevance(
                    judge_llm, persona.to_prompt_block(), recs_dicts,
                    mode="cross_domain",
                )
                relevance_scores.append(rscore.score)
                relevance = rscore.score
            except Exception as e:
                log.warning(f" Relevance judge failed: {e}")
            try:
                bscore = judge_bridge_quality(
                    judge_llm, persona.to_prompt_block(),
                    list(persona.domains), recs_dicts,
                )
                bridge_scores.append(bscore.score)
                bridge = bscore.score
            except Exception as e:
                log.warning(f" Bridge judge failed: {e}")

        result.raw.append({
            "user_id": user_id,
            "known_domains": sorted(known),
            "cross_hit_rate": hr,
            "title_quality": tq,
            "domain_coverage": normalized_dc,
            "relevance": relevance,
            "bridge_quality": bridge,
        })
        result.n_users += 1

    if cross_hit_rates:
        result.hit_rate_at_10 = mean_skipping_nan(cross_hit_rates)
    if title_qualities:
        result.title_quality = float(sum(title_qualities) / len(title_qualities))
        result.domain_coverage_avg = float(sum(coverages) / len(coverages))
    if relevance_scores:
        result.contextual_relevance = float(sum(relevance_scores) / len(relevance_scores))
    if bridge_scores:
        result.bridge_quality = float(sum(bridge_scores) / len(bridge_scores))
    return result
| # ────────────────────────────────────────────────────────────────────────────── | |
| # Report formatting | |
| # ────────────────────────────────────────────────────────────────────────────── | |
def _f(v: float, fmt: str = ".3f") -> str:
    """Render a metric with format spec `fmt`; None and NaN become 'n/a'."""
    import math

    if v is None:
        return " n/a"
    if isinstance(v, float) and math.isnan(v):
        return " n/a"
    return format(v, fmt)
def format_report(report: FullEvalReport) -> str:
    """Render a `FullEvalReport` as a plain-text, human-readable summary.

    Args:
        report: Completed evaluation report.

    Returns:
        A newline-joined string with one section per task/mode. Metrics
        that are NaN are shown as 'n/a' (via `_f`).
    """
    rule = "═" * 65
    task_a = report.task_a
    warm = report.task_b_warm
    cold = report.task_b_cold_start
    cross = report.task_b_cross_domain

    lines = [
        rule,
        "NaijaTaste AI — Full Evaluation Report",
        f"Provider: {report.provider}",
        f"Started: {report.started_at}",
        f"Completed: {report.completed_at}",
        f"Elapsed: {report.elapsed_seconds:.1f}s ({report.elapsed_seconds/60:.1f} min)",
        rule,
        "",
        f"TASK A — User Modeling (N={task_a.n_users}, failed={task_a.n_failed})",
        f" Rating accuracy (RMSE): {_f(task_a.rmse)}",
        f" Rating accuracy (MAE): {_f(task_a.mae)}",
        f" Review text (ROUGE-L F1): {_f(task_a.rouge_l)}",
        f" Review text (BERTScore F1): {_f(task_a.bertscore_f1)}",
        f" Behavioral fidelity (judge 1-5): {_f(task_a.behavioral_fidelity, '.2f')}",
        "",
        "TASK B — Recommendation",
        "",
        f" Warm mode (N={warm.n_users}, failed={warm.n_failed})",
        f" Hit Rate@10: {_f(warm.hit_rate_at_10)}",
        f" NDCG@10: {_f(warm.ndcg_at_10)}",
        f" Title quality (real titles %): {_f(warm.title_quality)}",
        f" Domain coverage (known): {_f(warm.domain_coverage_avg)}",
        f" Contextual relevance (judge): {_f(warm.contextual_relevance, '.2f')}",
        "",
        f" Cold-start (N={cold.n_users}, failed={cold.n_failed})",
        f" Title quality (real titles %): {_f(cold.title_quality)}",
        f" Domain coverage (of 3): {_f(cold.domain_coverage_avg, '.1f')}",
        f" Contextual relevance (judge): {_f(cold.contextual_relevance, '.2f')}",
        "",
        f" Cross-domain (N={cross.n_users}, failed={cross.n_failed})",
        f" Cross-domain Hit Rate@10: {_f(cross.hit_rate_at_10)}",
        f" Title quality (real titles %): {_f(cross.title_quality)}",
        f" Domain coverage (unknown): {_f(cross.domain_coverage_avg)}",
        f" Contextual relevance (judge): {_f(cross.contextual_relevance, '.2f')}",
        f" Bridge quality (judge): {_f(cross.bridge_quality, '.2f')}",
        "",
        rule,
        "Higher is better for: ROUGE-L, BERTScore, Hit Rate, NDCG,",
        " Title quality, Domain coverage, all judge scores.",
        "Lower is better for: RMSE, MAE.",
        rule,
    ]
    return "\n".join(lines)
| # ────────────────────────────────────────────────────────────────────────────── | |
| # Main | |
| # ────────────────────────────────────────────────────────────────────────────── | |
def main():
    """CLI entry point: run every evaluation, print the report, save outputs.

    Parses sample sizes and feature switches, runs Task A and the three
    Task B modes (each skipped when its sample size is 0), prints the
    formatted report, and optionally writes it (`--out`) and the raw
    per-user results (`--json-out`) to disk.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--smoke", action="store_true",
                    help="Quick smoke test: 5/5/3/5 users (~3 min)")
    ap.add_argument("--n-task-a", type=int, default=30)
    ap.add_argument("--n-warm", type=int, default=30)
    ap.add_argument("--n-cold", type=int, default=10)
    ap.add_argument("--n-cross", type=int, default=15)
    ap.add_argument("--no-bertscore", action="store_true",
                    help="Skip BERTScore (first run downloads ~400MB)")
    ap.add_argument("--no-judges", action="store_true",
                    help="Skip LLM-judge calls (faster, automated metrics only)")
    ap.add_argument("--out", type=str, default=None,
                    help="Save report markdown to this file path")
    ap.add_argument("--json-out", type=str, default=None,
                    help="Save raw per-user results as JSON to this path")
    args = ap.parse_args()

    # --smoke overrides any explicitly passed sample sizes.
    if args.smoke:
        args.n_task_a, args.n_warm, args.n_cold, args.n_cross = 5, 5, 3, 5

    started = time.time()
    started_dt = datetime.now()
    reviews = _load_reviews()
    log.info(f"Provider: {settings.llm_provider}")
    log.info(f"Sample sizes: Task A={args.n_task_a}, "
             f"Warm={args.n_warm}, Cold={args.n_cold}, Cross={args.n_cross}")
    log.info(f"BERTScore: {'OFF' if args.no_bertscore else 'ON'}, "
             f"Judges: {'OFF' if args.no_judges else 'ON'}")

    # Build shared resources (load once, reuse across tasks)
    persona_engine = PersonaEngine()
    task_a_agent = ImpersonationAgent()
    task_b_agent = RecommendationAgent()
    judge_llm = LLMClient()

    report = FullEvalReport(
        provider=settings.llm_provider,
        started_at=started_dt.strftime("%Y-%m-%d %H:%M:%S"),
    )

    # ── Task A ──────────────────────────────────────────────────────────
    if args.n_task_a > 0:
        report.task_a = run_task_a(
            reviews, args.n_task_a,
            with_bertscore=not args.no_bertscore,
            with_judges=not args.no_judges,
            persona_engine=persona_engine,
            agent=task_a_agent,
            judge_llm=judge_llm,
        )

    # ── Task B warm ─────────────────────────────────────────────────────
    if args.n_warm > 0:
        report.task_b_warm = run_task_b_warm(
            reviews, args.n_warm,
            with_judges=not args.no_judges,
            persona_engine=persona_engine,
            agent=task_b_agent,
            judge_llm=judge_llm,
        )

    # ── Task B cold-start ───────────────────────────────────────────────
    if args.n_cold > 0:
        report.task_b_cold_start = run_task_b_cold_start(
            args.n_cold,
            with_judges=not args.no_judges,
            agent=task_b_agent,
            judge_llm=judge_llm,
        )

    # ── Task B cross-domain ─────────────────────────────────────────────
    if args.n_cross > 0:
        report.task_b_cross_domain = run_task_b_cross_domain(
            reviews, args.n_cross,
            with_judges=not args.no_judges,
            persona_engine=persona_engine,
            agent=task_b_agent,
            judge_llm=judge_llm,
        )

    completed = time.time()
    report.completed_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    report.elapsed_seconds = completed - started

    # ── Print + save ────────────────────────────────────────────────────
    text = format_report(report)
    print("\n" + text + "\n")

    if args.out:
        out_path = Path(args.out)
        out_path.parent.mkdir(parents=True, exist_ok=True)
        # The report contains non-ASCII characters, so do not rely on the
        # platform's default encoding.
        out_path.write_text(text, encoding="utf-8")
        log.info(f"Report saved to {out_path}")

    if args.json_out:
        json_path = Path(args.json_out)
        json_path.parent.mkdir(parents=True, exist_ok=True)
        json_blob = {
            "provider": report.provider,
            "started_at": report.started_at,
            "completed_at": report.completed_at,
            "elapsed_seconds": report.elapsed_seconds,
            "task_a": {**report.task_a.__dict__},
            "task_b_warm": {**report.task_b_warm.__dict__},
            "task_b_cold_start": {**report.task_b_cold_start.__dict__},
            "task_b_cross_domain": {**report.task_b_cross_domain.__dict__},
        }
        # NOTE(review): unmeasured metrics are NaN, which json.dumps emits as
        # the bare token NaN (not strict JSON); default=str stringifies any
        # value json cannot serialize natively.
        json_path.write_text(json.dumps(json_blob, indent=2, default=str),
                             encoding="utf-8")
        log.info(f"Raw results saved to {json_path}")


if __name__ == "__main__":
    main()