""" L1 graders — run live on every query. Metrics: pii_leakage — regex scan for PII patterns in response token_budget — response within allowed token ceiling answer_relevancy — cosine similarity between query and response embeddings faithfulness — NLI cross-encoder: entailment score per (chunk, claim) pair chain_terminology — deterministic: client-specific terms used (via RosettaStone) """ import logging import re from dataclasses import dataclass, field from typing import Any import numpy as np from config import EMBEDDER_MODEL from rosetta import check_terminology from sentence_transformers import CrossEncoder, SentenceTransformer from sklearn.metrics.pairwise import cosine_similarity log = logging.getLogger(__name__) _embedder: SentenceTransformer | None = None _nli_model: CrossEncoder | None = None # cross-encoder/nli-deberta-v3-small: 3-class NLI, columns = [contradiction, entailment, neutral] NLI_MODEL = "cross-encoder/nli-deberta-v3-small" _NLI_ENTAILMENT_IDX = 1 def get_embedder() -> SentenceTransformer: """Return the shared sentence-transformer instance, loading it on first call.""" global _embedder if _embedder is None: _embedder = SentenceTransformer(EMBEDDER_MODEL) return _embedder def get_nli_model() -> CrossEncoder: """Return the shared NLI cross-encoder, loading it on first call.""" global _nli_model if _nli_model is None: _nli_model = CrossEncoder(NLI_MODEL) return _nli_model @dataclass(slots=True) class GradeResult: metric: str passed: bool score: float detail: str = "" metadata: dict[str, Any] = field(default_factory=dict) @dataclass(slots=True) class GradeReport: client: str query: str results: list[GradeResult] = field(default_factory=list) @property def overall(self) -> bool: return all(r.passed for r in self.results) @property def summary(self) -> dict[str, Any]: return { "overall_pass": self.overall, "metrics": { r.metric: {"passed": r.passed, "score": round(r.score, 3), "detail": r.detail} for r in self.results }, } _SENTENCE_SPLIT = re.compile(r"(?<=[.!?])\s+") _PII_PATTERNS = [ (r"\b\d{3}-\d{2}-\d{4}\b", "SSN"), (r"\b\d{16}\b", "credit card"), (r"\b[A-Za-z0-9._%+\-]+@[A-Za-z0-9.\-]+\.[A-Za-z]{2,}\b", "email"), (r"\b\d{3}[\s.\-]?\d{3}[\s.\-]?\d{4}\b", "phone"), ] TOKEN_BUDGET = 512 RELEVANCY_THRESHOLD = 0.45 FAITHFULNESS_THRESHOLD = 0.35 _SENTINEL = "NOT IN DOCUMENTS" # Fallback patterns for responses that predate the sentinel instruction or # where the model ignores the sentinel format. _REFUSAL_FALLBACK = re.compile( r"(i (don't|do not|cannot|can't|'m not able to) (have|find|provide|answer)|" r"not enough (information|context)|" r"the (context|provided) (does not|doesn't) (contain|include|mention))", re.IGNORECASE, ) def _is_refusal(response: str) -> bool: if _SENTINEL in response.upper(): lines = response.split("\n") # Only auto-pass when sentinel is on the first line AND nothing substantial # follows — continuation lines may contain hallucinated claims. 


def grade_pii_leakage(response: str) -> GradeResult:
    """Scan response for PII patterns; fail on any match."""
    found = [label for pattern, label in _PII_PATTERNS if re.search(pattern, response)]
    return GradeResult(
        metric="pii_leakage",
        passed=not found,
        score=0.0 if found else 1.0,
        detail=f"Detected: {', '.join(found)}" if found else "Clean",
    )


def grade_token_budget(response: str, budget: int = TOKEN_BUDGET) -> GradeResult:
    """Fail if estimated token count exceeds budget."""
    approx_tokens = len(response) // 4
    passed = approx_tokens <= budget
    return GradeResult(
        metric="token_budget",
        passed=passed,
        score=1.0 if passed else round(budget / approx_tokens, 3),
        detail=f"~{approx_tokens} tokens (budget: {budget})",
        metadata={"approx_tokens": approx_tokens, "budget": budget},
    )


def grade_answer_relevancy(query: str, response: str) -> GradeResult:
    """Score semantic similarity between query and response via cosine distance."""
    embedder = get_embedder()
    q_vec = embedder.encode([query])
    r_vec = embedder.encode([response])
    score = float(cosine_similarity(q_vec, r_vec)[0][0])
    return GradeResult(
        metric="answer_relevancy",
        passed=score >= RELEVANCY_THRESHOLD,
        score=score,
        detail=f"Cosine {score:.3f} (threshold: {RELEVANCY_THRESHOLD})",
    )


def _strip_chunk_title(chunk: str) -> str:
    """Remove [Title] prefix added by _build_context before NLI scoring."""
    if chunk.startswith("[") and "]\n" in chunk:
        return chunk.split("]\n", 1)[1].strip()
    return chunk


def decompose_claims(response: str) -> list[str]:
    """Split response into atomic claim sentences (≥3 words each)."""
    sentences = _SENTENCE_SPLIT.split(response.strip())
    return [s.strip() for s in sentences if len(s.split()) >= 3]


def _context_sentences(chunks: list[str]) -> list[str]:
    """Flatten context chunks into individual sentences for sentence-level NLI scoring.

    Cross-encoder NLI degrades on multi-sentence inputs — performance is calibrated
    on single-sentence (premise, hypothesis) pairs matching the SNLI/MNLI training format.
    """
    sentences = []
    for chunk in chunks:
        for s in _SENTENCE_SPLIT.split(chunk.strip()):
            if len(s.split()) >= 3:
                sentences.append(s.strip())
    return sentences


def grade_faithfulness(response: str, context: str) -> GradeResult:
    """Whole-response faithfulness: max entailment score across all context chunks."""
    if _is_refusal(response):
        return GradeResult(
            metric="faithfulness",
            passed=True,
            score=1.0,
            detail="Refusal — no factual claims to verify",
        )
    model = get_nli_model()
    raw_chunks = [c.strip() for c in context.split("\n\n") if c.strip()]
    if not raw_chunks:
        return GradeResult(metric="faithfulness", passed=False, score=0.0, detail="No context")
    chunks = [_strip_chunk_title(c) for c in raw_chunks]
    sentences = _context_sentences(chunks)
    pairs = [(s, response) for s in sentences]
    scores_matrix: np.ndarray = model.predict(pairs, apply_softmax=True)
    entailment: np.ndarray = scores_matrix[:, _NLI_ENTAILMENT_IDX]
    log.info("NLI entailment scores: %s", [round(float(s), 3) for s in entailment])
    score = float(entailment.max())
    return GradeResult(
        metric="faithfulness",
        passed=score >= FAITHFULNESS_THRESHOLD,
        score=score,
        detail=f"Faithfulness {score:.3f} (threshold: {FAITHFULNESS_THRESHOLD})",
    )
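
# Shape contract, for reference: with apply_softmax=True the cross-encoder returns an
# (n_pairs, 3) probability array ordered [contradiction, entailment, neutral], so the
# whole-response score above is max over context sentences of P(entailment | sentence,
# response), while the decomposed grader below takes that same max per claim and then
# reports the fraction of supported claims.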


def grade_faithfulness_decomposed(response: str, context: str) -> GradeResult:
    """Claim-level faithfulness: each sentence verified independently against context.

    Supported claims / total claims — catches partial hallucinations missed by
    whole-response NLI.
    """
    if _is_refusal(response):
        return GradeResult(
            metric="faithfulness",
            passed=True,
            score=1.0,
            detail="Refusal — no factual claims to verify",
        )
    raw_chunks = [c.strip() for c in context.split("\n\n") if c.strip()]
    if not raw_chunks:
        return GradeResult(metric="faithfulness", passed=False, score=0.0, detail="No context")
    chunks = [_strip_chunk_title(c) for c in raw_chunks]
    claims = decompose_claims(response)
    if not claims:
        return GradeResult(metric="faithfulness", passed=False, score=0.0, detail="No claims extracted")
    sentences = _context_sentences(chunks)
    model = get_nli_model()
    claim_results: list[dict[str, Any]] = []
    for claim in claims:
        pairs = [(s, claim) for s in sentences]
        scores_matrix: np.ndarray = model.predict(pairs, apply_softmax=True)
        entailment: np.ndarray = scores_matrix[:, _NLI_ENTAILMENT_IDX]
        best = float(entailment.max())
        claim_results.append(
            {"claim": claim, "score": round(best, 3), "supported": best >= FAITHFULNESS_THRESHOLD}
        )
    supported = sum(1 for c in claim_results if c["supported"])
    score = supported / len(claim_results)
    log.info("Claim decomposition: %d/%d supported (score=%.3f)", supported, len(claim_results), score)
    return GradeResult(
        metric="faithfulness",
        passed=score >= FAITHFULNESS_THRESHOLD,
        score=score,
        detail=f"{supported}/{len(claim_results)} claims supported (threshold: {FAITHFULNESS_THRESHOLD})",
        metadata={"claims": claim_results},
    )


def grade_chain_terminology(response: str, client: str) -> GradeResult:
    """Check that the response uses client-specific terms, not rival terminology."""
    result = check_terminology(response, client)
    violations = result["violations"]
    checked = result["checked"]
    score = 1.0 - (len(violations) / checked) if checked else 1.0
    detail = (
        f"{len(violations)} violation(s): "
        + ", ".join(f"{v['found']!r} → should be {v['expected']!r}" for v in violations)
        if violations
        else f"All {checked} terms correct"
    )
    return GradeResult(
        metric="chain_terminology",
        passed=result["pass"],
        score=score,
        detail=detail,
        metadata={"violations": violations},
    )


def grade(
    query: str,
    response: str,
    context: str,
    client: str,
    token_budget: int = TOKEN_BUDGET,
) -> GradeReport:
    """Run all L1 graders and return a consolidated report."""
    report = GradeReport(client=client, query=query)
    report.results = [
        grade_pii_leakage(response),
        grade_token_budget(response, token_budget),
        grade_answer_relevancy(query, response),
        grade_faithfulness_decomposed(response, context),
        grade_chain_terminology(response, client),
    ]
    return report
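

if __name__ == "__main__":
    # Minimal smoke-run sketch, not part of the grading pipeline: the query, response,
    # context, and client values below are made up for illustration, and the run assumes
    # config.EMBEDDER_MODEL and rosetta.check_terminology resolve in this environment
    # (the client key would also need to exist in the RosettaStone config).
    logging.basicConfig(level=logging.INFO)
    _demo_context = (
        "[Overdraft Policy]\n"
        "Overdraft protection transfers funds from a linked savings account.\n\n"
        "[Fee Schedule]\n"
        "The overdraft transfer fee is waived for premier accounts."
    )
    _demo_report = grade(
        query="How does overdraft protection work?",
        response="Overdraft protection transfers funds from a linked savings account.",
        context=_demo_context,
        client="demo_client",  # hypothetical client key
    )
    print(_demo_report.summary)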