Spaces:
Running
Running
"""
verifier.py
===========
Phase 7 — Answer Grounding & Hallucination Detection

Verifies that every sentence in an LLM-generated answer is supported by the
retrieved context chunks. Uses a Natural Language Inference (NLI) cross-encoder
to classify each (context, claim) pair as:

    SUPPORTED    — context entails the claim (entailment score ≥ threshold)
    UNVERIFIED   — context neither supports nor contradicts (neutral)
    CONTRADICTED — context explicitly contradicts the claim

NLI Model
---------
cross-encoder/nli-deberta-v3-small
  - ~180 MB, runs on CPU
  - Label order: {0: contradiction, 1: entailment, 2: neutral}
  - Input: (premise=context_chunk, hypothesis=answer_sentence)

Three-step verification process
-------------------------------
1. Sentence splitting
   Split the LLM answer into individual claims using the NLTK sentence tokenizer.
2. Claim-context entailment
   For each sentence, pair it against every retrieved context chunk.
   The chunk with the highest entailment score is the best support.
   Classify the sentence based on that best score.
3. Citation verification (optional)
   Extract [1], [2], ... references from the answer.
   Check that the cited chunk actually supports the citing sentence.

Grounding score
---------------
grounding_score = supported_sentences / total_sentences
Range: 0.0 (fully hallucinated) to 1.0 (fully grounded)
Typical acceptable threshold: ≥ 0.7

Usage
-----
    from src.verifier import AnswerVerifier

    verifier = AnswerVerifier()
    result = verifier.verify(answer=answer_text, chunks=retrieved_chunks)
    print(f"Grounding score: {result['grounding_score']:.0%}")
    for r in result["sentence_results"]:
        verdict = r["verdict"]
        sent = r["sentence"][:80]
        print(f" [{verdict:12s}] {sent}")

    # Citation check
    cites = verifier.check_citations(answer=answer_text, chunks=retrieved_chunks)
    for c in cites:
        print(f" {c['citation']} {c['status']} entail={c.get('entail_score','n/a')}")
"""
| import logging | |
| import re | |
| from pathlib import Path | |
# ── Logging ──────────────────────────────────────────────────────────────────
# Configure the root logger once at import time: INFO and above, each record
# prefixed with a timestamp and a left-aligned, 8-character level name.
logging.basicConfig(
    format="%(asctime)s %(levelname)-8s %(message)s",
    level=logging.INFO,
)

# Module-scoped logger used by everything below.
log = logging.getLogger(__name__)
# ── Constants ────────────────────────────────────────────────────────────────
# Hugging Face identifier of the NLI cross-encoder loaded by AnswerVerifier.
NLI_MODEL = "cross-encoder/nli-deberta-v3-small"

# Column positions in the model's 3-way output, listed in index order.
# Confirmed label order: {0: contradiction, 1: entailment, 2: neutral}
CONTRA_IDX = 0
ENTAIL_IDX = 1
NEUTRAL_IDX = 2

# Decision thresholds applied to the softmax probabilities.
ENTAIL_THRESHOLD = 0.50  # sentence is SUPPORTED if entailment ≥ this
CONTRA_THRESHOLD = 0.40  # sentence is CONTRADICTED if contradiction ≥ this

# Fragments shorter than this many characters are skipped
# (headers, bullet markers).
MIN_SENTENCE_LEN = 20
# ══════════════════════════════════════════════════════════════════════════════
# ANSWER VERIFIER
# ══════════════════════════════════════════════════════════════════════════════
class AnswerVerifier:
    """
    Verify LLM answer grounding against retrieved context chunks using NLI.

    Every answer sentence is paired with each context chunk and scored by a
    ``sentence_transformers.CrossEncoder`` model; the chunk with the highest
    entailment probability decides the sentence's verdict.

    Attributes
    ----------
    entail_threshold : float
        Minimum entailment probability for a sentence to be SUPPORTED.
    contra_threshold : float
        Minimum contradiction probability to flag a sentence as CONTRADICTED.
    """

    # Lower-case substrings that mark an explicit "nothing found" answer.
    _NOT_FOUND_PHRASES = (
        "not contain enough information",
        "not available in the provided",
        "cannot find",
        "no information",
    )

    # Inline numeric citations such as [1] or [12].
    _CITATION_RE = re.compile(r"\[(\d+)\]")

    # Fallback sentence boundary: terminal punctuation, whitespace, then an
    # upper-case letter or an opening bracket/parenthesis.
    _SENTENCE_BOUNDARY_RE = re.compile(r"(?<=[.!?])\s+(?=[A-Z\[\(])")

    # One-character markers used by report().
    _VERDICT_ICONS = {"SUPPORTED": "✓", "UNVERIFIED": "?", "CONTRADICTED": "✗"}

    def __init__(
        self,
        nli_model: str = NLI_MODEL,
        entail_threshold: float = ENTAIL_THRESHOLD,
        contra_threshold: float = CONTRA_THRESHOLD,
    ):
        """
        Load the NLI cross-encoder and store the decision thresholds.

        Args:
            nli_model        : model name/path passed to ``CrossEncoder``
            entail_threshold : minimum entailment probability for SUPPORTED
            contra_threshold : minimum contradiction probability for CONTRADICTED
        """
        # Imported here so that importing this module does not require
        # sentence_transformers until a verifier is actually constructed.
        from sentence_transformers import CrossEncoder

        log.info("Loading NLI model: %s", nli_model)
        self._nli = CrossEncoder(nli_model, max_length=512)
        self.entail_threshold = entail_threshold
        self.contra_threshold = contra_threshold
        log.info("NLI model ready.")

    # ── Sentence splitting ──────────────────────────────────────────────────
    def split_sentences(self, text: str) -> list[str]:
        """
        Split answer text into individual sentences.

        Uses NLTK ``sent_tokenize`` when available. If the tokenizer data is
        missing, one download attempt is made per known resource name; if NLTK
        is not installed or the data still cannot be loaded, a simple regex
        splitter is used instead of raising.

        Args:
            text : answer text to split

        Returns:
            Stripped sentences of at least MIN_SENTENCE_LEN characters;
            shorter fragments (bullet markers, standalone numbers) are dropped.
        """
        if not text or not text.strip():
            return []

        sentences = self._tokenize(text)
        stripped = (s.strip() for s in sentences)
        return [s for s in stripped if len(s) >= MIN_SENTENCE_LEN]

    def _tokenize(self, text: str) -> list[str]:
        """Return raw sentence candidates from NLTK, or from the regex fallback."""
        try:
            import nltk
        except ImportError:
            # Simple regex fallback if NLTK is unavailable.
            return self._SENTENCE_BOUNDARY_RE.split(text.strip())

        try:
            return nltk.sent_tokenize(text)
        except LookupError:
            pass

        # Tokenizer data is missing. Newer NLTK releases ship it as
        # "punkt_tab", older ones as "punkt"; try each once.
        for resource in ("punkt_tab", "punkt"):
            nltk.download(resource, quiet=True)
            try:
                return nltk.sent_tokenize(text)
            except LookupError:
                continue

        log.warning(
            "NLTK sentence tokenizer data unavailable; using regex splitter."
        )
        return self._SENTENCE_BOUNDARY_RE.split(text.strip())

    # ── Core verification ───────────────────────────────────────────────────
    def verify(
        self,
        answer: str,
        chunks: list[dict],
        verbose: bool = False,
    ) -> dict:
        """
        Verify every sentence in `answer` against the retrieved `chunks`.

        Args:
            answer  : LLM-generated answer string
            chunks  : list of chunk dicts; the "text" and "id" keys are read
            verbose : if True, log each sentence verdict

        Returns:
            {
                "grounding_score"  : float (0.0–1.0),
                "total_sentences"  : int,
                "supported"        : int,
                "unverified"       : int,
                "contradicted"     : int,
                "sentence_results" : list[dict],
            }
            An extra "note" key is present when the answer is an explicit
            "not found" response. With an empty `chunks` list every sentence
            is UNVERIFIED and its "best_chunk_id" is None.
        """
        # NOTE(review): the phrase match is a plain substring test on the
        # whole answer, so an answer that contains one of these phrases
        # anywhere is treated as fully grounded without checking its other
        # sentences.
        lowered = answer.lower()
        if any(phrase in lowered for phrase in self._NOT_FOUND_PHRASES):
            log.info(
                "Answer contains explicit 'not found' phrase — trivially grounded."
            )
            return {
                "grounding_score": 1.0,
                "total_sentences": 0,
                "supported": 0,
                "unverified": 0,
                "contradicted": 0,
                "sentence_results": [],
                "note": "LLM correctly reported no relevant context found.",
            }

        sentences = self.split_sentences(answer)
        if not sentences:
            return self._summarize([])

        context_texts = [c["text"] for c in chunks]
        if not context_texts:
            log.warning(
                "No context chunks supplied; all sentences will be UNVERIFIED."
            )

        sentence_results = []
        for sent in sentences:
            result = self._score_sentence(sent, chunks, context_texts)
            sentence_results.append(result)
            if verbose:
                log.info(
                    " [%-12s] entail=%.2f contra=%.2f | %r",
                    result["verdict"],
                    result["entail_prob"],
                    result["contra_prob"],
                    sent[:70],
                )

        return self._summarize(sentence_results)

    def _score_sentence(
        self,
        sentence: str,
        chunks: list[dict],
        context_texts: list[str],
    ) -> dict:
        """Score one sentence against all chunks and classify it."""
        if context_texts:
            # Pair the sentence against ALL context chunks as premises.
            pairs = [(ctx, sentence) for ctx in context_texts]
            scores = self._nli.predict(pairs, apply_softmax=True)
            # The chunk with the highest entailment score is the best support;
            # its contradiction/neutral scores are the ones reported.
            best_idx, best = max(
                enumerate(scores),
                key=lambda item: float(item[1][ENTAIL_IDX]),
            )
            entail_prob = float(best[ENTAIL_IDX])
            contra_prob = float(best[CONTRA_IDX])
            neutral_prob = float(best[NEUTRAL_IDX])
            chunk_id = chunks[best_idx]["id"]
            chunk_text = context_texts[best_idx][:120]
        else:
            # No evidence to compare against: neither supported nor
            # contradicted, and there is no best chunk to report.
            entail_prob, contra_prob, neutral_prob = 0.0, 0.0, 1.0
            chunk_id, chunk_text = None, ""

        # Entailment is checked first, so it wins when both thresholds are met.
        if entail_prob >= self.entail_threshold:
            verdict = "SUPPORTED"
        elif contra_prob >= self.contra_threshold:
            verdict = "CONTRADICTED"
        else:
            verdict = "UNVERIFIED"

        return {
            "sentence": sentence,
            "verdict": verdict,
            "entail_prob": round(entail_prob, 3),
            "contra_prob": round(contra_prob, 3),
            "neutral_prob": round(neutral_prob, 3),
            "best_chunk_id": chunk_id,
            "best_chunk_text": chunk_text,
        }

    @staticmethod
    def _summarize(sentence_results: list[dict]) -> dict:
        """Aggregate per-sentence verdicts into the verify() result dict."""
        total = len(sentence_results)
        verdicts = [r["verdict"] for r in sentence_results]
        supported = verdicts.count("SUPPORTED")
        # An answer with no scorable sentences gets a grounding score of 0.0.
        grounding = supported / total if total else 0.0
        return {
            "grounding_score": round(grounding, 3),
            "total_sentences": total,
            "supported": supported,
            "unverified": verdicts.count("UNVERIFIED"),
            "contradicted": verdicts.count("CONTRADICTED"),
            "sentence_results": sentence_results,
        }

    # ── Citation verification ───────────────────────────────────────────────
    def check_citations(
        self,
        answer: str,
        chunks: list[dict],
        correct_threshold: float = 0.35,
    ) -> list[dict]:
        """
        Verify that [1], [2], ... citations in the answer refer to the right chunk.

        For each citation, checks whether the cited chunk actually entails the
        sentence containing the citation. Only sentences returned by
        ``split_sentences`` are inspected.

        Args:
            answer            : LLM-generated answer with inline citations
                                like [1], [2]
            chunks            : list of chunk dicts (same order as context
                                was assembled); citation [n] maps to
                                chunks[n - 1]
            correct_threshold : minimum entailment probability for a
                                citation to be reported as "CORRECT"

        Returns:
            list of dicts:
                citation     : "[1]"
                sentence     : the sentence containing the citation
                               (first 120 chars)
                chunk_id     : ID of the cited chunk (None if out of range)
                entail_score : how well the chunk supports the sentence
                               (None if out of range)
                status       : "CORRECT" | "QUESTIONABLE" | "OUT_OF_RANGE"
            If no citation is found, a single-element list
            ``[{"status": "NO_CITATIONS", "note": ...}]`` is returned.
        """
        results = []
        for sent in self.split_sentences(answer):
            for num_str in self._CITATION_RE.findall(sent):
                idx = int(num_str) - 1  # citations are 1-indexed
                entry = {
                    "citation": f"[{num_str}]",
                    "sentence": sent[:120],
                    "chunk_id": None,
                    "entail_score": None,
                    "status": "OUT_OF_RANGE",
                }
                if 0 <= idx < len(chunks):
                    cited = chunks[idx]
                    scores = self._nli.predict(
                        [(cited["text"], sent)], apply_softmax=True
                    )
                    entail_prob = round(float(scores[0][ENTAIL_IDX]), 3)
                    entry["chunk_id"] = cited["id"]
                    entry["entail_score"] = entail_prob
                    entry["status"] = (
                        "CORRECT"
                        if entail_prob >= correct_threshold
                        else "QUESTIONABLE"
                    )
                results.append(entry)

        if not results:
            return [
                {
                    "status": "NO_CITATIONS",
                    "note": "Answer contains no [n] citations.",
                }
            ]
        return results

    # ── Summary report ──────────────────────────────────────────────────────
    def report(
        self,
        verification_result: dict,
        citation_result: "list[dict] | None" = None,
    ) -> str:
        """
        Format a human-readable grounding report.

        Args:
            verification_result : dict returned by ``verify``
            citation_result     : optional list returned by
                                  ``check_citations``; omitted or empty
                                  means no citation section is printed

        Returns:
            The report as a single newline-joined string.
        """
        r = verification_result
        score = r["grounding_score"]
        total = r["total_sentences"]

        if score >= 0.85:
            grade = "PASS — well-grounded"
        elif score >= 0.60:
            grade = "REVIEW — partially grounded"
        else:
            grade = "FAIL — high hallucination risk"

        rule = "=" * 65
        thin_rule = "-" * 65
        lines = [
            rule,
            " Grounding Report",
            rule,
            f" Score : {score:.0%} ({r['supported']}/{total} sentences supported)",
            f" Verdict : {grade}",
            f" Breakdown: {r['supported']} supported | "
            f"{r['unverified']} unverified | {r['contradicted']} contradicted",
            thin_rule,
        ]

        for i, s in enumerate(r["sentence_results"], 1):
            icon = self._VERDICT_ICONS.get(s["verdict"], " ")
            lines.append(
                f" [{i}] {icon} [{s['verdict']:12s}] "
                f"e={s['entail_prob']:.2f} c={s['contra_prob']:.2f}"
            )
            lines.append(f" {s['sentence'][:90]}")
            lines.append(f" → best match: {s['best_chunk_id']}")

        if citation_result:
            lines += ["", thin_rule, " Citation Check"]
            for c in citation_result:
                if c.get("status") == "NO_CITATIONS":
                    lines.append(" No inline citations found in answer.")
                    continue
                # Compare against None explicitly: a score of 0.0 is a real
                # result and must not be shown as "n/a".
                entail = c.get("entail_score")
                score_str = f"entail={entail:.2f}" if entail is not None else "n/a"
                lines.append(
                    f" {c['citation']} [{c['status']:12s}] {score_str} "
                    f"chunk={c['chunk_id']}"
                )

        lines.append(rule)
        return "\n".join(lines)