# chatvns / app /evaluate.py
# liamxdev's picture
# Upload folder using huggingface_hub
# 34b531b verified
# Raw
# History Blame Contribute Delete
# 38.2 kB
from __future__ import annotations
import argparse
import json
import math
import os
import re
import statistics
import time
from datetime import datetime, timezone
from pathlib import Path
from app.config import DATA_DIR, GEMINI_API_KEY, GEMINI_MODEL
from app.multimodal import multimodal_artifacts
from app.rag import answer_question
from app.retriever import hybrid_retrieve
# Evaluation inputs and outputs, all located under the configured DATA_DIR.
EVAL_CASES_PATH = DATA_DIR / "evaluation" / "eval_cases.json"
EVAL_REPORT_DIR = DATA_DIR / "evaluation" / "reports"
SUGGESTED_QUESTIONS_PATH = DATA_DIR / "processed" / "q&a.json"

# Word-bounded runs of 2-5 uppercase ASCII letters.
TICKER_PATTERN = re.compile(r"\b[A-Z]{2,5}\b")
# Optionally signed digit groups joined by "." or ",", with an optional
# trailing "%" or "x" suffix.
NUMBER_PATTERN = re.compile(r"[-+]?\d+(?:[.,]\d+)*(?:%|x)?")

# Tokens that informative_tokens() discards (English plus unaccented
# Vietnamese function words).
STOPWORDS = {
    "a", "an", "and", "are", "as", "at", "be", "by",
    "cho", "co", "cua", "da", "duoc", "gi", "hay",
    "khi", "khong", "la", "mot", "neu", "nhung", "noi", "nua",
    "or", "tai", "the", "thi", "this", "to", "tren", "tu",
    "va", "ve", "voi",
}
def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the evaluation script.

    Returns:
        The namespace produced by ``ArgumentParser.parse_args()`` with the
        attributes ``cases``, ``top_k``, ``repeats``, ``output_dir``,
        ``eval_model``, ``deepeval_threshold`` and ``include_reason``.
    """
    cli = argparse.ArgumentParser(description="Evaluate retrieval, generation and performance.")
    # One (flag, keyword-arguments) entry per option, registered in order.
    option_table = [
        ("--cases", {"default": str(EVAL_CASES_PATH), "help": "Path to evaluation cases JSON."}),
        ("--top-k", {"type": int, "default": 5, "help": "Top-k retrieval cutoff."}),
        (
            "--repeats",
            {
                "type": int,
                "default": 3,
                "help": "Number of repeated runs for latency measurements.",
            },
        ),
        (
            "--output-dir",
            {
                "default": str(EVAL_REPORT_DIR),
                "help": "Directory to write JSON and Markdown reports.",
            },
        ),
        (
            "--eval-model",
            {
                "default": None,
                "help": "Optional DeepEval model name. Defaults to GEMINI_MODEL when GEMINI_API_KEY is set.",
            },
        ),
        (
            "--deepeval-threshold",
            {
                "type": float,
                "default": 0.5,
                "help": "Passing threshold for DeepEval metrics.",
            },
        ),
        (
            "--include-reason",
            {
                "action": "store_true",
                "help": "Include DeepEval metric reasons in the JSON/Markdown reports.",
            },
        ),
    ]
    for flag, options in option_table:
        cli.add_argument(flag, **options)
    return cli.parse_args()
def normalize_text(text: str) -> str:
    """Lower-case *text* and collapse every whitespace run into one space.

    The argument is passed through ``str()`` first, so non-string values are
    accepted. Leading and trailing whitespace is removed.
    """
    lowered = str(text).lower()
    # str.split() without a separator splits on any whitespace, newlines
    # included, and drops empty pieces.
    words = lowered.split()
    return " ".join(words)
def tokenize(text: str) -> list[str]:
    """Split normalized *text* into maximal runs of alphanumeric characters.

    Any character for which ``str.isalnum()`` is false acts as a separator.
    """
    # Turn every separator into a space, then let split() cut the runs;
    # alphanumeric characters are never whitespace, so runs stay intact.
    spaced = "".join(
        symbol if symbol.isalnum() else " " for symbol in normalize_text(text)
    )
    return spaced.split()
def informative_tokens(text: str) -> list[str]:
    """Return the tokens of *text* longer than two characters that are not stopwords.

    Order and duplicates from ``tokenize`` are preserved.
    """
    kept = []
    for word in tokenize(text):
        if len(word) <= 2 or word in STOPWORDS:
            continue
        kept.append(word)
    return kept
def token_set(text: str) -> set[str]:
    """Return the distinct informative tokens of *text*."""
    return {word for word in informative_tokens(text)}
def overlap_score(candidate: str, reference: str) -> float:
    """Return the fraction of *reference* tokens that also occur in *candidate*.

    Returns 0.0 when *reference* yields no informative tokens.
    """
    wanted = token_set(reference)
    if len(wanted) == 0:
        return 0.0
    shared = wanted.intersection(token_set(candidate))
    return len(shared) / len(wanted)
def mean_or_zero(values: list[float]) -> float:
    """Return the mean of *values* rounded to 3 decimals, or 0.0 when empty."""
    return round(statistics.mean(values), 3) if values else 0.0
def numeric_values(results: list[dict], key: str) -> list[float]:
    """Collect ``result[key]`` as floats for every result where it is a number.

    Missing keys, ``None``, strings and booleans are skipped (``bool`` is
    excluded explicitly because it is a subclass of ``int``).
    """
    candidates = (entry.get(key) for entry in results)
    return [
        float(item)
        for item in candidates
        if not isinstance(item, bool) and isinstance(item, (int, float))
    ]
def preview(text: str, limit: int = 180) -> str:
    """Return *text* with whitespace collapsed and cut to at most *limit* characters.

    Args:
        text: Value to preview; it is converted with ``str()``.
        limit: Maximum length of the returned string.

    Returns:
        The compacted text when it fits. Otherwise the first ``limit - 3``
        characters followed by ``"..."``. When *limit* is below 3 there is no
        room for the ellipsis, so the text is hard-cut to ``limit`` characters
        (empty for a non-positive limit).
    """
    compact = " ".join(str(text).split())
    if len(compact) <= limit:
        return compact
    if limit < 3:
        # A negative slice end would count from the end of the string and
        # return far more than `limit` characters.
        return compact[: max(limit, 0)]
    return compact[: limit - 3] + "..."
def percentile(values: list[float], pct: float) -> float:
    """Return the *pct* percentile of *values* using linear interpolation.

    Args:
        values: Samples in any order.
        pct: Fraction of the way through the sorted samples (0.95 for P95).

    Returns:
        0.0 for an empty list, the single element for a one-item list,
        otherwise the value interpolated between the two nearest ranks.
    """
    if not values:
        return 0.0
    ranked = sorted(values)
    if len(ranked) == 1:
        return ranked[0]
    position = pct * (len(ranked) - 1)
    below, above = math.floor(position), math.ceil(position)
    if below == above:
        # The position falls exactly on one sample.
        return ranked[below]
    weight = position - below
    return ranked[below] * (1 - weight) + ranked[above] * weight
def latency_summary(latencies_ms: list[float]) -> dict:
    """Summarize latency samples as count, average, P95, minimum and maximum.

    All statistics are rounded to 2 decimals; an empty input yields zeros.
    """
    if not latencies_ms:
        return {"count": 0, "avg_ms": 0.0, "p95_ms": 0.0, "min_ms": 0.0, "max_ms": 0.0}
    summary = {"count": len(latencies_ms)}
    summary["avg_ms"] = round(statistics.mean(latencies_ms), 2)
    summary["p95_ms"] = round(percentile(latencies_ms, 0.95), 2)
    summary["min_ms"] = round(min(latencies_ms), 2)
    summary["max_ms"] = round(max(latencies_ms), 2)
    return summary
def infer_case_ticker(question: str) -> str | None:
    """Return the first of HPG, FPT or VCB mentioned in *question*, else None.

    The question is upper-cased before matching, so mentions are found
    regardless of case.
    """
    known_tickers = {"HPG", "FPT", "VCB"}
    candidates = TICKER_PATTERN.findall(question.upper())
    return next((symbol for symbol in candidates if symbol in known_tickers), None)
def build_default_eval_cases(limit: int = 9) -> list[dict]:
    """Build starter evaluation cases from the suggested-questions file.

    Questions are read from ``SUGGESTED_QUESTIONS_PATH`` (key
    ``suggested_questions``, a list of groups each holding a ``questions``
    list). If the file is missing or yields no usable question, three
    built-in questions are used instead.

    Args:
        limit: Maximum number of cases to generate.

    Returns:
        Case dicts with ids ``auto_001``, ``auto_002``, ..., the question, an
        inferred ticker and empty expectation lists.
    """
    payload = {}
    if SUGGESTED_QUESTIONS_PATH.exists():
        payload = json.loads(SUGGESTED_QUESTIONS_PATH.read_text(encoding="utf-8"))

    # The file content is not under this module's control: tolerate a
    # top-level value or nested values of an unexpected JSON type instead of
    # failing with AttributeError/TypeError.
    groups = payload.get("suggested_questions") if isinstance(payload, dict) else None
    if not isinstance(groups, list):
        groups = []

    questions = []
    for group in groups:
        if not isinstance(group, dict):
            continue
        group_questions = group.get("questions")
        if not isinstance(group_questions, list):
            # A bare string would otherwise be iterated character by character.
            continue
        questions.extend(
            question.strip()
            for question in group_questions
            if isinstance(question, str) and question.strip()
        )

    if not questions:
        questions = [
            "Tom tat nhanh co phieu HPG hien tai",
            "FPT co nhung dong luc tang truong nao?",
            "VCB co diem manh va rui ro gi?",
        ]

    return [
        {
            "id": f"auto_{index:03d}",
            "question": question,
            "ticker": infer_case_ticker(question),
            "expected_chunks": [],
            "expected_answer_keywords": [],
            "expected_source_keywords": [],
        }
        for index, question in enumerate(questions[:limit], start=1)
    ]
def write_default_eval_cases(path: Path, cases: list[dict]) -> None:
    """Write *cases* to *path* as UTF-8 JSON with an explanatory note.

    Missing parent directories are created first.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    note = (
        "Auto-generated starter cases. Add expected_chunks, expected_answer_keywords "
        "and expected_source_keywords for stricter evaluation."
    )
    serialized = json.dumps({"note": note, "cases": cases}, ensure_ascii=False, indent=2)
    path.write_text(serialized, encoding="utf-8")
def load_eval_cases(path: Path) -> list[dict]:
    """Load evaluation cases from *path*, creating a starter file when absent.

    The JSON document may be either a list of cases or an object with a
    ``cases`` list. Entries that are not objects are dropped; any other
    document shape yields an empty list.

    Args:
        path: Location of the evaluation-cases JSON file.

    Returns:
        The list of case dicts.
    """
    if not path.exists():
        cases = build_default_eval_cases()
        write_default_eval_cases(path, cases)
        print(f"Created starter eval cases: {path.as_posix()}")
        return cases

    payload = json.loads(path.read_text(encoding="utf-8"))
    cases = payload.get("cases") if isinstance(payload, dict) else payload
    if not isinstance(cases, list):
        # Covers scalars and `"cases": null`, which would otherwise raise
        # TypeError when iterated.
        cases = []
    return [case for case in cases if isinstance(case, dict)]
def chunk_matches_expectation(chunk, expectation: dict) -> bool:
    """Tell whether *chunk* satisfies every criterion present in *expectation*.

    Supported criteria: exact (normalized) equality on ``ticker``, ``scope``
    and ``modality``; substring match on ``source_path_contains``; at least
    one substring match for ``heading_contains_any`` (against the joined
    heading path) and ``text_contains_any`` (against the chunk text).
    Criteria that are absent or falsy are ignored. Missing chunk attributes
    are treated as empty.
    """
    source_path = normalize_text(getattr(chunk, "source_path", ""))
    heading_path = normalize_text(" ".join(getattr(chunk, "heading_path", [])))
    text = normalize_text(getattr(chunk, "text", ""))
    modality = normalize_text(getattr(chunk, "modality", ""))
    ticker = normalize_text(getattr(chunk, "ticker", ""))
    # The scope falls back to the ticker when the chunk has no scope of its own.
    scope = normalize_text(getattr(chunk, "scope", "") or getattr(chunk, "ticker", ""))

    exact_fields = (("ticker", ticker), ("scope", scope), ("modality", modality))
    for field, actual in exact_fields:
        wanted = expectation.get(field)
        if wanted and actual != normalize_text(wanted):
            return False

    path_fragment = expectation.get("source_path_contains")
    if path_fragment and normalize_text(path_fragment) not in source_path:
        return False

    any_of_fields = (("heading_contains_any", heading_path), ("text_contains_any", text))
    for field, haystack in any_of_fields:
        needles = expectation.get(field)
        if needles and not any(normalize_text(needle) in haystack for needle in needles):
            return False
    return True
def expected_context_text(case: dict) -> str:
    """Join every expectation string of *case* into one reference text.

    The pieces are, in order: the reference answers (``expected_output``,
    ``expected_answer``, ``reference_answer``), the answer keywords, the
    source keywords, and for each expected chunk its ``text_contains_any``,
    ``heading_contains_any`` and ``source_path_contains`` values. Returns an
    empty string when the case has none of these.
    """
    pieces = [
        str(case[key])
        for key in ("expected_output", "expected_answer", "reference_answer")
        if case.get(key)
    ]
    for key in ("expected_answer_keywords", "expected_source_keywords"):
        pieces += [str(item) for item in case.get(key, [])]
    for expectation in case.get("expected_chunks", []):
        for key in ("text_contains_any", "heading_contains_any"):
            pieces += [str(item) for item in expectation.get(key, [])]
        path_fragment = expectation.get("source_path_contains")
        if path_fragment:
            pieces.append(str(path_fragment))
    return " ".join(pieces)
def chunk_relevance_flags(case: dict, chunks) -> list[bool]:
    """Return one relevance flag per chunk, in the order of *chunks*.

    With ``expected_chunks`` in the case, a chunk is relevant when it matches
    any expectation. Otherwise a chunk is relevant when its text shares at
    least one informative token with the case's expectation text, or with the
    question when there is no expectation text.
    """
    expectations = case.get("expected_chunks", [])
    if not expectations:
        reference = expected_context_text(case) or case["question"]
        return [overlap_score(chunk.text, reference) > 0 for chunk in chunks]

    flags = []
    for chunk in chunks:
        flags.append(
            any(chunk_matches_expectation(chunk, expectation) for expectation in expectations)
        )
    return flags
def evaluate_retrieval_case(case: dict, top_k: int) -> dict:
    """Run retrieval for one case and score it against its expected chunks.

    A case is "strict" when it lists ``expected_chunks``. Strict cases get
    recall@k (share of expectations matched by some hit), precision@k (share
    of the first ``top_k`` hits matching any expectation), hit rate@k, the
    rank of the first relevant hit and MRR. Non-strict (smoke) cases only
    record latency and the top chunks; their metrics are ``None``.

    Args:
        case: Evaluation case with ``id``, ``question`` and optional
            ``ticker`` / ``expected_chunks``.
        top_k: Retrieval cutoff passed to ``hybrid_retrieve``.

    Returns:
        A result dict. When retrieval raises, the metrics are ``None`` and the
        message is stored under ``error``.
    """
    expectations = case.get("expected_chunks") or []
    expected_count = len(expectations)
    strict_evaluation = expected_count > 0

    started_at = time.perf_counter()
    try:
        hits = hybrid_retrieve(case["question"], top_k=top_k, ticker=case.get("ticker"))
    except Exception as exc:  # noqa: BLE001
        return {
            "case_id": case["id"],
            "question": case["question"],
            "ticker": case.get("ticker"),
            "latency_ms": round((time.perf_counter() - started_at) * 1000, 2),
            "top_k": top_k,
            "expected_evidence_count": expected_count,
            "matched_evidence_count": 0,
            "strict_evaluation": strict_evaluation,
            # Present so failed and successful results share the same keys.
            "relevant_retrieved_count": None,
            "recall_at_k": None,
            "precision_at_k": None,
            "hit_rate_at_k": None,
            "first_relevant_rank": None,
            "mrr": None,
            "qualitative_top_chunks": [],
            "error": str(exc),
        }
    latency_ms = (time.perf_counter() - started_at) * 1000

    # Rank (1-based) of the first hit satisfying each expectation.
    matched_ranks: list[int] = []
    for expectation in expectations:
        matched_rank = next(
            (
                rank
                for rank, chunk in enumerate(hits, start=1)
                if chunk_matches_expectation(chunk, expectation)
            ),
            None,
        )
        if matched_rank is not None:
            matched_ranks.append(matched_rank)
    first_relevant_rank = min(matched_ranks, default=None)

    if strict_evaluation:
        evaluated_flags = chunk_relevance_flags(case, hits)[:top_k]
        relevant_retrieved_count = sum(evaluated_flags)
        recall_at_k = len(matched_ranks) / expected_count
        precision_at_k = (
            relevant_retrieved_count / len(evaluated_flags) if evaluated_flags else 0.0
        )
        hit_rate_at_k = float(any(evaluated_flags))
        # A strict case without any relevant hit has reciprocal rank 0. Leaving
        # it as None would drop the miss from the mean and inflate the MRR.
        mrr = 1 / first_relevant_rank if first_relevant_rank else 0.0
    else:
        relevant_retrieved_count = None
        recall_at_k = precision_at_k = hit_rate_at_k = mrr = None

    def rounded(value, digits):
        return round(value, digits) if value is not None else None

    return {
        "case_id": case["id"],
        "question": case["question"],
        "ticker": case.get("ticker"),
        "latency_ms": round(latency_ms, 2),
        "top_k": top_k,
        "strict_evaluation": strict_evaluation,
        "expected_evidence_count": expected_count,
        "matched_evidence_count": len(matched_ranks),
        "relevant_retrieved_count": relevant_retrieved_count,
        "recall_at_k": rounded(recall_at_k, 3),
        "precision_at_k": rounded(precision_at_k, 3),
        "hit_rate_at_k": rounded(hit_rate_at_k, 3),
        "first_relevant_rank": first_relevant_rank,
        "mrr": rounded(mrr, 4),
        "qualitative_top_chunks": [
            {
                "rank": rank,
                "score": round(chunk.score, 4),
                "ticker": chunk.ticker,
                "scope": chunk.scope,
                "modality": chunk.modality,
                "source_path": chunk.source_path,
                "heading_path": chunk.heading_path,
                "preview": preview(chunk.text),
            }
            for rank, chunk in enumerate(hits[:3], start=1)
        ],
    }
def source_keyword_match_score(sources: list[dict], expected_keywords: list[str]) -> float:
    """Return the share of *expected_keywords* found in the sources' locations.

    Keywords are searched (after normalization) in the concatenated
    ``source_path``, ``artifact_path`` and ``url`` of all sources. Returns 1.0
    when no keywords are expected.
    """
    if not expected_keywords:
        return 1.0
    locations = []
    for source in sources:
        locations.append(
            f"{source.get('source_path', '')} {source.get('artifact_path', '')} {source.get('url', '')}"
        )
    haystack = normalize_text(" ".join(locations))
    found = [keyword for keyword in expected_keywords if normalize_text(keyword) in haystack]
    return len(found) / len(expected_keywords)
def context_grounding_score(answer: str, retrieved_chunks) -> float:
    """Return the share of answer tokens that appear in the retrieved chunks.

    Every informative token occurrence in *answer* counts (duplicates
    included). Returns 0.0 when the answer or the chunks contain no
    informative tokens.
    """
    answer_words = informative_tokens(answer)
    if not answer_words:
        return 0.0
    vocabulary: set[str] = set()
    for chunk in retrieved_chunks:
        vocabulary |= set(informative_tokens(chunk.text))
    if not vocabulary:
        return 0.0
    grounded = [word for word in answer_words if word in vocabulary]
    return len(grounded) / len(answer_words)
def lexical_answer_relevancy_score(question: str, answer: str) -> float:
    """Return the share of distinct question tokens that also occur in the answer.

    Returns 0.0 when the question has no informative tokens.
    """
    asked = token_set(question)
    answered = token_set(answer)
    if len(asked) == 0:
        return 0.0
    return len(asked.intersection(answered)) / len(asked)
def extract_numbers(text: str) -> list[float]:
    """Return every number found in *text*, in order of appearance.

    Candidates are located with ``NUMBER_PATTERN`` and converted with
    ``parse_number_token``; candidates that fail to parse are skipped.
    """
    parsed_candidates = (
        parse_number_token(candidate) for candidate in NUMBER_PATTERN.findall(text)
    )
    return [number for number in parsed_candidates if number is not None]
def parse_number_token(raw_value: str) -> float | None:
    """Parse one numeric token that may use "." or "," as separators.

    Trailing ``%``, ``x`` and ``X`` characters are dropped. Separator rules:

    * both "," and "." present: the one occurring last is the decimal mark,
      the other is a grouping mark;
    * only ",": grouping when there are several commas or exactly three
      digits follow the last one, otherwise the decimal mark;
    * only ".": grouping when there are several dots, or when exactly three
      digits follow it and more than two digits precede it; otherwise the
      decimal mark.

    Returns:
        The parsed float, or ``None`` for an empty or unparsable token.
    """
    digits = raw_value.strip().rstrip("%xX")
    if not digits:
        return None

    sign = ""
    if digits.startswith(("+", "-")):
        sign = digits[0]
        digits = digits[1:]

    has_comma = "," in digits
    has_dot = "." in digits
    if has_comma and has_dot:
        if digits.rfind(",") > digits.rfind("."):
            normalized = digits.replace(".", "").replace(",", ".")
        else:
            normalized = digits.replace(",", "")
    elif has_comma:
        groups = digits.split(",")
        is_grouping = len(groups) > 2 or len(groups[-1]) == 3
        normalized = "".join(groups) if is_grouping else digits.replace(",", ".")
    elif has_dot:
        groups = digits.split(".")
        is_grouping = len(groups) > 2 or (len(groups[-1]) == 3 and len(groups[0]) > 2)
        normalized = "".join(groups) if is_grouping else digits
    else:
        normalized = digits

    try:
        return float(sign + normalized)
    except ValueError:
        return None
def numerical_accuracy_score(answer: str, case: dict) -> float | None:
    """Return the share of the case's ``expected_numbers`` found in *answer*.

    An expected number matches an answer number when they differ by at most
    1% of the expected value (and never less than 0.01). Each answer number
    can satisfy only one expectation.

    Returns:
        ``None`` when the case has no (or an empty) ``expected_numbers``, 0.0
        when the answer contains no numbers, otherwise matched / expected.
    """
    raw_expected = case.get("expected_numbers")
    if raw_expected is None:
        return None
    targets = [float(item) for item in raw_expected]
    if not targets:
        return None

    candidates = extract_numbers(answer)
    if not candidates:
        return 0.0

    unused = list(candidates)
    hits = 0
    for target in targets:
        tolerance = max(abs(target) * 0.01, 0.01)
        for position, actual in enumerate(unused):
            if math.isclose(actual, target, abs_tol=tolerance):
                # Consume the answer number so it cannot match twice.
                del unused[position]
                hits += 1
                break
    return hits / len(targets)
def citation_accuracy_score(sources: list[dict], case: dict) -> float | None:
    """Score how well the cited *sources* match the case's expected citations.

    When the case lists ``expected_source_keywords`` the score is delegated to
    ``source_keyword_match_score``. Otherwise each entry of
    ``expected_chunks`` counts as matched when any source's metadata
    (``source_path``, ``artifact_path``, ``url``, ``title``,
    ``structure_type``) contains the expectation's ``source_path_contains``
    or one of its text fragments.

    Text fragments are read from ``text_contains_any``, the key used by the
    other expectation helpers in this module, and from the singular
    ``text_contains`` that this function originally read (kept for backward
    compatibility).

    Returns:
        ``None`` when the case defines no expected citations, otherwise the
        matched share in [0, 1].
    """
    expected_keywords = case.get("expected_source_keywords", [])
    if expected_keywords:
        return source_keyword_match_score(sources, expected_keywords)
    expectations = case.get("expected_chunks", [])
    if not expectations:
        return None

    # Normalize each source once instead of once per expectation.
    metadata_keys = ["source_path", "artifact_path", "url", "title", "structure_type"]
    source_texts = [
        normalize_text(" ".join(str(source.get(key, "")) for key in metadata_keys))
        for source in sources
    ]

    matched = 0
    for expectation in expectations:
        fragments = [expectation.get("source_path_contains"), expectation.get("text_contains")]
        text_any = expectation.get("text_contains_any") or []
        if isinstance(text_any, str):
            text_any = [text_any]
        fragments.extend(text_any)
        # Skip empty/None fragments: normalizing None would produce the
        # literal "none" and could match sources by accident.
        needles = [normalize_text(fragment) for fragment in fragments if fragment]
        needles = [needle for needle in needles if needle]
        if any(needle in text for text in source_texts for needle in needles):
            matched += 1
    return matched / len(expectations)
def evaluate_generation_case(case: dict, top_k: int) -> dict:
    """Answer one case's question and compute the non-LLM generation metrics.

    Args:
        case: Evaluation case with ``id``, ``question`` and optional
            ``ticker`` / expectation fields.
        top_k: Cutoff forwarded to ``answer_question``.

    Returns:
        A result dict with latency, numerical and citation accuracy, the
        answer and a preview of up to three sources. When ``answer_question``
        raises, a reduced dict carrying the message under ``error`` is
        returned instead.
    """
    timer_start = time.perf_counter()
    try:
        result = answer_question(case["question"], ticker=case.get("ticker"), top_k=top_k)
        latency_ms = (time.perf_counter() - timer_start) * 1000
    except Exception as exc:  # noqa: BLE001
        failure = {
            "case_id": case["id"],
            "question": case["question"],
            "ticker": case.get("ticker"),
            "latency_ms": round((time.perf_counter() - timer_start) * 1000, 2),
        }
        failure["source_count"] = 0
        failure["numerical_accuracy"] = None
        failure["citation_accuracy"] = None
        failure["has_sources"] = False
        failure["answer_preview"] = ""
        failure["source_preview"] = []
        failure["error"] = str(exc)
        return failure

    answer = str(result.get("answer", ""))
    sources = list(result.get("sources", []))
    numerical_accuracy = numerical_accuracy_score(answer, case)
    citation_accuracy = citation_accuracy_score(sources, case)

    # Record why a metric could not be computed strictly for this case.
    fallback_metrics = {}
    has_expected_numbers = bool(case.get("expected_numbers")) or bool(
        extract_numbers(expected_output_for_deepeval(case))
    )
    if not has_expected_numbers:
        fallback_metrics["numerical_accuracy"] = "no_expected_numbers"
    has_expected_citations = bool(case.get("expected_source_keywords")) or bool(
        case.get("expected_chunks")
    )
    if not has_expected_citations:
        fallback_metrics["citation_accuracy"] = "sources_present_without_expected_citations"

    preview_fields = ("ticker", "modality", "structure_type", "source_path", "url")
    source_preview = [
        {field: source.get(field) for field in preview_fields} for source in sources[:3]
    ]

    return {
        "case_id": case["id"],
        "question": case["question"],
        "ticker": case.get("ticker"),
        "latency_ms": round(latency_ms, 2),
        "source_count": len(sources),
        "numerical_accuracy": None if numerical_accuracy is None else round(numerical_accuracy, 3),
        "citation_accuracy": None if citation_accuracy is None else round(citation_accuracy, 3),
        "fallback_metrics": fallback_metrics,
        "has_sources": bool(sources),
        "answer": answer,
        "answer_preview": preview(answer, limit=260),
        "source_preview": source_preview,
    }
def expected_output_for_deepeval(case: dict) -> str:
    """Return the reference answer text to hand to DeepEval for *case*.

    The first truthy value among ``expected_output``, ``expected_answer`` and
    ``reference_answer`` wins; otherwise the text assembled by
    ``expected_context_text`` is used, which may be empty.
    """
    for key in ("expected_output", "expected_answer", "reference_answer"):
        reference = case.get(key)
        if reference:
            return str(reference)
    return str(expected_context_text(case) or "")
def deepeval_metric_kwargs(eval_model: str | None, threshold: float, include_reason: bool) -> dict:
    """Build the keyword arguments shared by the DeepEval metric constructors.

    A model name that does not start with "gemini" (case-insensitive) is
    passed through as ``model``. Otherwise, when ``GEMINI_API_KEY`` is set, a
    ``GeminiModel`` (temperature 0) is created for ``eval_model`` or
    ``GEMINI_MODEL``. Without a key no ``model`` entry is added.
    """
    kwargs = dict(threshold=threshold, include_reason=include_reason)

    wants_gemini = not eval_model or eval_model.lower().startswith("gemini")
    if not wants_gemini:
        kwargs["model"] = eval_model
        return kwargs

    if GEMINI_API_KEY:
        # Imported lazily so DeepEval stays an optional dependency.
        from deepeval.models.llms.gemini_model import GeminiModel

        kwargs["model"] = GeminiModel(
            model=eval_model or GEMINI_MODEL,
            api_key=GEMINI_API_KEY or None,
            temperature=0,
        )
    return kwargs
def effective_eval_model(eval_model: str | None) -> str | None:
    """Return the model name to show in the report.

    The explicit *eval_model* wins; otherwise ``GEMINI_MODEL`` is reported
    when ``GEMINI_API_KEY`` is set, else ``None``.
    """
    if eval_model:
        return eval_model
    return GEMINI_MODEL if GEMINI_API_KEY else None
def measure_deepeval_metric(metric, test_case) -> dict:
    """Run ``metric.measure(test_case)`` and return its outcome as a plain dict.

    Returns:
        ``score`` (rounded to 3 decimals; a missing or falsy score becomes
        0.0), ``reason`` (``None`` if absent) and ``success`` (as a bool).
    """
    metric.measure(test_case)
    raw_score = getattr(metric, "score", 0.0) or 0.0
    outcome = {"score": round(float(raw_score), 3)}
    outcome["reason"] = getattr(metric, "reason", None)
    outcome["success"] = bool(getattr(metric, "success", False))
    return outcome
def apply_deepeval_generation_scores(
    case: dict,
    result: dict,
    chunks,
    answer: str,
    eval_model: str | None,
    threshold: float,
    include_reason: bool,
) -> dict:
    """Add DeepEval answer-relevancy and faithfulness scores to *result*.

    *result* is updated in place and returned. When DeepEval cannot be
    imported, ``deepeval_error`` and ``deepeval_import_error`` are set and
    nothing else changes. A metric that raises is recorded as
    ``{"error": ...}`` under ``result["deepeval"]``; a metric that succeeds
    sets ``result[<metric>]`` and removes its ``fallback_metrics`` label.
    """
    try:
        from deepeval.metrics import AnswerRelevancyMetric, FaithfulnessMetric
        from deepeval.test_case import LLMTestCase
    except ImportError as exc:
        result["deepeval_error"] = (
            "DeepEval is not installed. Install the optional dependency with "
            "`pip install deepeval` before running evaluation."
        )
        result["deepeval_import_error"] = str(exc)
        return result

    test_case_kwargs = {
        "input": case["question"],
        "actual_output": answer,
        "retrieval_context": [chunk.text for chunk in chunks],
    }
    expected_output = expected_output_for_deepeval(case)
    if expected_output:
        test_case_kwargs["expected_output"] = expected_output
    test_case = LLMTestCase(**test_case_kwargs)
    metric_kwargs = deepeval_metric_kwargs(eval_model, threshold, include_reason)

    metric_classes = {
        "answer_relevancy": AnswerRelevancyMetric,
        "faithfulness": FaithfulnessMetric,
    }
    scores = {}
    for metric_name, metric_cls in metric_classes.items():
        try:
            scores[metric_name] = measure_deepeval_metric(metric_cls(**metric_kwargs), test_case)
            result[metric_name] = scores[metric_name]["score"]
            # The LLM-based score replaces the lexical fallback, so drop its label.
            result.get("fallback_metrics", {}).pop(metric_name, None)
        except Exception as exc:  # noqa: BLE001
            scores[metric_name] = {"error": str(exc)}
    result["deepeval"] = scores
    return result
def evaluate_generation_case_with_deepeval(
    case: dict,
    top_k: int,
    eval_model: str | None,
    threshold: float,
    include_reason: bool,
) -> dict:
    """Evaluate generation for one case, with DeepEval scores and lexical fallbacks.

    The base result comes from ``evaluate_generation_case``; a failed result
    is returned untouched. The context is then retrieved with
    ``hybrid_retrieve`` (on failure the message is stored under
    ``deepeval_error``). Faithfulness and answer relevancy are taken from
    DeepEval when available, otherwise from the lexical scores computed here.
    """
    result = evaluate_generation_case(case, top_k)
    if result.get("error"):
        return result

    question = case["question"]
    try:
        retrieval_chunks = hybrid_retrieve(question, top_k=top_k, ticker=case.get("ticker"))
    except Exception as exc:  # noqa: BLE001
        result["deepeval_error"] = str(exc)
        return result

    answer_text = str(result.get("answer", ""))
    lexical_scores = {
        "faithfulness": round(context_grounding_score(answer_text, retrieval_chunks), 3),
        "answer_relevancy": round(lexical_answer_relevancy_score(question, answer_text), 3),
    }
    # Label both metrics as fallbacks up front; the DeepEval step removes the
    # label of every metric it manages to score.
    fallback_labels = result.setdefault("fallback_metrics", {})
    fallback_labels["faithfulness"] = "lexical_context_grounding"
    fallback_labels["answer_relevancy"] = "question_answer_token_overlap"

    result = apply_deepeval_generation_scores(
        case,
        result,
        retrieval_chunks,
        answer_text,
        eval_model,
        threshold,
        include_reason,
    )
    for metric_name in lexical_scores:
        if result.get(metric_name) is None:
            result[metric_name] = lexical_scores[metric_name]
    return result
def evaluate_multimodal_readiness(cases: list[dict]) -> dict:
    """Inventory the chart/table/PDF artifacts available per evaluated ticker.

    Tickers are taken from the cases (upper-cased, de-duplicated, sorted) and
    looked up with ``multimodal_artifacts``; each lookup is timed.

    Returns:
        The ticker list, the per-ticker inventory, the number of tickers that
        have a chart, at least one table and at least one PDF, the lookup
        latency summary and a fixed explanatory note.
    """
    unique_tickers = set()
    for case in cases:
        if case.get("ticker"):
            unique_tickers.add(str(case.get("ticker", "")).upper())
    tickers = sorted(unique_tickers)

    inventory = []
    lookup_latencies = []
    for ticker in tickers:
        began = time.perf_counter()
        artifacts = multimodal_artifacts(ticker)
        lookup_latencies.append((time.perf_counter() - began) * 1000)
        entry = {"ticker": ticker}
        entry["has_chart"] = bool(artifacts["chart"])
        entry["table_count"] = len(artifacts["tables"])
        entry["pdf_count"] = len(artifacts["pdfs"])
        inventory.append(entry)

    ready_all_three = 0
    for entry in inventory:
        if entry["has_chart"] and entry["table_count"] > 0 and entry["pdf_count"] > 0:
            ready_all_three += 1

    note = (
        "Current system surfaces image/csv/pdf artifacts in UI and sources, "
        "but retrieval is still primarily text+dense/BM25 rather than true multimodal embedding."
    )
    return {
        "tickers_evaluated": tickers,
        "inventory": inventory,
        "tickers_with_chart_table_pdf": ready_all_three,
        "artifact_lookup_latency_ms": latency_summary(lookup_latencies),
        "note": note,
    }
def evaluate_performance(cases: list[dict], top_k: int, repeats: int) -> dict:
    """Measure retrieval and answer latency over repeated runs of every case.

    Each case runs ``hybrid_retrieve`` and then ``answer_question`` *repeats*
    times. A call that raises contributes no latency sample; its message is
    counted in the corresponding error table.

    Returns:
        Overall and per-case latency summaries, failure counts, error tables
        keyed by message, and the multimodal readiness report.
    """
    retrieval_latencies = []
    generation_latencies = []
    retrieval_errors: dict[str, int] = {}
    generation_errors: dict[str, int] = {}
    per_case = []

    def timed_call(func, case, latencies, errors):
        # Time one call; on failure count the message instead of a sample.
        try:
            began = time.perf_counter()
            func(case["question"], top_k=top_k, ticker=case.get("ticker"))
            latencies.append((time.perf_counter() - began) * 1000)
        except Exception as exc:  # noqa: BLE001
            reason = str(exc)
            errors[reason] = errors.get(reason, 0) + 1

    for case in cases:
        case_retrieval_latencies = []
        case_generation_latencies = []
        for _ in range(repeats):
            timed_call(hybrid_retrieve, case, case_retrieval_latencies, retrieval_errors)
            timed_call(answer_question, case, case_generation_latencies, generation_errors)
        retrieval_latencies += case_retrieval_latencies
        generation_latencies += case_generation_latencies
        per_case.append(
            {
                "case_id": case["id"],
                "question": case["question"],
                "ticker": case.get("ticker"),
                "retrieval_latency_ms": latency_summary(case_retrieval_latencies),
                "generation_latency_ms": latency_summary(case_generation_latencies),
            }
        )

    return {
        "retrieval_latency_ms": latency_summary(retrieval_latencies),
        "generation_latency_ms": latency_summary(generation_latencies),
        "retrieval_failure_count": sum(retrieval_errors.values()),
        "generation_failure_count": sum(generation_errors.values()),
        "retrieval_errors": retrieval_errors,
        "generation_errors": generation_errors,
        "multimodal_readiness": evaluate_multimodal_readiness(cases),
        "per_case": per_case,
    }
def summarize_retrieval(results: list[dict]) -> dict:
    """Aggregate per-case retrieval results into summary metrics.

    Quality metrics are averaged over strict cases only and are ``None`` when
    no strict case provides a numeric value; latency covers every case.
    """
    strict_results = [entry for entry in results if entry.get("strict_evaluation")]
    failed = [entry for entry in results if entry.get("error")]

    def strict_average(metric):
        samples = numeric_values(strict_results, metric)
        return mean_or_zero(samples) if samples else None

    mrr_samples = numeric_values(strict_results, "mrr")
    summary = {
        "case_count": len(results),
        "error_count": len(failed),
        "strict_case_count": len(strict_results),
        "smoke_case_count": len(results) - len(strict_results),
        "mean_mrr": round(statistics.mean(mrr_samples), 3) if mrr_samples else None,
    }
    for metric in ("recall_at_k", "precision_at_k", "hit_rate_at_k"):
        summary[metric] = strict_average(metric)
    summary["latency_ms"] = latency_summary([entry["latency_ms"] for entry in results])
    return summary
def summarize_generation(results: list[dict]) -> dict:
    """Aggregate per-case generation results into summary metrics.

    Each metric is averaged over the cases that report a numeric value for it
    and is ``None`` when no case does; latency covers every case.
    """
    samples = {
        metric: numeric_values(results, metric)
        for metric in ("answer_relevancy", "faithfulness", "numerical_accuracy", "citation_accuracy")
    }

    def average(metric):
        return mean_or_zero(samples[metric]) if samples[metric] else None

    failed = [entry for entry in results if entry.get("error")]
    return {
        "case_count": len(results),
        "error_count": len(failed),
        "latency_ms": latency_summary([entry["latency_ms"] for entry in results]),
        "faithfulness": average("faithfulness"),
        "answer_relevancy": average("answer_relevancy"),
        "numerical_case_count": len(samples["numerical_accuracy"]),
        "numerical_accuracy": average("numerical_accuracy"),
        "citation_case_count": len(samples["citation_accuracy"]),
        "citation_accuracy": average("citation_accuracy"),
    }
def build_markdown_report(report: dict) -> str:
    """Render the evaluation *report* dict as a Markdown document.

    The document has a header, a retrieval section, a generation section
    (each with up to three qualitative examples) and a performance section
    ending with the multimodal inventory. The text ends with a newline.
    """
    retrieval_summary = report["retrieval"]["summary"]
    generation_summary = report["generation"]["summary"]
    performance_summary = report["performance"]

    lines = ["# Danh gia he thong", ""]
    lines.append(f"Thoi gian tao bao cao: {report['generated_at_utc']}")
    lines.append(f"DeepEval model: {report.get('eval_model') or 'N/A'}")
    lines.append(f"Top-k: {report['top_k']}")
    lines.append(f"So case: {report['case_count']}")
    lines.append("")
    lines.append("## 4.x.1 Danh gia Retrieval")
    lines.append(f"- Strict cases: {retrieval_summary.get('strict_case_count', 0)}")
    lines.append(f"- Smoke-only cases: {retrieval_summary.get('smoke_case_count', 0)}")
    lines.append(f"- Mean MRR: {retrieval_summary['mean_mrr']}")
    lines.append(f"- Recall@{report['top_k']}: {retrieval_summary['recall_at_k']}")
    lines.append(f"- Precision@{report['top_k']}: {retrieval_summary['precision_at_k']}")
    lines.append(f"- Hit Rate@{report['top_k']}: {retrieval_summary['hit_rate_at_k']}")
    lines.append("")
    lines.append("### Qualitative examples")

    # Only the first three cases of each section are shown as examples.
    for case in report["retrieval"]["cases"][:3]:
        lines.append(f"- Case `{case['case_id']}`: {case['question']}")
        lines.append(
            f" strict={case.get('strict_evaluation', False)} "
            f"mrr={case['mrr']} recall_at_k={case.get('recall_at_k', 'N/A')} "
            f"precision_at_k={case.get('precision_at_k', 'N/A')} "
            f"hit_rate_at_k={case.get('hit_rate_at_k', 'N/A')}"
        )
        if case.get("deepeval_error"):
            lines.append(f" deepeval_error: {preview(case['deepeval_error'], 220)}")
        lines.extend(
            f" top{chunk['rank']}: {chunk['source_path']} | score={chunk['score']} | {chunk['preview']}"
            for chunk in case["qualitative_top_chunks"]
        )

    lines += [
        "",
        "## 4.x.2 Danh gia Generation",
        f"- Faithfulness: {generation_summary.get('faithfulness', 'N/A')}",
        f"- Answer relevancy: {generation_summary.get('answer_relevancy', 'N/A')}",
        f"- Numerical cases: {generation_summary.get('numerical_case_count', 0)}",
        f"- Numerical accuracy: {generation_summary.get('numerical_accuracy', 'N/A')}",
        f"- Citation cases: {generation_summary.get('citation_case_count', 0)}",
        f"- Citation accuracy: {generation_summary.get('citation_accuracy', 'N/A')}",
        "",
        "### Qualitative examples",
    ]

    for case in report["generation"]["cases"][:3]:
        lines.append(
            f"- Case `{case['case_id']}`: source_count={case['source_count']} "
            f"numerical_accuracy={case.get('numerical_accuracy', 'N/A')} "
            f"citation_accuracy={case.get('citation_accuracy', 'N/A')}"
        )
        lines.append(f" answer: {case['answer_preview']}")
        lines.append(
            f" answer_relevancy={case.get('answer_relevancy', 'N/A')} "
            f"faithfulness={case.get('faithfulness', 'N/A')}"
        )
        if case.get("deepeval_error"):
            lines.append(f" deepeval_error: {preview(case['deepeval_error'], 220)}")
        for metric_name in ("answer_relevancy", "faithfulness"):
            reason = case.get("deepeval", {}).get(metric_name, {}).get("reason")
            if reason:
                lines.append(f" {metric_name}_reason: {preview(reason, 220)}")
        lines.extend(
            f" source: {source['source_path']} | modality={source['modality']} | ticker={source['ticker']}"
            for source in case["source_preview"]
        )

    multimodal = performance_summary["multimodal_readiness"]
    lines += [
        "",
        "## 4.x.3 Danh gia hieu nang he thong",
        f"- Retrieval P95 latency (ms): {performance_summary['retrieval_latency_ms']['p95_ms']}",
        f"- Answer P95 latency (ms): {performance_summary['generation_latency_ms']['p95_ms']}",
        f"- Retrieval failures: {performance_summary.get('retrieval_failure_count', 0)}",
        f"- Answer failures: {performance_summary.get('generation_failure_count', 0)}",
        f"- Tickers co du chart + table + pdf: {multimodal['tickers_with_chart_table_pdf']}",
        f"- Artifact lookup latency avg/p95 (ms): {multimodal['artifact_lookup_latency_ms']['avg_ms']} / {multimodal['artifact_lookup_latency_ms']['p95_ms']}",
        f"- Ghi chu multimodal: {multimodal['note']}",
        "",
        "### Multimodal inventory",
    ]
    lines.extend(
        f"- {item['ticker']}: chart={item['has_chart']} tables={item['table_count']} pdfs={item['pdf_count']}"
        for item in multimodal["inventory"]
    )
    return "\n".join(lines) + "\n"
def ensure_output_dir(path: Path) -> None:
    """Create *path* and any missing parents; do nothing if it already exists."""
    path.mkdir(exist_ok=True, parents=True)
def main() -> int:
    """Run the full evaluation and write the JSON and Markdown reports.

    Steps: parse the CLI options, load (or create) the cases, evaluate
    retrieval, generation and performance, write both report files into the
    output directory and print their paths plus the summaries.

    Returns:
        0, used as the process exit status.
    """
    args = parse_args()
    cases_path, output_dir = Path(args.cases), Path(args.output_dir)
    ensure_output_dir(output_dir)
    cases = load_eval_cases(cases_path)

    retrieval_cases = []
    for case in cases:
        retrieval_cases.append(evaluate_retrieval_case(case, args.top_k))

    generation_cases = []
    for case in cases:
        generation_cases.append(
            evaluate_generation_case_with_deepeval(
                case,
                args.top_k,
                args.eval_model,
                args.deepeval_threshold,
                args.include_reason,
            )
        )

    performance = evaluate_performance(cases, args.top_k, args.repeats)

    report = {
        "generated_at_utc": datetime.now(timezone.utc).isoformat(),
        "eval_model": effective_eval_model(args.eval_model),
        "deepeval_threshold": args.deepeval_threshold,
        "top_k": args.top_k,
        "repeats": args.repeats,
        "case_count": len(cases),
        "cases_path": cases_path.as_posix(),
        "retrieval": {
            "summary": summarize_retrieval(retrieval_cases),
            "cases": retrieval_cases,
        },
        "generation": {
            "summary": summarize_generation(generation_cases),
            "cases": generation_cases,
        },
        "performance": performance,
    }

    # Both report files share one timestamped stem.
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    stem = f"evaluation_report_{timestamp}"
    json_path = output_dir / f"{stem}.json"
    md_path = output_dir / f"{stem}.md"
    json_path.write_text(json.dumps(report, ensure_ascii=False, indent=2), encoding="utf-8")
    md_path.write_text(build_markdown_report(report), encoding="utf-8")

    print(f"Saved JSON report: {json_path.as_posix()}")
    print(f"Saved Markdown report: {md_path.as_posix()}")
    for section in (
        report["retrieval"]["summary"],
        report["generation"]["summary"],
        report["performance"]["retrieval_latency_ms"],
    ):
        print(json.dumps(section, ensure_ascii=False, indent=2))
    return 0
# Run the evaluation only when executed as a script; main()'s return value
# becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())