"""
L1 graders — run live on every query.
Metrics:
pii_leakage — regex scan for PII patterns in response
token_budget — response within allowed token ceiling
answer_relevancy — cosine similarity between query and response embeddings
faithfulness — NLI cross-encoder: entailment score per (context sentence, claim) pair
chain_terminology — deterministic: client-specific terms used (via RosettaStone)
"""
import logging
import re
from dataclasses import dataclass, field
from typing import Any
import numpy as np
from config import EMBEDDER_MODEL
from rosetta import check_terminology
from sentence_transformers import CrossEncoder, SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
log = logging.getLogger(__name__)
_embedder: SentenceTransformer | None = None
_nli_model: CrossEncoder | None = None
# cross-encoder/nli-deberta-v3-small: 3-class NLI, columns = [contradiction, entailment, neutral]
NLI_MODEL = "cross-encoder/nli-deberta-v3-small"
_NLI_ENTAILMENT_IDX = 1
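# With apply_softmax=True, CrossEncoder.predict returns an (n_pairs, 3) array of class
# probabilities; column 1 holds the entailment probability for this model.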
def get_embedder() -> SentenceTransformer:
"""Return the shared sentence-transformer instance, loading it on first call."""
global _embedder
if _embedder is None:
_embedder = SentenceTransformer(EMBEDDER_MODEL)
return _embedder
def get_nli_model() -> CrossEncoder:
"""Return the shared NLI cross-encoder, loading it on first call."""
global _nli_model
if _nli_model is None:
_nli_model = CrossEncoder(NLI_MODEL)
return _nli_model
@dataclass(slots=True)
class GradeResult:
metric: str
passed: bool
score: float
detail: str = ""
metadata: dict[str, Any] = field(default_factory=dict)
@dataclass(slots=True)
class GradeReport:
client: str
query: str
results: list[GradeResult] = field(default_factory=list)
@property
def overall(self) -> bool:
return all(r.passed for r in self.results)
@property
def summary(self) -> dict[str, Any]:
return {
"overall_pass": self.overall,
"metrics": {
r.metric: {"passed": r.passed, "score": round(r.score, 3), "detail": r.detail}
for r in self.results
},
}
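# Illustrative summary shape (values are examples, not real output):
#   {"overall_pass": False,
#    "metrics": {"pii_leakage": {"passed": True, "score": 1.0, "detail": "Clean"},
#                "faithfulness": {"passed": False, "score": 0.25,
#                                 "detail": "1/4 claims supported (threshold: 0.35)"}}}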
_SENTENCE_SPLIT = re.compile(r"(?<=[.!?])\s+")
_PII_PATTERNS = [
(r"\b\d{3}-\d{2}-\d{4}\b", "SSN"),
(r"\b\d{16}\b", "credit card"),
(r"\b[A-Za-z0-9._%+\-]+@[A-Za-z0-9.\-]+\.[A-Za-z]{2,}\b", "email"),
(r"\b\d{3}[\s.\-]?\d{3}[\s.\-]?\d{4}\b", "phone"),
]
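# Note: the card pattern only catches unseparated 16-digit runs (e.g. 4111111111111111);
# separator-formatted numbers such as "4111 1111 1111 1111" are not flagged.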
TOKEN_BUDGET = 512
RELEVANCY_THRESHOLD = 0.45
FAITHFULNESS_THRESHOLD = 0.35
_SENTINEL = "NOT IN DOCUMENTS"
# Fallback patterns for responses that predate the sentinel instruction or
# where the model ignores the sentinel format.
_REFUSAL_FALLBACK = re.compile(
    r"((i (don't|do not|cannot|can't|am not able to)|i'm not able to) "
    r"(have|find|provide|answer)|"
    r"not enough (information|context)|"
    r"the (context|provided) (does not|doesn't) (contain|include|mention))",
    re.IGNORECASE,
)
def _is_refusal(response: str) -> bool:
if _SENTINEL in response.upper():
lines = response.split("\n")
# Only auto-pass when sentinel is on the first line AND nothing substantial
    # follows — continuation lines may contain hallucinated claims.
has_continuation = any(len(ln.split()) >= 3 for ln in lines[1:])
return _SENTINEL in lines[0].upper() and not has_continuation
return bool(_REFUSAL_FALLBACK.search(response))
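# Illustrative behaviour (examples, not executed at import):
#   _is_refusal("NOT IN DOCUMENTS")                    -> True  (bare sentinel)
#   _is_refusal("NOT IN DOCUMENTS\nThe fee is $25.")   -> False (continuation guard)
#   _is_refusal("I don't have enough information.")    -> True  (fallback pattern)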
def grade_pii_leakage(response: str) -> GradeResult:
"""Scan response for PII patterns; fail on any match."""
found = [label for pattern, label in _PII_PATTERNS if re.search(pattern, response)]
return GradeResult(
metric="pii_leakage",
passed=not found,
score=0.0 if found else 1.0,
detail=f"Detected: {', '.join(found)}" if found else "Clean",
)
def grade_token_budget(response: str, budget: int = TOKEN_BUDGET) -> GradeResult:
"""Fail if estimated token count exceeds budget."""
approx_tokens = len(response) // 4
passed = approx_tokens <= budget
return GradeResult(
metric="token_budget",
passed=passed,
score=1.0 if passed else round(budget / approx_tokens, 3),
detail=f"~{approx_tokens} tokens (budget: {budget})",
metadata={"approx_tokens": approx_tokens, "budget": budget},
)
def grade_answer_relevancy(query: str, response: str) -> GradeResult:
"""Score semantic similarity between query and response via cosine distance."""
embedder = get_embedder()
q_vec = embedder.encode([query])
r_vec = embedder.encode([response])
score = float(cosine_similarity(q_vec, r_vec)[0][0])
return GradeResult(
metric="answer_relevancy",
passed=score >= RELEVANCY_THRESHOLD,
score=score,
detail=f"Cosine {score:.3f} (threshold: {RELEVANCY_THRESHOLD})",
)
def _strip_chunk_title(chunk: str) -> str:
"""Remove [Title] prefix added by _build_context before NLI scoring."""
if chunk.startswith("[") and "]\n" in chunk:
return chunk.split("]\n", 1)[1].strip()
return chunk
def decompose_claims(response: str) -> list[str]:
"""Split response into atomic claim sentences (β‰₯3 words each)."""
sentences = _SENTENCE_SPLIT.split(response.strip())
return [s.strip() for s in sentences if len(s.split()) >= 3]
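# e.g. decompose_claims("The fee is $25. Yes. It applies to all cards.")
#   -> ["The fee is $25.", "It applies to all cards."]  ("Yes." dropped: under 3 words)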
def _context_sentences(chunks: list[str]) -> list[str]:
"""Flatten context chunks into individual sentences for sentence-level NLI scoring.
    Cross-encoder NLI degrades on multi-sentence inputs — performance is calibrated
on single-sentence (premise, hypothesis) pairs matching the SNLI/MNLI training format.
"""
sentences = []
for chunk in chunks:
for s in _SENTENCE_SPLIT.split(chunk.strip()):
if len(s.split()) >= 3:
sentences.append(s.strip())
return sentences
def grade_faithfulness(response: str, context: str) -> GradeResult:
"""Whole-response faithfulness: max entailment score across all context chunks."""
if _is_refusal(response):
return GradeResult(
metric="faithfulness", passed=True, score=1.0,
detail="Refusal β€” no factual claims to verify",
)
model = get_nli_model()
raw_chunks = [c.strip() for c in context.split("\n\n") if c.strip()]
if not raw_chunks:
return GradeResult(metric="faithfulness", passed=False, score=0.0, detail="No context")
chunks = [_strip_chunk_title(c) for c in raw_chunks]
    sentences = _context_sentences(chunks)
    if not sentences:
        return GradeResult(metric="faithfulness", passed=False, score=0.0, detail="No context sentences")
    pairs = [(s, response) for s in sentences]
scores_matrix: np.ndarray = model.predict(pairs, apply_softmax=True)
entailment: np.ndarray = scores_matrix[:, _NLI_ENTAILMENT_IDX]
log.info("NLI entailment scores: %s", [round(float(s), 3) for s in entailment])
score = float(entailment.max())
return GradeResult(
metric="faithfulness",
passed=score >= FAITHFULNESS_THRESHOLD,
score=score,
detail=f"Faithfulness {score:.3f} (threshold: {FAITHFULNESS_THRESHOLD})",
)
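# Note: grade() below uses the decomposed variant; this whole-response grader remains as
# a cheaper alternative (one batched NLI call instead of one per claim).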
def grade_faithfulness_decomposed(response: str, context: str) -> GradeResult:
"""Claim-level faithfulness: each sentence verified independently against context.
Supported claims / total claims β€” catches partial hallucinations missed by whole-response NLI.
"""
if _is_refusal(response):
return GradeResult(
metric="faithfulness", passed=True, score=1.0,
detail="Refusal β€” no factual claims to verify",
)
raw_chunks = [c.strip() for c in context.split("\n\n") if c.strip()]
if not raw_chunks:
return GradeResult(metric="faithfulness", passed=False, score=0.0, detail="No context")
chunks = [_strip_chunk_title(c) for c in raw_chunks]
claims = decompose_claims(response)
if not claims:
return GradeResult(metric="faithfulness", passed=False, score=0.0, detail="No claims extracted")
    sentences = _context_sentences(chunks)
    if not sentences:
        return GradeResult(metric="faithfulness", passed=False, score=0.0, detail="No context sentences")
    model = get_nli_model()
claim_results: list[dict[str, Any]] = []
for claim in claims:
pairs = [(s, claim) for s in sentences]
scores_matrix: np.ndarray = model.predict(pairs, apply_softmax=True)
entailment: np.ndarray = scores_matrix[:, _NLI_ENTAILMENT_IDX]
best = float(entailment.max())
claim_results.append({"claim": claim, "score": round(best, 3), "supported": best >= FAITHFULNESS_THRESHOLD})
supported = sum(1 for c in claim_results if c["supported"])
score = supported / len(claim_results)
log.info("Claim decomposition: %d/%d supported (score=%.3f)", supported, len(claim_results), score)
return GradeResult(
metric="faithfulness",
passed=score >= FAITHFULNESS_THRESHOLD,
score=score,
detail=f"{supported}/{len(claim_results)} claims supported (threshold: {FAITHFULNESS_THRESHOLD})",
metadata={"claims": claim_results},
)
def grade_chain_terminology(response: str, client: str) -> GradeResult:
"""Check that the response uses client-specific terms, not rival terminology."""
result = check_terminology(response, client)
violations = result["violations"]
checked = result["checked"]
score = 1.0 - (len(violations) / checked) if checked else 1.0
detail = (
f"{len(violations)} violation(s): " +
", ".join(f"{v['found']!r} β†’ should be {v['expected']!r}" for v in violations)
if violations else f"All {checked} terms correct"
)
return GradeResult(
metric="chain_terminology",
passed=result["pass"],
score=score,
detail=detail,
metadata={"violations": violations},
)
def grade(
query: str,
response: str,
context: str,
client: str,
token_budget: int = TOKEN_BUDGET,
) -> GradeReport:
"""Run all L1 graders and return a consolidated report."""
report = GradeReport(client=client, query=query)
report.results = [
grade_pii_leakage(response),
grade_token_budget(response, token_budget),
grade_answer_relevancy(query, response),
grade_faithfulness_decomposed(response, context),
grade_chain_terminology(response, client),
]
return report
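if __name__ == "__main__":
    # Minimal smoke-test sketch: assumes the embedder/NLI models can be loaded and the
    # RosettaStone term tables exist; "sunrise_bank" is a placeholder client id.
    logging.basicConfig(level=logging.INFO)
    report = grade(
        query="What is the overdraft fee?",
        response="The overdraft fee is $25 per item.",
        context="[Fee Schedule]\nThe overdraft fee is $25 per item, capped at three per day.",
        client="sunrise_bank",
    )
    print(report.summary)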