"""
L1 graders: run live on every query.

Metrics:
    pii_leakage: regex scan for PII patterns in the response
    token_budget: response stays within the allowed token ceiling
    answer_relevancy: cosine similarity between query and response embeddings
    faithfulness: NLI cross-encoder entailment score per (context sentence, claim) pair
    chain_terminology: deterministic check that client-specific terms are used (via RosettaStone)
"""

import logging
import re
from dataclasses import dataclass, field
from typing import Any

import numpy as np
from config import EMBEDDER_MODEL
from rosetta import check_terminology
from sentence_transformers import CrossEncoder, SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity

log = logging.getLogger(__name__)

_embedder: SentenceTransformer | None = None
_nli_model: CrossEncoder | None = None

NLI_MODEL = "cross-encoder/nli-deberta-v3-small"
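# Index of the "entailment" score in the cross-encoder output. This assumes the
# (contradiction, entailment, neutral) label order used by the cross-encoder/nli-deberta-v3
# model family; re-check the model card if NLI_MODEL is swapped out.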
_NLI_ENTAILMENT_IDX = 1


def get_embedder() -> SentenceTransformer:
    """Return the shared sentence-transformer instance, loading it on first call."""
    global _embedder
    if _embedder is None:
        _embedder = SentenceTransformer(EMBEDDER_MODEL)
    return _embedder


def get_nli_model() -> CrossEncoder:
    """Return the shared NLI cross-encoder, loading it on first call."""
    global _nli_model
    if _nli_model is None:
        _nli_model = CrossEncoder(NLI_MODEL)
    return _nli_model


@dataclass(slots=True)
class GradeResult:
    metric: str
    passed: bool
    score: float
    detail: str = ""
    metadata: dict[str, Any] = field(default_factory=dict)


@dataclass(slots=True)
class GradeReport:
    client: str
    query: str
    results: list[GradeResult] = field(default_factory=list)

    @property
    def overall(self) -> bool:
        return all(r.passed for r in self.results)

    @property
    def summary(self) -> dict[str, Any]:
        return {
            "overall_pass": self.overall,
            "metrics": {
                r.metric: {"passed": r.passed, "score": round(r.score, 3), "detail": r.detail}
                for r in self.results
            },
        }


_SENTENCE_SPLIT = re.compile(r"(?<=[.!?])\s+")
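# Heuristic patterns, not validators: the card regex only catches unseparated 16-digit
# runs, and the phone regex targets US-style 10-digit numbers.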
_PII_PATTERNS = [
    (r"\b\d{3}-\d{2}-\d{4}\b", "SSN"),
    (r"\b\d{16}\b", "credit card"),
    (r"\b[A-Za-z0-9._%+\-]+@[A-Za-z0-9.\-]+\.[A-Za-z]{2,}\b", "email"),
    (r"\b\d{3}[\s.\-]?\d{3}[\s.\-]?\d{4}\b", "phone"),
]

TOKEN_BUDGET = 512
RELEVANCY_THRESHOLD = 0.45
FAITHFULNESS_THRESHOLD = 0.35

_SENTINEL = "NOT IN DOCUMENTS"

_REFUSAL_FALLBACK = re.compile(
    r"(i ?(don't|do not|cannot|can't|'m not able to) (have|find|provide|answer)|"
    r"not enough (information|context)|"
    r"the (context|provided) (does not|doesn't) (contain|include|mention))",
    re.IGNORECASE,
)


def _is_refusal(response: str) -> bool:
    if _SENTINEL in response.upper():
        lines = response.split("\n")
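        # A leading sentinel only counts as a refusal when no later line carries
        # substantive text (3+ words); otherwise the model kept answering after it.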
        has_continuation = any(len(ln.split()) >= 3 for ln in lines[1:])
        return _SENTINEL in lines[0].upper() and not has_continuation
    return bool(_REFUSAL_FALLBACK.search(response))


def grade_pii_leakage(response: str) -> GradeResult:
    """Scan response for PII patterns; fail on any match."""
    found = [label for pattern, label in _PII_PATTERNS if re.search(pattern, response)]
    return GradeResult(
        metric="pii_leakage",
        passed=not found,
        score=0.0 if found else 1.0,
        detail=f"Detected: {', '.join(found)}" if found else "Clean",
    )


def grade_token_budget(response: str, budget: int = TOKEN_BUDGET) -> GradeResult:
    """Fail if estimated token count exceeds budget."""
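    # Rough estimate: ~4 characters per token is a common rule of thumb for English
    # text; swap in a real tokenizer count if the budget needs to be exact.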
    approx_tokens = len(response) // 4
    passed = approx_tokens <= budget
    return GradeResult(
        metric="token_budget",
        passed=passed,
        score=1.0 if passed else round(budget / approx_tokens, 3),
        detail=f"~{approx_tokens} tokens (budget: {budget})",
        metadata={"approx_tokens": approx_tokens, "budget": budget},
    )


def grade_answer_relevancy(query: str, response: str) -> GradeResult:
    """Score semantic similarity between query and response via cosine similarity."""
    embedder = get_embedder()
    q_vec = embedder.encode([query])
    r_vec = embedder.encode([response])
    score = float(cosine_similarity(q_vec, r_vec)[0][0])
    return GradeResult(
        metric="answer_relevancy",
        passed=score >= RELEVANCY_THRESHOLD,
        score=score,
        detail=f"Cosine {score:.3f} (threshold: {RELEVANCY_THRESHOLD})",
    )


def _strip_chunk_title(chunk: str) -> str:
    """Remove the [Title] prefix added by _build_context before NLI scoring."""
    if chunk.startswith("[") and "]\n" in chunk:
        return chunk.split("]\n", 1)[1].strip()
    return chunk


def decompose_claims(response: str) -> list[str]:
    """Split response into atomic claim sentences (3+ words each)."""
    sentences = _SENTENCE_SPLIT.split(response.strip())
    return [s.strip() for s in sentences if len(s.split()) >= 3]


def _context_sentences(chunks: list[str]) -> list[str]:
    """Flatten context chunks into individual sentences for sentence-level NLI scoring.

    Cross-encoder NLI degrades on multi-sentence inputs; the model is calibrated on
    single-sentence (premise, hypothesis) pairs matching the SNLI/MNLI training format.
    """
    sentences = []
    for chunk in chunks:
        for s in _SENTENCE_SPLIT.split(chunk.strip()):
            if len(s.split()) >= 3:
                sentences.append(s.strip())
    return sentences


def grade_faithfulness(response: str, context: str) -> GradeResult:
    """Whole-response faithfulness: max entailment score across all context sentences."""
    if _is_refusal(response):
        return GradeResult(
            metric="faithfulness", passed=True, score=1.0,
            detail="Refusal: no factual claims to verify",
        )
    model = get_nli_model()
    raw_chunks = [c.strip() for c in context.split("\n\n") if c.strip()]
    if not raw_chunks:
        return GradeResult(metric="faithfulness", passed=False, score=0.0, detail="No context")
    chunks = [_strip_chunk_title(c) for c in raw_chunks]
    sentences = _context_sentences(chunks)
    if not sentences:
        return GradeResult(metric="faithfulness", passed=False, score=0.0, detail="No usable context sentences")
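    # NLI direction: premise = a context sentence, hypothesis = the full response.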
    pairs = [(s, response) for s in sentences]
    scores_matrix: np.ndarray = model.predict(pairs, apply_softmax=True)
    entailment: np.ndarray = scores_matrix[:, _NLI_ENTAILMENT_IDX]
    log.info("NLI entailment scores: %s", [round(float(s), 3) for s in entailment])
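    # Max-pool over context sentences: one strongly entailing sentence is enough to
    # count the response as supported.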
    score = float(entailment.max())
    return GradeResult(
        metric="faithfulness",
        passed=score >= FAITHFULNESS_THRESHOLD,
        score=score,
        detail=f"Faithfulness {score:.3f} (threshold: {FAITHFULNESS_THRESHOLD})",
    )


def grade_faithfulness_decomposed(response: str, context: str) -> GradeResult:
    """Claim-level faithfulness: each sentence is verified independently against the context.

    Score = supported claims / total claims; this catches partial hallucinations that
    whole-response NLI misses.
    """
    if _is_refusal(response):
        return GradeResult(
            metric="faithfulness", passed=True, score=1.0,
            detail="Refusal: no factual claims to verify",
        )
    raw_chunks = [c.strip() for c in context.split("\n\n") if c.strip()]
    if not raw_chunks:
        return GradeResult(metric="faithfulness", passed=False, score=0.0, detail="No context")

    chunks = [_strip_chunk_title(c) for c in raw_chunks]
    claims = decompose_claims(response)
    if not claims:
        return GradeResult(metric="faithfulness", passed=False, score=0.0, detail="No claims extracted")

    sentences = _context_sentences(chunks)
    if not sentences:
        return GradeResult(metric="faithfulness", passed=False, score=0.0, detail="No usable context sentences")
    model = get_nli_model()
    claim_results: list[dict[str, Any]] = []
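    # Each claim is scored against every context sentence; the best match decides support.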
    for claim in claims:
        pairs = [(s, claim) for s in sentences]
        scores_matrix: np.ndarray = model.predict(pairs, apply_softmax=True)
        entailment: np.ndarray = scores_matrix[:, _NLI_ENTAILMENT_IDX]
        best = float(entailment.max())
        claim_results.append(
            {"claim": claim, "score": round(best, 3), "supported": best >= FAITHFULNESS_THRESHOLD}
        )

    supported = sum(1 for c in claim_results if c["supported"])
    score = supported / len(claim_results)
    log.info("Claim decomposition: %d/%d supported (score=%.3f)", supported, len(claim_results), score)
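    # FAITHFULNESS_THRESHOLD does double duty: per-claim entailment cutoff above,
    # minimum supported-claim fraction below.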
    return GradeResult(
        metric="faithfulness",
        passed=score >= FAITHFULNESS_THRESHOLD,
        score=score,
        detail=f"{supported}/{len(claim_results)} claims supported (threshold: {FAITHFULNESS_THRESHOLD})",
        metadata={"claims": claim_results},
    )


def grade_chain_terminology(response: str, client: str) -> GradeResult:
    """Check that the response uses client-specific terms, not rival terminology."""
    result = check_terminology(response, client)
    violations = result["violations"]
    checked = result["checked"]
    score = 1.0 - (len(violations) / checked) if checked else 1.0
    detail = (
        f"{len(violations)} violation(s): "
        + ", ".join(f"{v['found']!r} (should be {v['expected']!r})" for v in violations)
        if violations
        else f"All {checked} terms correct"
    )
    return GradeResult(
        metric="chain_terminology",
        passed=result["pass"],
        score=score,
        detail=detail,
        metadata={"violations": violations},
    )
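
# Typical call site (illustrative sketch; the surrounding names are not part of this module):
#
#     report = grade(query=user_query, response=llm_answer, context=retrieved_context, client="acme")
#     if not report.overall:
#         log.warning("L1 grade failure: %s", report.summary)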
def grade(
    query: str,
    response: str,
    context: str,
    client: str,
    token_budget: int = TOKEN_BUDGET,
) -> GradeReport:
    """Run all L1 graders and return a consolidated report."""
    report = GradeReport(client=client, query=query)
    report.results = [
        grade_pii_leakage(response),
        grade_token_budget(response, token_budget),
        grade_answer_relevancy(query, response),
        grade_faithfulness_decomposed(response, context),
        grade_chain_terminology(response, client),
    ]
    return report