Spaces:
Sleeping
Sleeping
| import re | |
| from typing import List, Optional, Dict, Any | |
| from app.schemas.evaluation_schema import ( | |
| AnswerTestCase, | |
| AnswerEvaluationRunRequest, | |
| AnswerSingleResult, | |
| AnswerEvaluationSummary, | |
| AnswerEvaluationReport | |
| ) | |
| from app.evaluation.answer_eval_storage import load_answer_test_cases | |
| from app.generation.answer_service import answer_question | |
# Common function words that tokenize_for_groundedness() discards before
# comparing answer tokens against source tokens.
STOPWORDS = {
    # articles and conjunctions
    "a", "an", "the", "and", "or",
    # prepositions
    "of", "to", "in", "on", "by", "for", "with", "from", "as", "at",
    # forms of "to be"
    "is", "are", "was", "were", "be", "been",
    # pronouns and determiners
    "it", "this", "that",
    # question / relative words
    "which", "what", "how", "why",
}
def run_answer_evaluation(
    request: AnswerEvaluationRunRequest
) -> AnswerEvaluationReport:
    """Evaluate the stored answer test cases and return a full report.

    Args:
        request: Run options. ``test_case_ids`` (when non-empty) restricts the
            run to those ids; ``use_llm_override`` and
            ``retrieval_mode_override`` are forwarded to every case.

    Returns:
        A report holding the per-case results and their aggregated summary.
    """
    available_cases = load_answer_test_cases()

    requested_ids = request.test_case_ids
    if requested_ids:
        # Set membership keeps the filter linear; the stored order is retained.
        wanted = set(requested_ids)
        chosen_cases = [
            case for case in available_cases
            if case.test_case_id in wanted
        ]
    else:
        # No ids requested (None or empty): run everything.
        chosen_cases = available_cases

    outcomes = [
        evaluate_single_answer_test_case(
            test_case=case,
            use_llm_override=request.use_llm_override,
            retrieval_mode_override=request.retrieval_mode_override
        )
        for case in chosen_cases
    ]

    return AnswerEvaluationReport(
        summary=build_answer_evaluation_summary(outcomes),
        results=outcomes
    )
def evaluate_single_answer_test_case(
    test_case: AnswerTestCase,
    use_llm_override: Optional[bool] = None,
    retrieval_mode_override: Optional[str] = None
) -> AnswerSingleResult:
    """Generate an answer for one test case and grade it.

    The answer is produced by ``answer_question`` and then checked for
    length, citation markers, retrieved sources, expected keywords,
    forbidden keywords and groundedness. A case passes only when none of
    those checks records a failure reason.

    Args:
        test_case: The case describing the question and its thresholds.
        use_llm_override: When not ``None``, replaces ``test_case.use_llm``.
        retrieval_mode_override: When truthy, replaces
            ``test_case.retrieval_mode``.

    Returns:
        The graded result, including metrics and trimmed previews of the
        citations and sources.
    """
    # An explicit override (including False) wins over the case's own flag.
    if use_llm_override is None:
        llm_enabled = test_case.use_llm
    else:
        llm_enabled = use_llm_override

    # A falsy override (None or "") falls back to the case's retrieval mode.
    mode = (
        retrieval_mode_override
        if retrieval_mode_override
        else test_case.retrieval_mode
    )

    output = answer_question(
        query=test_case.question,
        document_id=test_case.document_id,
        top_k=test_case.top_k,
        retrieval_mode=mode,
        use_reranker=test_case.use_reranker,
        use_llm=llm_enabled
    )

    answer_text = output.get("answer", "")
    citation_items = output.get("citations", [])
    source_items = output.get("sources", [])

    word_total = count_words(answer_text)
    cited = has_citation(answer_text)
    source_total = len(source_items)

    matched, missing, match_ratio = evaluate_keywords(
        answer=answer_text,
        expected_keywords=test_case.expected_answer_keywords
    )
    forbidden_hits = find_forbidden_keywords(
        answer=answer_text,
        forbidden_keywords=test_case.forbidden_answer_keywords
    )
    grounded_score = compute_groundedness_score(
        answer=answer_text,
        sources=source_items
    )
    grounded = grounded_score >= test_case.minimum_groundedness_score

    # (failed?, message) pairs, in the order the reasons are reported.
    checks = (
        (
            word_total < test_case.minimum_answer_words,
            f"Answer is too short. Expected at least {test_case.minimum_answer_words} words."
        ),
        (
            test_case.require_citations and not cited,
            "Answer does not contain required citations."
        ),
        (
            test_case.require_sources and source_total == 0,
            "Answer does not include any retrieved sources."
        ),
        (
            # The ratio is None when no keywords are expected, so the
            # comparison must stay behind the short-circuit.
            bool(test_case.expected_answer_keywords)
            and match_ratio < test_case.minimum_keyword_match_ratio,
            "Answer did not match enough expected keywords."
        ),
        (
            bool(forbidden_hits),
            "Answer contains forbidden keywords."
        ),
        (
            not grounded,
            "Answer does not appear grounded enough in retrieved sources."
        ),
    )
    reasons = [message for failed, message in checks if failed]

    fields = {
        "test_case_id": test_case.test_case_id,
        "question": test_case.question,
        "passed": not reasons,
        "failure_reasons": reasons,
        "answer": answer_text,
        "answer_strategy": output.get("answer_strategy"),
        "used_llm": output.get("used_llm", False),
        "used_reranker": output.get("used_reranker", False),
        # Report the mode the service says it used, else the one requested.
        "retrieval_mode": output.get("retrieval_mode", mode),
        "answer_word_count": word_total,
        "citation_present": cited,
        "source_count": source_total,
        "keyword_match_ratio": match_ratio,
        "matched_keywords": matched,
        "missing_keywords": missing,
        "forbidden_keywords_found": forbidden_hits,
        "groundedness_score": grounded_score,
        "groundedness_passed": grounded,
        "citations_preview": simplify_citations(citation_items),
        "sources_preview": simplify_sources(source_items),
    }
    return AnswerSingleResult(**fields)
def count_words(text: str) -> int:
    """Return how many runs of ASCII letters, digits or underscores *text* has.

    ``None`` and the empty string both count as zero words.
    """
    tokens = re.findall(r"[a-zA-Z0-9_]+", text if text else "")
    return len(tokens)
def has_citation(text: str) -> bool:
    """Return True when *text* contains a marker such as ``[S1]`` or ``[S12]``.

    The marker is an upper-case ``S`` followed by one or more digits inside
    square brackets. Empty or ``None`` text never has a citation.
    """
    if text:
        return re.search(r"\[S\d+\]", text) is not None
    return False
def evaluate_keywords(
    answer: str,
    expected_keywords: List[str]
):
    """Split the expected keywords into those found in the answer and those not.

    Matching is a case-insensitive substring test; each keyword is stripped
    of surrounding whitespace before it is searched for, but is reported in
    its original form.

    Args:
        answer: The generated answer. ``None`` is treated as an empty answer,
            in line with ``count_words`` and ``has_citation``.
        expected_keywords: Keywords the answer should mention.

    Returns:
        A ``(matched, missing, ratio)`` tuple. ``ratio`` is
        ``len(matched) / len(expected_keywords)`` rounded to 4 decimals, or
        ``None`` when no keywords are expected.
    """
    if not expected_keywords:
        return [], [], None

    # Guard against a missing answer so .lower() cannot raise on None.
    answer_lower = (answer or "").lower()

    matched_keywords = []
    missing_keywords = []
    for keyword in expected_keywords:
        keyword_lower = keyword.lower().strip()
        if keyword_lower in answer_lower:
            matched_keywords.append(keyword)
        else:
            missing_keywords.append(keyword)

    keyword_match_ratio = round(
        len(matched_keywords) / len(expected_keywords),
        4
    )
    return matched_keywords, missing_keywords, keyword_match_ratio
def find_forbidden_keywords(
    answer: str,
    forbidden_keywords: List[str]
) -> List[str]:
    """Return the forbidden keywords that occur in the answer.

    Matching is a case-insensitive substring test on the whitespace-stripped
    keyword; hits are reported in their original form and input order.

    Args:
        answer: The generated answer. ``None`` is treated as an empty answer.
        forbidden_keywords: Keywords that must not appear. Entries that are
            empty after stripping are ignored.

    Returns:
        The forbidden keywords found, or an empty list.
    """
    if not forbidden_keywords:
        return []

    # Guard against a missing answer so .lower() cannot raise on None.
    answer_lower = (answer or "").lower()

    found = []
    for keyword in forbidden_keywords:
        keyword_lower = keyword.lower().strip()
        # "" is a substring of every string, so a blank keyword would
        # otherwise flag every answer as containing forbidden content.
        if keyword_lower and keyword_lower in answer_lower:
            found.append(keyword)
    return found
def tokenize_for_groundedness(text: str) -> set:
    """Return the distinct content tokens of *text*.

    The text is lower-cased and split into runs of ASCII letters, digits or
    underscores; stopwords and tokens of one or two characters are dropped.
    ``None`` yields an empty set.
    """
    lowered = (text or "").lower()
    candidates = set(re.findall(r"[a-zA-Z0-9_]+", lowered)) - STOPWORDS
    return {token for token in candidates if len(token) > 2}
def compute_groundedness_score(
    answer: str,
    sources: List[Dict[str, Any]]
) -> float:
    """Return the share of the answer's content tokens found in the sources.

    Both the answer and the concatenated source ``content`` are tokenized
    with ``tokenize_for_groundedness``; the score is
    ``|answer ∩ sources| / |answer|`` rounded to 4 decimals.

    Args:
        answer: The generated answer text.
        sources: Retrieved source dicts; only their ``"content"`` is read.
            A missing or ``None`` content contributes no text.

    Returns:
        A value in ``[0.0, 1.0]``; ``0.0`` when either side has no tokens.
    """
    answer_tokens = tokenize_for_groundedness(answer)
    if not answer_tokens:
        return 0.0

    # dict.get's default only applies to a missing key, so a source with
    # "content": None would make str.join raise; coerce it to "" instead.
    source_text = " ".join(
        source.get("content") or ""
        for source in sources
    )
    source_tokens = tokenize_for_groundedness(source_text)
    if not source_tokens:
        return 0.0

    overlap = answer_tokens & source_tokens
    return round(len(overlap) / len(answer_tokens), 4)
def simplify_citations(citations: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Return a compact preview of at most the first five citations.

    Each preview keeps only the source id, file name, page number and
    citation text; a key absent from a citation is reported as ``None``.
    """
    preview_keys = (
        "source_id",
        "source_file_name",
        "page_number",
        "citation_text",
    )
    return [
        {key: entry.get(key) for key in preview_keys}
        for entry in citations[:5]
    ]
def simplify_sources(sources: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Return a compact preview of at most the first five sources.

    Each preview keeps the identifying fields and the first 250 characters
    of the source content. Keys absent from a source are reported as
    ``None``; a missing or ``None`` content yields an empty preview.
    """
    simplified = []
    for source in sources[:5]:
        # dict.get's default only covers a missing key; "content": None
        # would otherwise make the slice below raise TypeError.
        content = source.get("content") or ""
        simplified.append(
            {
                "source_id": source.get("source_id"),
                "score": source.get("score"),
                "chunk_id": source.get("chunk_id"),
                "source_file_name": source.get("source_file_name"),
                "page_number": source.get("page_number"),
                "content_preview": content[:250]
            }
        )
    return simplified
def build_answer_evaluation_summary(
    results: List[AnswerSingleResult]
) -> AnswerEvaluationSummary:
    """Aggregate per-case results into run-level rates and averages.

    Rates are fractions of the cases rounded to 4 decimals. The keyword pass
    rate is computed only over cases that have a keyword ratio and is
    ``None`` when there are none. With no results at all, only the counts,
    ``pass_rate`` and the two averages are passed to the summary (as zeros).

    NOTE(review): the keyword pass rate uses a fixed 0.5 threshold here, not
    a per-case threshold - confirm this is intended.
    """
    case_count = len(results)
    if not case_count:
        return AnswerEvaluationSummary(
            total_cases=0,
            passed_cases=0,
            failed_cases=0,
            pass_rate=0.0,
            average_groundedness_score=0.0,
            average_answer_word_count=0.0
        )

    def fraction(hits: int, population: int = case_count) -> float:
        # Share of `population` represented by `hits`, to 4 decimals.
        return round(hits / population, 4)

    passed_count = len([item for item in results if item.passed])
    cited_count = len([item for item in results if item.citation_present])
    sourced_count = len([item for item in results if item.source_count > 0])
    grounded_count = len([item for item in results if item.groundedness_passed])

    # Cases without expected keywords carry a None ratio and are excluded.
    keyword_ratios = [
        item.keyword_match_ratio for item in results
        if item.keyword_match_ratio is not None
    ]
    if keyword_ratios:
        keyword_rate = fraction(
            len([ratio for ratio in keyword_ratios if ratio >= 0.5]),
            len(keyword_ratios)
        )
    else:
        keyword_rate = None

    groundedness_total = sum(item.groundedness_score for item in results)
    word_total = sum(item.answer_word_count for item in results)

    return AnswerEvaluationSummary(
        total_cases=case_count,
        passed_cases=passed_count,
        failed_cases=case_count - passed_count,
        pass_rate=fraction(passed_count),
        citation_pass_rate=fraction(cited_count),
        source_presence_rate=fraction(sourced_count),
        keyword_pass_rate=keyword_rate,
        groundedness_pass_rate=fraction(grounded_count),
        average_groundedness_score=round(groundedness_total / case_count, 4),
        average_answer_word_count=round(word_total / case_count, 2)
    )