Spaces:
Sleeping
Sleeping
| """Compile the final evaluation report for the multilingual chatbot. | |
| Two parts: | |
| 1. Aggregate per-model metrics from existing eval_results.json files | |
| (lang_detector, intent_classifier, ner_model). These were measured on | |
| the proper held-out test splits during their training scripts. | |
| 2. Run an end-to-end smoke eval on a curated 15-case test set covering | |
| each language x several intents. Verifies the full pipeline (lang -> | |
| intent -> NER -> RAG -> Qwen) and measures per-stage latency. | |
| Outputs: | |
| evaluation_report.md (human-readable) | |
| evaluation_summary.json (machine-readable) | |
| Usage: | |
| python src/evaluate.py | |
| python src/evaluate.py --skip-e2e # only aggregate stored metrics | |
| python src/evaluate.py --quick # fewer E2E cases | |
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import json | |
| import statistics | |
| import sys | |
| import time | |
| from pathlib import Path | |
| from typing import Any | |
# Repository root: the parent of the directory that contains this script.
PROJECT_ROOT = Path(__file__).resolve().parent.parent

# Put the repository root on sys.path so that `src.chatbot` can be imported
# when this file is launched directly as `python src/evaluate.py`.
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))

# Per-model metric files read by this script.
LANG_EVAL = PROJECT_ROOT.joinpath("models", "lang_detector", "eval_results.json")
INTENT_EVAL = PROJECT_ROOT.joinpath("models", "intent_classifier", "eval_results.json")
NER_EVAL = PROJECT_ROOT.joinpath("models", "ner_model", "eval_results.json")

# Report files written by this script.
REPORT_MD = PROJECT_ROOT.joinpath("evaluation_report.md")
REPORT_JSON = PROJECT_ROOT.joinpath("evaluation_summary.json")
# Hand-picked end-to-end smoke cases, five per language (EN, AR, FR), stored as
# (text, expected_lang, expected_intents). The expected intents are a SET
# because many natural utterances can validly be labelled either "booking" or
# "inquiry"; a prediction is accepted when it falls anywhere in the set.
E2E_CASES: list[tuple[str, str, set[str]]] = [
    # English
    *(
        (text, "EN", intents)
        for text, intents in (
            ("Hello there!", {"greeting"}),
            ("How can I book a hotel in Paris?", {"booking", "inquiry"}),
            ("What payment methods do you accept?", {"inquiry"}),
            ("My order arrived damaged, I want a refund.", {"complaint"}),
            ("Goodbye, see you next time.", {"farewell"}),
        )
    ),
    # Arabic
    *(
        (text, "AR", intents)
        for text, intents in (
            ("مرحباً، كيف حالك؟", {"greeting"}),
            ("أحتاج أحجز فندقاً في مكة المكرمة", {"booking", "inquiry"}),
            ("ما هي طرق الدفع المتاحة لديكم؟", {"inquiry"}),
            ("لدي شكوى بخصوص طلبي الأخير", {"complaint"}),
            ("إلى اللقاء، شكراً لكم", {"farewell"}),
        )
    ),
    # French
    *(
        (text, "FR", intents)
        for text, intents in (
            ("Bonjour, comment allez-vous ?", {"greeting"}),
            ("Comment puis-je réserver un vol pour Lyon ?", {"booking", "inquiry"}),
            ("Quels sont vos horaires d'ouverture ?", {"inquiry"}),
            ("Je voudrais déposer une réclamation.", {"complaint"}),
            ("Au revoir et bonne journée", {"farewell"}),
        )
    ),
]
| # --------------------------------------------------------------------------- # | |
| # Part 1 — aggregate stored per-model metrics | |
| # --------------------------------------------------------------------------- # | |
| def _load_json_safely(path: Path) -> dict[str, Any] | None: | |
| if not path.exists(): | |
| return None | |
| try: | |
| return json.loads(path.read_text()) | |
| except json.JSONDecodeError: | |
| return None | |
def aggregate_model_metrics() -> dict[str, Any]:
    """Collect the headline test metrics stored in each eval_results.json.

    Reads the language-detector, intent-classifier and NER result files in
    that order and returns one entry per component. A component whose file
    could not be loaded (or loaded as an empty object) gets an
    ``{"error": ...}`` entry instead of metrics.
    """
    summary: dict[str, Any] = {}

    # --- language detector ------------------------------------------------
    lang_payload = _load_json_safely(LANG_EVAL)
    if not lang_payload:
        summary["language_detector"] = {"error": f"missing {LANG_EVAL.name}"}
    else:
        lang_metrics = lang_payload.get("test_metrics", {})
        summary["language_detector"] = {
            "model": lang_payload.get("model_name"),
            "task": lang_payload.get("task"),
            "test_accuracy": lang_metrics.get("test_accuracy"),
            "test_f1_weighted": lang_metrics.get("test_f1"),
            "test_f1_macro": lang_metrics.get("test_f1_macro"),
            "labels": list(lang_payload.get("labels", {}).keys()),
        }

    # --- intent classifier --------------------------------------------------
    intent_payload = _load_json_safely(INTENT_EVAL)
    if not intent_payload:
        summary["intent_classifier"] = {"error": f"missing {INTENT_EVAL.name}"}
    else:
        intent_metrics = intent_payload.get("test_metrics", {})
        summary["intent_classifier"] = {
            "model": intent_payload.get("model_name"),
            "task": intent_payload.get("task"),
            "test_accuracy": intent_metrics.get("test_accuracy"),
            "test_f1_weighted": intent_metrics.get("test_f1"),
            "test_f1_macro": intent_metrics.get("test_f1_macro"),
            "labels": list(intent_payload.get("labels", {}).keys()),
            "per_language": intent_payload.get("per_language", {}),
        }

    # --- NER model ------------------------------------------------------------
    ner_payload = _load_json_safely(NER_EVAL)
    if not ner_payload:
        summary["ner_model"] = {"error": f"missing {NER_EVAL.name}"}
    else:
        ner_metrics = ner_payload.get("test_metrics", {})

        # Keep only the per-entity rows of the classification report; the
        # aggregate rows and any non-dict entries are skipped.
        aggregate_rows = ("micro avg", "macro avg", "weighted avg", "accuracy")
        report = ner_payload.get("classification_report") or {}
        per_entity_f1 = {}
        for entity, row in report.items():
            if isinstance(row, dict) and entity not in aggregate_rows:
                per_entity_f1[entity] = row.get("f1-score")

        summary["ner_model"] = {
            "model": ner_payload.get("model_name"),
            "task": ner_payload.get("task"),
            "test_f1_micro": ner_metrics.get("test_f1"),
            "test_precision_micro": ner_metrics.get("test_precision"),
            "test_recall_micro": ner_metrics.get("test_recall"),
            "labels": list(ner_payload.get("labels", {}).keys()),
            "per_language": ner_payload.get("per_language", {}),
            "per_entity_f1": per_entity_f1,
        }

    return summary
| # --------------------------------------------------------------------------- # | |
| # Part 2 — end-to-end smoke eval | |
| # --------------------------------------------------------------------------- # | |
def _latency_summary(latencies: list[float]) -> dict[str, Any]:
    """Summarise latencies (seconds) as mean / median / p95 / max, rounded to ms."""
    n = len(latencies)
    if n == 0:
        return {"mean": None, "median": None, "p95": None, "max": None}

    ordered = sorted(latencies)
    p95 = None
    if n >= 5:
        # Nearest-rank percentile: the smallest sample with at least 95% of
        # the samples at or below it, i.e. rank ceil(0.95 * n). Integer
        # arithmetic keeps the ceiling exact.
        rank = (95 * n + 99) // 100
        p95 = round(ordered[rank - 1], 3)

    return {
        "mean": round(statistics.mean(latencies), 3),
        "median": round(statistics.median(latencies), 3),
        "p95": p95,
        "max": round(ordered[-1], 3),
    }


def run_e2e(cases: list[tuple[str, str, set[str]]]) -> dict[str, Any]:
    """Run the full chatbot pipeline against curated cases and score it.

    The Chatbot is imported lazily so --skip-e2e doesn't pay the
    model-loading cost.

    Args:
        cases: (text, expected_language, accepted_intents) triples.

    Returns:
        A dict with the case count, the language / in-set intent /
        non-empty-reply rates, the number of turns whose ``retrieved`` value
        was truthy, a latency summary in seconds, and one record per case
        under "cases". For an empty *cases* list the rates are 0.0, the
        latency statistics are None and the chatbot is never loaded.
    """
    n = len(cases)
    if n == 0:
        # Nothing to evaluate: skip the model load and avoid dividing by zero.
        return {
            "n_cases": 0,
            "language_accuracy": 0.0,
            "intent_accuracy_in_set": 0.0,
            "retrieval_triggered": 0,
            "nonempty_reply_rate": 0.0,
            "latency_seconds": _latency_summary([]),
            "cases": [],
        }

    from src.chatbot import Chatbot  # noqa: PLC0415 — intentional lazy import

    print("\n[e2e] loading chatbot (this triggers model load + ~10 s startup) ...")
    bot = Chatbot()

    results = []
    correct_lang = 0
    correct_intent = 0
    rag_used = 0
    nonempty_replies = 0
    latencies: list[float] = []

    for i, (text, exp_lang, exp_intents) in enumerate(cases, start=1):
        # Time only the respond() call, not the bookkeeping below.
        t0 = time.perf_counter()
        turn = bot.respond(text)
        dt = time.perf_counter() - t0
        latencies.append(dt)

        lang_ok = (turn.language == exp_lang)
        intent_ok = (turn.intent in exp_intents)
        reply_ok = bool(turn.reply.strip())

        if lang_ok:
            correct_lang += 1
        if intent_ok:
            correct_intent += 1
        if turn.retrieved:
            rag_used += 1
        if reply_ok:
            nonempty_replies += 1

        results.append({
            "i": i,
            "text": text,
            "expected_language": exp_lang,
            "predicted_language": turn.language,
            "language_ok": lang_ok,
            # Sets are not JSON-serialisable; store a sorted list instead.
            "expected_intents": sorted(exp_intents),
            "predicted_intent": turn.intent,
            "intent_confidence": turn.intent_confidence,
            "intent_ok": intent_ok,
            "n_entities": len(turn.entities),
            "n_retrieved": len(turn.retrieved),
            "generator_used": turn.generator_used,
            "reply": turn.reply,
            "latency_s": round(dt, 3),
        })

        flag = "✓" if (lang_ok and intent_ok and reply_ok) else "✗"
        print(f" [{flag}] case {i:>2}/{len(cases)} "
              f"lang={turn.language}({'ok' if lang_ok else f'≠{exp_lang}'}) "
              f"intent={turn.intent}({'ok' if intent_ok else 'unexpected'}) "
              f"latency={dt:.2f}s")

    return {
        "n_cases": n,
        "language_accuracy": correct_lang / n,
        "intent_accuracy_in_set": correct_intent / n,
        "retrieval_triggered": rag_used,
        "nonempty_reply_rate": nonempty_replies / n,
        "latency_seconds": _latency_summary(latencies),
        "cases": results,
    }
| # --------------------------------------------------------------------------- # | |
| # Markdown rendering | |
| # --------------------------------------------------------------------------- # | |
| def _fmt_pct(x: float | None, digits: int = 4) -> str: | |
| return f"{x:.{digits}f}" if isinstance(x, (int, float)) else "—" | |
def render_markdown(model_summary: dict[str, Any],
                    e2e: dict[str, Any] | None) -> str:
    """Render the evaluation report as a Markdown document.

    Args:
        model_summary: Per-component metrics keyed by "language_detector",
            "intent_classifier" and "ner_model". Missing components or
            metrics are rendered as "?" / "—".
        e2e: End-to-end results (the keys read here are "n_cases",
            "language_accuracy", "intent_accuracy_in_set",
            "nonempty_reply_rate", "retrieval_triggered", "latency_seconds"
            and "cases"), or None when the end-to-end run was skipped.

    Returns:
        The report text: lines joined with newlines, no trailing newline.
    """
    out: list[str] = []
    out.append("# Multilingual Chatbot — Evaluation Report")
    out.append("")
    out.append("End-to-end multilingual customer-service chatbot built around three "
               "fine-tuned DistilBERT classifiers, a FAISS retrieval index over a "
               "36-row knowledge base, and Qwen2.5-0.5B-Instruct as the response "
               "generator. This report aggregates per-model held-out test metrics "
               "and a full-pipeline smoke evaluation.")
    out.append("")

    # ---- Section 1: stored per-model metrics ------------------------------
    out.append("## 1. Per-model test metrics")
    out.append("")
    out.append("| Component | Model | Headline metric (held-out test) |")
    out.append("|-----------|-------|---------------------------------|")
    ld = model_summary.get("language_detector", {})
    out.append(f"| Language detector | `{ld.get('model', '?')}` | "
               f"acc {_fmt_pct(ld.get('test_accuracy'))} · "
               f"F1 macro {_fmt_pct(ld.get('test_f1_macro'))} · "
               f"F1 weighted {_fmt_pct(ld.get('test_f1_weighted'))} |")
    ic = model_summary.get("intent_classifier", {})
    out.append(f"| Intent classifier | `{ic.get('model', '?')}` | "
               f"acc {_fmt_pct(ic.get('test_accuracy'))} · "
               f"F1 macro {_fmt_pct(ic.get('test_f1_macro'))} · "
               f"F1 weighted {_fmt_pct(ic.get('test_f1_weighted'))} |")
    nm = model_summary.get("ner_model", {})
    out.append(f"| NER (entity-level) | `{nm.get('model', '?')}` | "
               f"P {_fmt_pct(nm.get('test_precision_micro'))} · "
               f"R {_fmt_pct(nm.get('test_recall_micro'))} · "
               f"F1 micro {_fmt_pct(nm.get('test_f1_micro'))} |")
    out.append("")

    # Optional breakdown tables: emitted only when the data is a non-empty dict.
    if isinstance(ic.get("per_language"), dict) and ic["per_language"]:
        out.append("### Intent — per language")
        out.append("")
        out.append("| Language | n | accuracy | F1 weighted | F1 macro |")
        out.append("|----------|---|----------|-------------|----------|")
        for lang, m in sorted(ic["per_language"].items()):
            out.append(f"| {lang} | {m.get('n', '—')} | "
                       f"{_fmt_pct(m.get('accuracy'))} | "
                       f"{_fmt_pct(m.get('f1_weighted'))} | "
                       f"{_fmt_pct(m.get('f1_macro'))} |")
        out.append("")

    if isinstance(nm.get("per_entity_f1"), dict) and nm["per_entity_f1"]:
        out.append("### NER — per entity type (F1)")
        out.append("")
        out.append("| Entity | F1 |")
        out.append("|--------|----|")
        for ent, f1 in sorted(nm["per_entity_f1"].items()):
            out.append(f"| {ent} | {_fmt_pct(f1)} |")
        out.append("")

    if isinstance(nm.get("per_language"), dict) and nm["per_language"]:
        out.append("### NER — per language (entity-level F1)")
        out.append("")
        out.append("| Language | n | precision | recall | F1 |")
        out.append("|----------|---|-----------|--------|----|")
        for lang, m in sorted(nm["per_language"].items()):
            out.append(f"| {lang} | {m.get('n', '—')} | "
                       f"{_fmt_pct(m.get('precision'))} | "
                       f"{_fmt_pct(m.get('recall'))} | "
                       f"{_fmt_pct(m.get('f1'))} |")
        out.append("")

    # ---- Section 2: end-to-end results --------------------------------------
    out.append("## 2. End-to-end pipeline evaluation")
    out.append("")
    if e2e is None:
        out.append("_E2E evaluation skipped (use `python src/evaluate.py` without "
                   "`--skip-e2e` to run it)._")
        out.append("")
    else:
        n = e2e["n_cases"]
        out.append(f"Tested **{n} curated messages** covering AR/EN/FR × greeting / "
                   "booking / inquiry / complaint / farewell.")
        out.append("")
        out.append("| Metric | Value |")
        out.append("|--------|-------|")
        # Only rates are stored, so the "k/n" counts are recovered as rate * n.
        out.append(f"| Language accuracy | {_fmt_pct(e2e['language_accuracy'])} "
                   f"({int(round(e2e['language_accuracy'] * n))}/{n}) |")
        out.append(f"| Intent accuracy (in-set) | "
                   f"{_fmt_pct(e2e['intent_accuracy_in_set'])} "
                   f"({int(round(e2e['intent_accuracy_in_set'] * n))}/{n}) |")
        out.append(f"| Non-empty reply rate | "
                   f"{_fmt_pct(e2e['nonempty_reply_rate'])} "
                   f"({int(round(e2e['nonempty_reply_rate'] * n))}/{n}) |")
        out.append(f"| Retrieval triggered | {e2e['retrieval_triggered']}/{n} "
                   "(expected: only on inquiry/booking/complaint) |")
        lat = e2e["latency_seconds"]
        # p95 may be None; attach the unit only when there is a value so the
        # cell shows "—" rather than "—s".
        p95_cell = f"{lat['p95']}s" if lat["p95"] is not None else "—"
        out.append(f"| Latency: mean / median / p95 / max | "
                   f"{lat['mean']}s / {lat['median']}s / "
                   f"{p95_cell} / "
                   f"{lat['max']}s |")
        out.append("")
        out.append("### Per-case detail")
        out.append("")
        out.append("| # | text | lang (pred / exp) | intent (pred / exp) | "
                   "retrieved | gen | latency |")
        out.append("|---|------|------|------|-----------|-----|---------|")
        for c in e2e["cases"]:
            # Mismatches are shown in bold with a cross.
            lang_cell = f"{c['predicted_language']} / {c['expected_language']}"
            if not c["language_ok"]:
                lang_cell = f"**{lang_cell} ✗**"
            intent_cell = f"{c['predicted_intent']} / {','.join(c['expected_intents'])}"
            if not c["intent_ok"]:
                intent_cell = f"**{intent_cell} ✗**"
            # Flatten line breaks so the text stays inside one table row, then
            # shorten, and only then escape pipes. Escaping before shortening
            # could cut a "\|" pair in half and counts the added backslashes
            # against the 50-character limit.
            text = " ".join(c["text"].splitlines())
            text_short = text if len(text) <= 50 else text[:47] + "..."
            text_short = text_short.replace("|", "\\|")
            out.append(f"| {c['i']} | {text_short} | {lang_cell} | "
                       f"{intent_cell} | {c['n_retrieved']} | "
                       f"{'Y' if c['generator_used'] else 'n'} | "
                       f"{c['latency_s']}s |")
        out.append("")

    # ---- Section 3: static architecture overview ----------------------------
    out.append("## 3. Architecture summary")
    out.append("")
    out.append("```")
    out.append("user_text")
    out.append(" -> Language Detector (DistilBERT, CPU) AR / CS / EN / FR")
    out.append(" -> Intent Classifier (DistilBERT, CPU) booking | complaint | farewell |")
    out.append(" greeting | inquiry | other")
    out.append(" -> NER (DistilBERT, CPU) PER / LOC / ORG / DATE (BIO)")
    out.append(" -> Branch on intent:")
    out.append(" greeting / farewell -> canned reply (instant)")
    out.append(" inquiry / booking / -> RAG retrieve top-3 (FAISS, MiniLM, CPU)")
    out.append(" complaint + Qwen2.5-0.5B generates reply (cuda fp16)")
    out.append(" other -> Qwen2.5-0.5B from general knowledge")
    out.append(" -> Reply (always in user's detected language; CS uses EN)")
    out.append("```")
    out.append("")
    out.append("All metrics in §1 were computed on held-out test splits during each "
               "model's training script. The §2 numbers are an additional integration "
               "check on hand-curated messages.")
    return "\n".join(out)
| # --------------------------------------------------------------------------- # | |
| # Entry point | |
| # --------------------------------------------------------------------------- # | |
def main() -> int:
    """Command-line entry point: aggregate metrics, optionally run E2E, write reports.

    Flags:
        --skip-e2e  Only aggregate the stored eval_results.json files.
        --quick     Run only the first 6 end-to-end cases.

    Writes REPORT_MD and REPORT_JSON (both UTF-8) and returns 0 as the
    process exit status.
    """
    # __doc__ is None under `python -OO`; fall back to an empty description.
    parser = argparse.ArgumentParser(description=(__doc__ or "").split("\n")[0])
    parser.add_argument("--skip-e2e", action="store_true",
                        help="Don't load models or run the E2E smoke eval; only "
                             "aggregate stored eval_results.json files.")
    parser.add_argument("--quick", action="store_true",
                        help="Run only the first 6 E2E cases.")
    args = parser.parse_args()

    print("=" * 72)
    print("Multilingual chatbot — final evaluation")
    print("=" * 72)

    print("\n[1/2] Aggregating per-model metrics from eval_results.json files ...")
    model_summary = aggregate_model_metrics()
    for k, v in model_summary.items():
        if "error" in v:
            print(f" - {k}: {v['error']}")
        else:
            print(f" - {k}: loaded ({v.get('model', '?')})")

    e2e = None
    if args.skip_e2e:
        print("\n[2/2] Skipping E2E evaluation (--skip-e2e).")
    else:
        cases = E2E_CASES[:6] if args.quick else E2E_CASES
        print(f"\n[2/2] Running E2E pipeline on {len(cases)} curated cases ...")
        e2e = run_e2e(cases)

    md = render_markdown(model_summary, e2e)
    REPORT_MD.write_text(md, encoding="utf-8")

    summary = {"models": model_summary, "e2e": e2e}
    # ensure_ascii=False emits the Arabic / French text verbatim, so the file
    # must be written as UTF-8 explicitly. The locale default encoding can
    # fail or produce mojibake on non-UTF-8 systems.
    REPORT_JSON.write_text(json.dumps(summary, indent=2, ensure_ascii=False),
                           encoding="utf-8")

    print(f"\n[OK] Wrote {REPORT_MD.relative_to(PROJECT_ROOT)}")
    print(f"[OK] Wrote {REPORT_JSON.relative_to(PROJECT_ROOT)}")
    return 0
# Script entry point: exit with the status code returned by main().
if __name__ == "__main__":
    raise SystemExit(main())