"""Evaluate retrieval quality, answer generation and latency for the RAG app."""
from __future__ import annotations

import argparse
import json
import math
import os
import re
import statistics
import time
from datetime import datetime, timezone
from pathlib import Path

from app.config import DATA_DIR, GEMINI_API_KEY, GEMINI_MODEL
from app.multimodal import multimodal_artifacts
from app.rag import answer_question
from app.retriever import hybrid_retrieve

# Input/output locations, all rooted at the configured data directory.
EVAL_CASES_PATH = DATA_DIR / "evaluation" / "eval_cases.json"
EVAL_REPORT_DIR = DATA_DIR / "evaluation" / "reports"
SUGGESTED_QUESTIONS_PATH = DATA_DIR / "processed" / "q&a.json"

# Runs of 2-5 capital letters (candidate tickers) and numeric literals with
# optional sign, "." / "," separators and a trailing "%" or "x".
TICKER_PATTERN = re.compile(r"\b[A-Z]{2,5}\b")
NUMBER_PATTERN = re.compile(r"[-+]?\d+(?:[.,]\d+)*(?:%|x)?")

# Tokens dropped by informative_tokens() before lexical overlap scoring.
STOPWORDS = {
    "a",
    "an",
    "and",
    "are",
    "as",
    "at",
    "be",
    "by",
    "cho",
    "co",
    "cua",
    "da",
    "duoc",
    "gi",
    "hay",
    "khi",
    "khong",
    "la",
    "mot",
    "neu",
    "nhung",
    "noi",
    "nua",
    "or",
    "tai",
    "the",
    "thi",
    "this",
    "to",
    "tren",
    "tu",
    "va",
    "ve",
    "voi",
}


def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the evaluation script."""
    cli = argparse.ArgumentParser(description="Evaluate retrieval, generation and performance.")
    # Declared as data so the option set reads as a table; order is preserved.
    option_specs = [
        ("--cases", {"default": str(EVAL_CASES_PATH), "help": "Path to evaluation cases JSON."}),
        ("--top-k", {"type": int, "default": 5, "help": "Top-k retrieval cutoff."}),
        (
            "--repeats",
            {
                "type": int,
                "default": 3,
                "help": "Number of repeated runs for latency measurements.",
            },
        ),
        (
            "--output-dir",
            {
                "default": str(EVAL_REPORT_DIR),
                "help": "Directory to write JSON and Markdown reports.",
            },
        ),
        (
            "--eval-model",
            {
                "default": None,
                "help": "Optional DeepEval model name. Defaults to GEMINI_MODEL when GEMINI_API_KEY is set.",
            },
        ),
        (
            "--deepeval-threshold",
            {
                "type": float,
                "default": 0.5,
                "help": "Passing threshold for DeepEval metrics.",
            },
        ),
        (
            "--include-reason",
            {
                "action": "store_true",
                "help": "Include DeepEval metric reasons in the JSON/Markdown reports.",
            },
        ),
    ]
    for flag, options in option_specs:
        cli.add_argument(flag, **options)
    return cli.parse_args()


def normalize_text(text: str) -> str:
    """Lower-case *text* and collapse every whitespace run to one space."""
    lowered = str(text).lower()
    words = lowered.replace("\n", " ").split()
    return " ".join(words)


def tokenize(text: str) -> list[str]:
    """Split normalized *text* into maximal runs of alphanumeric characters."""
    # Every non-alphanumeric character acts as a separator; whitespace is never
    # alphanumeric, so blanking separators and splitting yields the same runs.
    blanked = "".join(char if char.isalnum() else " " for char in normalize_text(text))
    return blanked.split()


def informative_tokens(text: str) -> list[str]:
    """Return tokens longer than two characters that are not stopwords."""
    kept = []
    for word in tokenize(text):
        if len(word) <= 2 or word in STOPWORDS:
            continue
        kept.append(word)
    return kept


def token_set(text: str) -> set[str]:
    """Return the distinct informative tokens of *text*."""
    return {*informative_tokens(text)}


def overlap_score(candidate: str, reference: str) -> float:
    """Return the fraction of reference tokens that also occur in *candidate*.

    The result is 0.0 when *reference* has no informative tokens.
    """
    wanted = token_set(reference)
    if len(wanted) == 0:
        return 0.0
    shared = wanted.intersection(token_set(candidate))
    return len(shared) / len(wanted)


def mean_or_zero(values: list[float]) -> float:
    """Return the mean of *values* rounded to 3 places, or 0.0 when empty."""
    return round(statistics.mean(values), 3) if values else 0.0


def numeric_values(results: list[dict], key: str) -> list[float]:
    """Collect ``result[key]`` as floats, skipping missing, non-numeric and bool values."""
    return [
        float(item)
        for item in (row.get(key) for row in results)
        # bool is a subclass of int, so it has to be excluded explicitly.
        if isinstance(item, (int, float)) and not isinstance(item, bool)
    ]


def preview(text: str, limit: int = 180) -> str:
    """Return *text* on one line, cut to *limit* characters with a "..." suffix."""
    flattened = " ".join(str(text).split())
    if len(flattened) > limit:
        return flattened[: limit - 3] + "..."
    return flattened
def percentile(values: list[float], pct: float) -> float:
    """Return the linearly interpolated percentile of *values*.

    Args:
        values: Samples in any order; an empty list yields 0.0.
        pct: Fraction in [0, 1]. Values outside that range are clamped so an
            out-of-range request can neither index past the end nor wrap
            around to the wrong element.
    """
    if not values:
        return 0.0
    ordered = sorted(values)
    if len(ordered) == 1:
        return ordered[0]
    pct = min(max(pct, 0.0), 1.0)
    rank = pct * (len(ordered) - 1)
    lower = math.floor(rank)
    upper = math.ceil(rank)
    if lower == upper:
        return ordered[lower]
    fraction = rank - lower
    return ordered[lower] * (1 - fraction) + ordered[upper] * fraction


def latency_summary(latencies_ms: list[float]) -> dict:
    """Summarize latencies (ms) as count, average, p95, min and max."""
    if not latencies_ms:
        return {"count": 0, "avg_ms": 0.0, "p95_ms": 0.0, "min_ms": 0.0, "max_ms": 0.0}
    return {
        "count": len(latencies_ms),
        "avg_ms": round(statistics.mean(latencies_ms), 2),
        "p95_ms": round(percentile(latencies_ms, 0.95), 2),
        "min_ms": round(min(latencies_ms), 2),
        "max_ms": round(max(latencies_ms), 2),
    }


# Tickers recognized when inferring a case's ticker from its question text.
_KNOWN_TICKERS = frozenset({"HPG", "FPT", "VCB"})


def _as_list(value) -> list:
    """Coerce a JSON field that should be a list into a list.

    Hand-edited case files sometimes hold a bare string or ``null`` where a
    list is expected; iterating a string would yield single characters.
    """
    if value is None:
        return []
    if isinstance(value, (list, tuple, set, frozenset)):
        return list(value)
    return [value]


def infer_case_ticker(question: str, known_tickers: frozenset[str] = _KNOWN_TICKERS) -> str | None:
    """Return the first known ticker mentioned in *question*, or ``None``.

    Args:
        question: Free-text question; matching is case-insensitive.
        known_tickers: Upper-case tickers to accept. Defaults to the set that
            was previously hard-coded in this function.
    """
    for match in TICKER_PATTERN.findall(question.upper()):
        if match in known_tickers:
            return match
    return None


def build_default_eval_cases(limit: int = 9) -> list[dict]:
    """Build starter cases from the suggested-questions file.

    Falls back to three built-in questions when the file is missing,
    unreadable, malformed or contains no usable question. Each case has empty
    expectation lists.
    """
    payload = {}
    if SUGGESTED_QUESTIONS_PATH.exists():
        try:
            payload = json.loads(SUGGESTED_QUESTIONS_PATH.read_text(encoding="utf-8"))
        except (OSError, ValueError) as exc:
            # Starter cases are best-effort: report the problem and fall back.
            print(f"Could not read suggested questions ({exc}); using built-in questions.")
            payload = {}
    if not isinstance(payload, dict):
        payload = {}

    groups = payload.get("suggested_questions", [])
    if not isinstance(groups, list):
        groups = []

    questions = []
    for group in groups:
        if not isinstance(group, dict):
            continue
        for question in _as_list(group.get("questions")):
            if isinstance(question, str) and question.strip():
                questions.append(question.strip())

    if not questions:
        questions = [
            "Tom tat nhanh co phieu HPG hien tai",
            "FPT co nhung dong luc tang truong nao?",
            "VCB co diem manh va rui ro gi?",
        ]

    return [
        {
            "id": f"auto_{index:03d}",
            "question": question,
            "ticker": infer_case_ticker(question),
            "expected_chunks": [],
            "expected_answer_keywords": [],
            "expected_source_keywords": [],
        }
        for index, question in enumerate(questions[:limit], start=1)
    ]


def write_default_eval_cases(path: Path, cases: list[dict]) -> None:
    """Write *cases* plus an explanatory note to *path* as UTF-8 JSON."""
    path.parent.mkdir(parents=True, exist_ok=True)
    payload = {
        "note": (
            "Auto-generated starter cases. Add expected_chunks, expected_answer_keywords "
            "and expected_source_keywords for stricter evaluation."
        ),
        "cases": cases,
    }
    path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")


def load_eval_cases(path: Path) -> list[dict]:
    """Load evaluation cases from *path*, creating starter cases if it is absent.

    The file may be a list of cases or an object with a ``cases`` list.
    Entries that are not objects are dropped. Entries without a question are
    skipped with a message, and a missing ``id`` is filled in, because every
    later stage indexes ``case["id"]`` and ``case["question"]`` directly.
    """
    if not path.exists():
        cases = build_default_eval_cases()
        write_default_eval_cases(path, cases)
        print(f"Created starter eval cases: {path.as_posix()}")
        return cases

    payload = json.loads(path.read_text(encoding="utf-8"))
    raw_cases = payload.get("cases", []) if isinstance(payload, dict) else payload
    if not isinstance(raw_cases, list):
        raw_cases = []

    cases = []
    for index, case in enumerate(raw_cases, start=1):
        if not isinstance(case, dict):
            continue
        question = case.get("question")
        if not isinstance(question, str) or not question.strip():
            print(f"Skipping eval case #{index}: missing question")
            continue
        case.setdefault("id", f"case_{index:03d}")
        cases.append(case)
    return cases


def chunk_matches_expectation(chunk, expectation: dict) -> bool:
    """Return True when *chunk* satisfies every constraint set in *expectation*.

    Supported keys: ``ticker``, ``scope``, ``modality`` (exact match after
    normalization), ``source_path_contains`` (substring), and
    ``heading_contains_any`` / ``text_contains_any`` (any substring matches).
    Unset or empty keys impose no constraint.
    """
    # ``or ""`` keeps a None attribute from being normalized to the text "none".
    source_path = normalize_text(getattr(chunk, "source_path", "") or "")
    heading_parts = _as_list(getattr(chunk, "heading_path", None))
    heading_path = normalize_text(" ".join(str(part) for part in heading_parts))
    text = normalize_text(getattr(chunk, "text", "") or "")
    modality = normalize_text(getattr(chunk, "modality", "") or "")
    ticker = normalize_text(getattr(chunk, "ticker", "") or "")
    # A chunk without an explicit scope is scoped to its ticker.
    scope = normalize_text(getattr(chunk, "scope", "") or getattr(chunk, "ticker", "") or "")

    if expectation.get("ticker") and ticker != normalize_text(expectation["ticker"]):
        return False
    if expectation.get("scope") and scope != normalize_text(expectation["scope"]):
        return False
    if expectation.get("modality") and modality != normalize_text(expectation["modality"]):
        return False
    if expectation.get("source_path_contains"):
        if normalize_text(expectation["source_path_contains"]) not in source_path:
            return False

    heading_needles = _as_list(expectation.get("heading_contains_any"))
    if heading_needles and not any(normalize_text(value) in heading_path for value in heading_needles):
        return False

    text_needles = _as_list(expectation.get("text_contains_any"))
    if text_needles and not any(normalize_text(value) in text for value in text_needles):
        return False
    return True


def expected_context_text(case: dict) -> str:
    """Join every expectation string of *case* into one reference text."""
    values = []
    for key in ["expected_output", "expected_answer", "reference_answer"]:
        if case.get(key):
            values.append(str(case[key]))
    values.extend(str(value) for value in _as_list(case.get("expected_answer_keywords")))
    values.extend(str(value) for value in _as_list(case.get("expected_source_keywords")))
    for expectation in _as_list(case.get("expected_chunks")):
        if not isinstance(expectation, dict):
            continue
        values.extend(str(value) for value in _as_list(expectation.get("text_contains_any")))
        values.extend(str(value) for value in _as_list(expectation.get("heading_contains_any")))
        if expectation.get("source_path_contains"):
            values.append(str(expectation["source_path_contains"]))
    return " ".join(values)


def chunk_relevance_flags(case: dict, chunks) -> list[bool]:
    """Return one relevance flag per chunk, in retrieval order.

    With ``expected_chunks`` a chunk is relevant when it matches any
    expectation. Otherwise it is relevant when it shares at least one
    informative token with the case's reference text (or its question).
    """
    expectations = case.get("expected_chunks", [])
    if expectations:
        return [
            any(chunk_matches_expectation(chunk, expectation) for expectation in expectations)
            for chunk in chunks
        ]
    reference = expected_context_text(case) or case["question"]
    return [overlap_score(chunk.text, reference) > 0 for chunk in chunks]


def evaluate_retrieval_case(case: dict, top_k: int) -> dict:
    """Run retrieval for one case and compute its ranking metrics.

    Metrics (recall, precision, hit rate, MRR) are only computed for "strict"
    cases, i.e. those that define ``expected_chunks``; other cases report
    ``None`` and serve as smoke tests. A retrieval failure is recorded in the
    ``error`` field rather than raised.
    """
    expectations = case.get("expected_chunks", []) or []
    started_at = time.perf_counter()
    try:
        hits = hybrid_retrieve(case["question"], top_k=top_k, ticker=case.get("ticker"))
        latency_ms = (time.perf_counter() - started_at) * 1000
    except Exception as exc:  # noqa: BLE001
        return {
            "case_id": case["id"],
            "question": case["question"],
            "ticker": case.get("ticker"),
            "latency_ms": round((time.perf_counter() - started_at) * 1000, 2),
            "top_k": top_k,
            "expected_evidence_count": len(expectations),
            "matched_evidence_count": 0,
            # Present so error records expose the same keys as successes.
            "relevant_retrieved_count": None,
            "strict_evaluation": bool(expectations),
            "recall_at_k": None,
            "precision_at_k": None,
            "hit_rate_at_k": None,
            "first_relevant_rank": None,
            "mrr": None,
            "qualitative_top_chunks": [],
            "error": str(exc),
        }

    # Rank of the first hit satisfying each expectation (unmatched ones omitted).
    matched_ranks: list[int] = []
    for expectation in expectations:
        matched_rank = next(
            (
                rank
                for rank, chunk in enumerate(hits, start=1)
                if chunk_matches_expectation(chunk, expectation)
            ),
            None,
        )
        if matched_rank is not None:
            matched_ranks.append(matched_rank)

    expected_count = len(expectations)
    strict_evaluation = expected_count > 0
    first_relevant_rank = min(matched_ranks) if matched_ranks else None
    coverage = (len(matched_ranks) / expected_count) if strict_evaluation else None

    relevance_flags = chunk_relevance_flags(case, hits)
    if strict_evaluation and relevance_flags and first_relevant_rank is None:
        # Defensive fallback: derive the rank from the per-chunk flags.
        first_relevant_rank = next(
            (rank for rank, flag in enumerate(relevance_flags, start=1) if flag),
            None,
        )

    evaluated_flags = relevance_flags[:top_k]
    if strict_evaluation:
        relevant_retrieved_count = sum(evaluated_flags)
        precision_at_k = relevant_retrieved_count / len(evaluated_flags) if evaluated_flags else 0.0
        hit_rate_at_k = float(any(evaluated_flags))
    else:
        relevant_retrieved_count = None
        precision_at_k = None
        hit_rate_at_k = None

    return {
        "case_id": case["id"],
        "question": case["question"],
        "ticker": case.get("ticker"),
        "latency_ms": round(latency_ms, 2),
        "top_k": top_k,
        "strict_evaluation": strict_evaluation,
        "expected_evidence_count": expected_count,
        "matched_evidence_count": len(matched_ranks),
        "relevant_retrieved_count": relevant_retrieved_count,
        "recall_at_k": round(coverage, 3) if coverage is not None else None,
        "precision_at_k": round(precision_at_k, 3) if precision_at_k is not None else None,
        "hit_rate_at_k": round(hit_rate_at_k, 3) if hit_rate_at_k is not None else None,
        "first_relevant_rank": first_relevant_rank,
        "mrr": round(1 / first_relevant_rank, 4) if first_relevant_rank else None,
        "qualitative_top_chunks": [
            {
                "rank": rank,
                "score": round(chunk.score, 4),
                "ticker": chunk.ticker,
                "scope": chunk.scope,
                "modality": chunk.modality,
                "source_path": chunk.source_path,
                "heading_path": chunk.heading_path,
                "preview": preview(chunk.text),
            }
            for rank, chunk in enumerate(hits[:3], start=1)
        ],
    }


def source_keyword_match_score(sources: list[dict], expected_keywords: list[str]) -> float:
    """Return the fraction of keywords found in the sources' paths or URLs.

    Returns 1.0 when no keywords are expected.
    """
    if not expected_keywords:
        return 1.0
    haystack = normalize_text(
        " ".join(
            f"{source.get('source_path', '')} {source.get('artifact_path', '')} {source.get('url', '')}"
            for source in sources
        )
    )
    hits = sum(1 for keyword in expected_keywords if normalize_text(keyword) in haystack)
    return hits / len(expected_keywords)


def context_grounding_score(answer: str, retrieved_chunks) -> float:
    """Return the share of answer tokens that occur in the retrieved chunks."""
    answer_tokens = informative_tokens(answer)
    if not answer_tokens:
        return 0.0
    context_tokens: set[str] = set()
    for chunk in retrieved_chunks:
        context_tokens.update(informative_tokens(chunk.text))
    if not context_tokens:
        return 0.0
    shared = sum(1 for token in answer_tokens if token in context_tokens)
    return shared / len(answer_tokens)


def lexical_answer_relevancy_score(question: str, answer: str) -> float:
    """Return the share of question tokens that also occur in the answer."""
    question_tokens = token_set(question)
    if not question_tokens:
        return 0.0
    return len(question_tokens & token_set(answer)) / len(question_tokens)


def extract_numbers(text: str) -> list[float]:
    """Return every number in *text* that ``parse_number_token`` can parse."""
    parsed_values = (parse_number_token(raw_value) for raw_value in NUMBER_PATTERN.findall(text))
    return [value for value in parsed_values if value is not None]


def parse_number_token(raw_value: str) -> float | None:
    """Parse one numeric token that may use "." or "," as either separator.

    A trailing "%" or "x" is dropped. When both separators occur, the one that
    appears last is the decimal mark. With a single kind of separator, a
    repeated separator or a three-digit last group is read as thousands
    grouping (for "." only when the leading group has more than two digits);
    anything else is read as a decimal mark. Returns ``None`` when the token
    cannot be parsed.
    """
    value = raw_value.strip().rstrip("%xX")
    if not value:
        return None
    sign = ""
    if value[0] in {"+", "-"}:
        sign, value = value[0], value[1:]

    if "," in value and "." in value:
        if value.rfind(",") > value.rfind("."):
            normalized = value.replace(".", "").replace(",", ".")
        else:
            normalized = value.replace(",", "")
    elif "," in value:
        parts = value.split(",")
        if len(parts) > 2 or len(parts[-1]) == 3:
            normalized = "".join(parts)
        else:
            normalized = value.replace(",", ".")
    elif "." in value:
        parts = value.split(".")
        if len(parts) > 2:
            normalized = "".join(parts)
        elif len(parts[-1]) == 3 and len(parts[0]) > 2:
            normalized = "".join(parts)
        else:
            normalized = value
    else:
        normalized = value

    try:
        return float(f"{sign}{normalized}")
    except ValueError:
        return None


def numerical_accuracy_score(answer: str, case: dict) -> float | None:
    """Return the fraction of ``expected_numbers`` that appear in *answer*.

    Each expected number may consume at most one number from the answer, and
    matches within 1% (at least 0.01 absolute). Returns ``None`` when the case
    defines no expected numbers.
    """
    expected_numbers = case.get("expected_numbers")
    if not expected_numbers:
        return None
    expected_numbers = [float(value) for value in expected_numbers]

    remaining = extract_numbers(answer)
    if not remaining:
        return 0.0

    matched = 0
    for expected in expected_numbers:
        tolerance = max(abs(expected) * 0.01, 0.01)
        match_index = next(
            (
                index
                for index, actual in enumerate(remaining)
                if math.isclose(actual, expected, abs_tol=tolerance)
            ),
            None,
        )
        if match_index is not None:
            matched += 1
            remaining.pop(match_index)
    return matched / len(expected_numbers)


def citation_accuracy_score(sources: list[dict], case: dict) -> float | None:
    """Score how well the cited *sources* match the case's expectations.

    ``expected_source_keywords`` take precedence. Otherwise each entry of
    ``expected_chunks`` counts as matched when its ``source_path_contains`` or
    one of its text snippets occurs in a source's metadata. Snippets are read
    from ``text_contains_any`` (the key the retrieval checks use) as well as
    the legacy ``text_contains`` key. Returns ``None`` when the case defines
    neither kind of expectation.
    """
    expected_keywords = case.get("expected_source_keywords", [])
    if expected_keywords:
        return source_keyword_match_score(sources, expected_keywords)

    expectations = case.get("expected_chunks", [])
    if not expectations:
        return None

    source_texts = [
        normalize_text(
            " ".join(
                str(source.get(key, ""))
                for key in ["source_path", "artifact_path", "url", "title", "structure_type"]
            )
        )
        for source in sources
    ]

    matched = 0
    for expectation in expectations:
        expected_path = normalize_text(expectation.get("source_path_contains") or "")
        raw_snippets = _as_list(expectation.get("text_contains")) + _as_list(
            expectation.get("text_contains_any")
        )
        expected_snippets = [snippet for snippet in map(normalize_text, raw_snippets) if snippet]
        for source_text in source_texts:
            path_hit = bool(expected_path) and expected_path in source_text
            if path_hit or any(snippet in source_text for snippet in expected_snippets):
                matched += 1
                break
    return matched / len(expectations)


def evaluate_generation_case(case: dict, top_k: int) -> dict:
    """Generate an answer for one case and score numbers and citations.

    A failure of ``answer_question`` is recorded in the ``error`` field rather
    than raised. ``fallback_metrics`` explains why a metric has no expected
    values to compare against.
    """
    started_at = time.perf_counter()
    try:
        result = answer_question(case["question"], ticker=case.get("ticker"), top_k=top_k)
        latency_ms = (time.perf_counter() - started_at) * 1000
    except Exception as exc:  # noqa: BLE001
        return {
            "case_id": case["id"],
            "question": case["question"],
            "ticker": case.get("ticker"),
            "latency_ms": round((time.perf_counter() - started_at) * 1000, 2),
            "source_count": 0,
            "numerical_accuracy": None,
            "citation_accuracy": None,
            # Present so error records expose the same keys as successes.
            "fallback_metrics": {},
            "has_sources": False,
            "answer": "",
            "answer_preview": "",
            "source_preview": [],
            "error": str(exc),
        }

    # ``or`` guards against explicit None values ("None" text / TypeError).
    answer = str(result.get("answer") or "")
    sources = list(result.get("sources") or [])
    numerical_accuracy = numerical_accuracy_score(answer, case)
    citation_accuracy = citation_accuracy_score(sources, case)

    fallback_metrics = {}
    if not case.get("expected_numbers") and not extract_numbers(expected_output_for_deepeval(case)):
        fallback_metrics["numerical_accuracy"] = "no_expected_numbers"
    if not case.get("expected_source_keywords") and not case.get("expected_chunks"):
        fallback_metrics["citation_accuracy"] = "sources_present_without_expected_citations"

    return {
        "case_id": case["id"],
        "question": case["question"],
        "ticker": case.get("ticker"),
        "latency_ms": round(latency_ms, 2),
        "source_count": len(sources),
        "numerical_accuracy": round(numerical_accuracy, 3) if numerical_accuracy is not None else None,
        "citation_accuracy": round(citation_accuracy, 3) if citation_accuracy is not None else None,
        "fallback_metrics": fallback_metrics,
        "has_sources": bool(sources),
        "answer": answer,
        "answer_preview": preview(answer, limit=260),
        "source_preview": [
            {
                "ticker": source.get("ticker"),
                "modality": source.get("modality"),
                "structure_type": source.get("structure_type"),
                "source_path": source.get("source_path"),
                "url": source.get("url"),
            }
            for source in sources[:3]
        ],
    }


def expected_output_for_deepeval(case: dict) -> str:
    """Return the case's reference answer, or its joined expectation text."""
    return str(
        case.get("expected_output")
        or case.get("expected_answer")
        or case.get("reference_answer")
        or expected_context_text(case)
        or ""
    )


def deepeval_metric_kwargs(eval_model: str | None, threshold: float, include_reason: bool) -> dict:
    """Build the keyword arguments shared by the DeepEval metrics.

    A non-Gemini *eval_model* name is passed through as ``model``. Otherwise a
    ``GeminiModel`` is created when ``GEMINI_API_KEY`` is set. With neither,
    no ``model`` is supplied.
    """
    kwargs = {
        "threshold": threshold,
        "include_reason": include_reason,
    }
    if eval_model and not eval_model.lower().startswith("gemini"):
        kwargs["model"] = eval_model
    elif GEMINI_API_KEY:
        # Imported lazily because DeepEval is an optional dependency.
        from deepeval.models.llms.gemini_model import GeminiModel

        kwargs["model"] = GeminiModel(
            model=eval_model or GEMINI_MODEL,
            api_key=GEMINI_API_KEY or None,
            temperature=0,
        )
    return kwargs


def effective_eval_model(eval_model: str | None) -> str | None:
    """Return the judge model name to record in the report, if any."""
    if eval_model:
        return eval_model
    if GEMINI_API_KEY:
        return GEMINI_MODEL
    return None


def measure_deepeval_metric(metric, test_case) -> dict:
    """Run *metric* on *test_case* and return its score, reason and success."""
    metric.measure(test_case)
    return {
        "score": round(float(getattr(metric, "score", 0.0) or 0.0), 3),
        "reason": getattr(metric, "reason", None),
        "success": bool(getattr(metric, "success", False)),
    }


def apply_deepeval_generation_scores(
    case: dict,
    result: dict,
    chunks,
    answer: str,
    eval_model: str | None,
    threshold: float,
    include_reason: bool,
) -> dict:
    """Add DeepEval answer-relevancy and faithfulness scores to *result*.

    *result* is updated in place and returned. Any failure (DeepEval missing,
    the test case or judge model cannot be built, a metric raises) is stored
    in *result* instead of being raised, so the caller's fallback scores still
    apply.
    """
    try:
        from deepeval.metrics import AnswerRelevancyMetric, FaithfulnessMetric
        from deepeval.test_case import LLMTestCase
    except ImportError as exc:
        result["deepeval_error"] = (
            "DeepEval is not installed. Install the optional dependency with "
            "`pip install deepeval` before running evaluation."
        )
        result["deepeval_import_error"] = str(exc)
        return result

    test_case_kwargs = {
        "input": case["question"],
        "actual_output": answer,
        "retrieval_context": [chunk.text for chunk in chunks],
    }
    expected_output = expected_output_for_deepeval(case)
    if expected_output:
        test_case_kwargs["expected_output"] = expected_output

    try:
        test_case = LLMTestCase(**test_case_kwargs)
        metric_kwargs = deepeval_metric_kwargs(eval_model, threshold, include_reason)
    except Exception as exc:  # noqa: BLE001
        # Setup failures must not abort the whole evaluation run.
        result["deepeval_error"] = str(exc)
        return result

    scores = {}
    for key, metric_cls in [
        ("answer_relevancy", AnswerRelevancyMetric),
        ("faithfulness", FaithfulnessMetric),
    ]:
        try:
            scores[key] = measure_deepeval_metric(metric_cls(**metric_kwargs), test_case)
            result[key] = scores[key]["score"]
            # A real score replaces the lexical fallback for this metric.
            result.get("fallback_metrics", {}).pop(key, None)
        except Exception as exc:  # noqa: BLE001
            scores[key] = {"error": str(exc)}
    result["deepeval"] = scores
    return result


def evaluate_generation_case_with_deepeval(
    case: dict,
    top_k: int,
    eval_model: str | None,
    threshold: float,
    include_reason: bool,
) -> dict:
    """Evaluate generation for one case, adding DeepEval or lexical scores.

    Lexical fallback scores for faithfulness and answer relevancy are computed
    first and used whenever DeepEval produced no score for that metric.
    """
    result = evaluate_generation_case(case, top_k)
    if result.get("error"):
        return result

    try:
        retrieval_chunks = hybrid_retrieve(case["question"], top_k=top_k, ticker=case.get("ticker"))
    except Exception as exc:  # noqa: BLE001
        result["deepeval_error"] = str(exc)
        return result

    answer = str(result.get("answer", ""))
    fallback_scores = {
        "faithfulness": round(context_grounding_score(answer, retrieval_chunks), 3),
        "answer_relevancy": round(lexical_answer_relevancy_score(case["question"], answer), 3),
    }
    result.setdefault("fallback_metrics", {}).update(
        {
            "faithfulness": "lexical_context_grounding",
            "answer_relevancy": "question_answer_token_overlap",
        }
    )
    result = apply_deepeval_generation_scores(
        case,
        result,
        retrieval_chunks,
        answer,
        eval_model,
        threshold,
        include_reason,
    )
    for metric_name, fallback_score in fallback_scores.items():
        if result.get(metric_name) is None:
            result[metric_name] = fallback_score
    return result


def evaluate_multimodal_readiness(cases: list[dict]) -> dict:
    """Inventory chart/table/PDF artifacts for every ticker used by *cases*."""
    tickers = sorted({str(case.get("ticker", "")).upper() for case in cases if case.get("ticker")})
    inventory = []
    lookup_latencies = []
    for ticker in tickers:
        started_at = time.perf_counter()
        artifacts = multimodal_artifacts(ticker)
        lookup_latencies.append((time.perf_counter() - started_at) * 1000)
        inventory.append(
            {
                "ticker": ticker,
                "has_chart": bool(artifacts["chart"]),
                "table_count": len(artifacts["tables"]),
                "pdf_count": len(artifacts["pdfs"]),
            }
        )

    ready_all_three = sum(
        1
        for item in inventory
        if item["has_chart"] and item["table_count"] > 0 and item["pdf_count"] > 0
    )
    return {
        "tickers_evaluated": tickers,
        "inventory": inventory,
        "tickers_with_chart_table_pdf": ready_all_three,
        "artifact_lookup_latency_ms": latency_summary(lookup_latencies),
        "note": (
            "Current system surfaces image/csv/pdf artifacts in UI and sources, "
            "but retrieval is still primarily text+dense/BM25 rather than true multimodal embedding."
        ),
    }


def evaluate_performance(cases: list[dict], top_k: int, repeats: int) -> dict:
    """Measure retrieval and generation latency over *repeats* runs per case.

    Failed runs contribute no latency sample; they are counted per error
    message instead.
    """
    retrieval_latencies = []
    generation_latencies = []
    retrieval_errors: dict[str, int] = {}
    generation_errors: dict[str, int] = {}
    per_case = []

    for case in cases:
        case_retrieval_latencies = []
        case_generation_latencies = []
        for _ in range(repeats):
            try:
                retrieval_started_at = time.perf_counter()
                hybrid_retrieve(case["question"], top_k=top_k, ticker=case.get("ticker"))
                case_retrieval_latencies.append((time.perf_counter() - retrieval_started_at) * 1000)
            except Exception as exc:  # noqa: BLE001
                message = str(exc)
                retrieval_errors[message] = retrieval_errors.get(message, 0) + 1
            try:
                generation_started_at = time.perf_counter()
                answer_question(case["question"], ticker=case.get("ticker"), top_k=top_k)
                case_generation_latencies.append((time.perf_counter() - generation_started_at) * 1000)
            except Exception as exc:  # noqa: BLE001
                message = str(exc)
                generation_errors[message] = generation_errors.get(message, 0) + 1

        retrieval_latencies.extend(case_retrieval_latencies)
        generation_latencies.extend(case_generation_latencies)
        per_case.append(
            {
                "case_id": case["id"],
                "question": case["question"],
                "ticker": case.get("ticker"),
                "retrieval_latency_ms": latency_summary(case_retrieval_latencies),
                "generation_latency_ms": latency_summary(case_generation_latencies),
            }
        )

    return {
        "retrieval_latency_ms": latency_summary(retrieval_latencies),
        "generation_latency_ms": latency_summary(generation_latencies),
        "retrieval_failure_count": sum(retrieval_errors.values()),
        "generation_failure_count": sum(generation_errors.values()),
        "retrieval_errors": retrieval_errors,
        "generation_errors": generation_errors,
        "multimodal_readiness": evaluate_multimodal_readiness(cases),
        "per_case": per_case,
    }


def summarize_retrieval(results: list[dict]) -> dict:
    """Aggregate per-case retrieval results; metrics use strict cases only."""
    total = len(results)
    error_count = sum(1 for result in results if result.get("error"))
    strict_results = [result for result in results if result.get("strict_evaluation")]
    mrr_values = numeric_values(strict_results, "mrr")
    recall_at_k_values = numeric_values(strict_results, "recall_at_k")
    precision_at_k_values = numeric_values(strict_results, "precision_at_k")
    hit_rate_at_k_values = numeric_values(strict_results, "hit_rate_at_k")
    latencies = [result["latency_ms"] for result in results]
    return {
        "case_count": total,
        "error_count": error_count,
        "strict_case_count": len(strict_results),
        "smoke_case_count": total - len(strict_results),
        "mean_mrr": round(statistics.mean(mrr_values), 3) if mrr_values else None,
        "recall_at_k": mean_or_zero(recall_at_k_values) if recall_at_k_values else None,
        "precision_at_k": mean_or_zero(precision_at_k_values) if precision_at_k_values else None,
        "hit_rate_at_k": mean_or_zero(hit_rate_at_k_values) if hit_rate_at_k_values else None,
        "latency_ms": latency_summary(latencies),
    }


def summarize_generation(results: list[dict]) -> dict:
    """Aggregate per-case generation results into mean scores and latency."""
    total = len(results)
    error_count = sum(1 for result in results if result.get("error"))
    answer_relevancy_values = numeric_values(results, "answer_relevancy")
    faithfulness_values = numeric_values(results, "faithfulness")
    numerical_accuracy_values = numeric_values(results, "numerical_accuracy")
    citation_accuracy_values = numeric_values(results, "citation_accuracy")
    latencies = [result["latency_ms"] for result in results]
    return {
        "case_count": total,
        "error_count": error_count,
        "latency_ms": latency_summary(latencies),
        "faithfulness": mean_or_zero(faithfulness_values) if faithfulness_values else None,
        "answer_relevancy": mean_or_zero(answer_relevancy_values) if answer_relevancy_values else None,
        "numerical_case_count": len(numerical_accuracy_values),
        "numerical_accuracy": mean_or_zero(numerical_accuracy_values) if numerical_accuracy_values else None,
        "citation_case_count": len(citation_accuracy_values),
        "citation_accuracy": mean_or_zero(citation_accuracy_values) if citation_accuracy_values else None,
    }


def build_markdown_report(report: dict) -> str:
    """Render *report* as Markdown with up to three example cases per section."""
    retrieval_summary = report["retrieval"]["summary"]
    generation_summary = report["generation"]["summary"]
    performance_summary = report["performance"]

    lines = [
        "# Danh gia he thong",
        "",
        f"Thoi gian tao bao cao: {report['generated_at_utc']}",
        f"DeepEval model: {report.get('eval_model') or 'N/A'}",
        f"Top-k: {report['top_k']}",
        f"So case: {report['case_count']}",
        "",
        "## 4.x.1 Danh gia Retrieval",
        f"- Strict cases: {retrieval_summary.get('strict_case_count', 0)}",
        f"- Smoke-only cases: {retrieval_summary.get('smoke_case_count', 0)}",
        f"- Mean MRR: {retrieval_summary['mean_mrr']}",
        f"- Recall@{report['top_k']}: {retrieval_summary['recall_at_k']}",
        f"- Precision@{report['top_k']}: {retrieval_summary['precision_at_k']}",
        f"- Hit Rate@{report['top_k']}: {retrieval_summary['hit_rate_at_k']}",
        "",
        "### Qualitative examples",
    ]

    for case in report["retrieval"]["cases"][:3]:
        lines.extend(
            [
                f"- Case `{case['case_id']}`: {case['question']}",
                (
                    f" strict={case.get('strict_evaluation', False)} "
                    f"mrr={case['mrr']} recall_at_k={case.get('recall_at_k', 'N/A')} "
                    f"precision_at_k={case.get('precision_at_k', 'N/A')} "
                    f"hit_rate_at_k={case.get('hit_rate_at_k', 'N/A')}"
                ),
            ]
        )
        if case.get("deepeval_error"):
            lines.append(f" deepeval_error: {preview(case['deepeval_error'], 220)}")
        for chunk in case["qualitative_top_chunks"]:
            lines.append(
                f" top{chunk['rank']}: {chunk['source_path']} | score={chunk['score']} | {chunk['preview']}"
            )

    lines.extend(
        [
            "",
            "## 4.x.2 Danh gia Generation",
            f"- Faithfulness: {generation_summary.get('faithfulness', 'N/A')}",
            f"- Answer relevancy: {generation_summary.get('answer_relevancy', 'N/A')}",
            f"- Numerical cases: {generation_summary.get('numerical_case_count', 0)}",
            f"- Numerical accuracy: {generation_summary.get('numerical_accuracy', 'N/A')}",
            f"- Citation cases: {generation_summary.get('citation_case_count', 0)}",
            f"- Citation accuracy: {generation_summary.get('citation_accuracy', 'N/A')}",
            "",
            "### Qualitative examples",
        ]
    )

    for case in report["generation"]["cases"][:3]:
        lines.extend(
            [
                (
                    f"- Case `{case['case_id']}`: source_count={case['source_count']} "
                    f"numerical_accuracy={case.get('numerical_accuracy', 'N/A')} "
                    f"citation_accuracy={case.get('citation_accuracy', 'N/A')}"
                ),
                f" answer: {case['answer_preview']}",
            ]
        )
        lines.append(
            f" answer_relevancy={case.get('answer_relevancy', 'N/A')} "
            f"faithfulness={case.get('faithfulness', 'N/A')}"
        )
        if case.get("deepeval_error"):
            lines.append(f" deepeval_error: {preview(case['deepeval_error'], 220)}")
        for metric_name in ["answer_relevancy", "faithfulness"]:
            reason = case.get("deepeval", {}).get(metric_name, {}).get("reason")
            if reason:
                lines.append(f" {metric_name}_reason: {preview(reason, 220)}")
        for source in case["source_preview"]:
            lines.append(
                f" source: {source['source_path']} | modality={source['modality']} | ticker={source['ticker']}"
            )

    multimodal = performance_summary["multimodal_readiness"]
    lookup_latency = multimodal["artifact_lookup_latency_ms"]
    lines.extend(
        [
            "",
            "## 4.x.3 Danh gia hieu nang he thong",
            f"- Retrieval P95 latency (ms): {performance_summary['retrieval_latency_ms']['p95_ms']}",
            f"- Answer P95 latency (ms): {performance_summary['generation_latency_ms']['p95_ms']}",
            f"- Retrieval failures: {performance_summary.get('retrieval_failure_count', 0)}",
            f"- Answer failures: {performance_summary.get('generation_failure_count', 0)}",
            f"- Tickers co du chart + table + pdf: {multimodal['tickers_with_chart_table_pdf']}",
            f"- Artifact lookup latency avg/p95 (ms): {lookup_latency['avg_ms']} / {lookup_latency['p95_ms']}",
            f"- Ghi chu multimodal: {multimodal['note']}",
            "",
            "### Multimodal inventory",
        ]
    )
    for item in multimodal["inventory"]:
        lines.append(
            f"- {item['ticker']}: chart={item['has_chart']} tables={item['table_count']} pdfs={item['pdf_count']}"
        )
    return "\n".join(lines) + "\n"


def ensure_output_dir(path: Path) -> None:
    """Create *path* and any missing parents."""
    path.mkdir(parents=True, exist_ok=True)


def main() -> int:
    """Run the full evaluation, write JSON and Markdown reports, return 0."""
    args = parse_args()
    cases_path = Path(args.cases)
    output_dir = Path(args.output_dir)
    ensure_output_dir(output_dir)

    cases = load_eval_cases(cases_path)
    retrieval_cases = [evaluate_retrieval_case(case, args.top_k) for case in cases]
    generation_cases = [
        evaluate_generation_case_with_deepeval(
            case,
            args.top_k,
            args.eval_model,
            args.deepeval_threshold,
            args.include_reason,
        )
        for case in cases
    ]
    performance = evaluate_performance(cases, args.top_k, args.repeats)

    # One clock reading so the report field and the file names always agree.
    generated_at = datetime.now(timezone.utc)
    report = {
        "generated_at_utc": generated_at.isoformat(),
        "eval_model": effective_eval_model(args.eval_model),
        "deepeval_threshold": args.deepeval_threshold,
        "top_k": args.top_k,
        "repeats": args.repeats,
        "case_count": len(cases),
        "cases_path": cases_path.as_posix(),
        "retrieval": {
            "summary": summarize_retrieval(retrieval_cases),
            "cases": retrieval_cases,
        },
        "generation": {
            "summary": summarize_generation(generation_cases),
            "cases": generation_cases,
        },
        "performance": performance,
    }

    timestamp = generated_at.strftime("%Y%m%dT%H%M%SZ")
    json_path = output_dir / f"evaluation_report_{timestamp}.json"
    md_path = output_dir / f"evaluation_report_{timestamp}.md"
    json_path.write_text(json.dumps(report, ensure_ascii=False, indent=2), encoding="utf-8")
    md_path.write_text(build_markdown_report(report), encoding="utf-8")

    print(f"Saved JSON report: {json_path.as_posix()}")
    print(f"Saved Markdown report: {md_path.as_posix()}")
    print(json.dumps(report["retrieval"]["summary"], ensure_ascii=False, indent=2))
    print(json.dumps(report["generation"]["summary"], ensure_ascii=False, indent=2))
    print(json.dumps(report["performance"]["retrieval_latency_ms"], ensure_ascii=False, indent=2))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())