clienttarget-python / src /profiling /python-service /hallucination_guard.py
iDevBuddy
feat: Phase 1 — AI Client Acquisition System
bd28470
raw
history blame
5.48 kB
"""
Hallucination Guard v2 — Grounded Verification
Old approach: "Ask LLM for confidence" → LLM grades own exam → useless
New approach: Cross-reference every claim against evidence → real verification
Every LLM output field is checked:
- Employee count → matches scraped data?
- Industry → matches detected industry?
- AI readiness "high" → do we actually have AI job postings?
- PII in output → strip immediately
"""
import re
import logging
logger = logging.getLogger(__name__)
# Headcount claim such as "1,200 employees"; also matches single-digit counts.
_EMPLOYEE_CLAIM_RE = re.compile(r'(\d[\d,]*)\s*(employees?|people|staff)', re.I)
# Email-like token; case-insensitive so upper-case TLDs are caught as well.
_EMAIL_RE = re.compile(r'[\w.+-]+@[\w-]+\.[a-z]{2,}', re.I)
# Phone-like run: a digit followed by 8+ digits/separators.
# NOTE(review): this also matches ISO dates such as "2023-01-01".
_PHONE_RE = re.compile(r'\+?\d[\d\s\-().]{8,}')


def _as_number(value, default=0):
    """Return *value* as a number, parsing strings like "1,200"; else *default*."""
    if isinstance(value, (int, float)):
        return value
    if isinstance(value, str):
        try:
            return int(value.replace(",", "").strip())
        except ValueError:
            return default
    return default


def _as_text(value):
    """Flatten an evidence value (string, collection of items, or None) to text."""
    if value is None:
        return ""
    if isinstance(value, str):
        return value
    if isinstance(value, (list, tuple, set, frozenset)):
        return " ".join(str(item) for item in value)
    return str(value)


def validate_profile_grounded(profile: dict, evidence: dict) -> dict:
    """
    Cross-check an LLM-generated profile against collected evidence.

    Checks performed:
    - employee count mentioned in ``profile["profile_summary"]`` versus
      ``evidence["employee_count"]`` (more than 30% deviation is a correction)
    - ``profile["ai_readiness"]`` versus ``evidence["ai_job_count"]`` and
      ``evidence["tech_stack"]``
    - whether any word of ``evidence["name"]`` appears in the summary
    - whether each entry of ``profile["evidence_used"]`` shares a word with
      the evidence text (website_text, tech_stack, pain_signals, description)
    - email- or phone-like strings anywhere in ``str(profile)``

    Missing, ``None`` or string-typed evidence values are tolerated rather
    than raising ``TypeError``.

    Returns a dict with keys ``is_grounded`` (score >= 0.6),
    ``grounding_score`` (rounded to 2 decimals), ``verified_claims``,
    ``unverified_claims`` and ``corrections``. PII is only reported in
    ``unverified_claims``; the profile itself is not modified.

    NOTE(review): corrected claims are recorded under ``verified_claims`` and
    therefore raise the grounding score — confirm this is intended.
    """
    verified = []
    unverified = []
    corrections = {}

    # `or ""` keeps a None summary from becoming the literal text "None".
    summary = str(profile.get("profile_summary") or "")

    # ── Employee count ────────────────────────────────────────
    known_emp = _as_number(evidence.get("employee_count"), default=None)
    emp_match = _EMPLOYEE_CLAIM_RE.search(summary)
    if emp_match and known_emp:
        claimed = int(emp_match.group(1).replace(",", ""))
        if abs(claimed - known_emp) > known_emp * 0.3:
            corrections["employee_count"] = {"claimed": claimed, "actual": known_emp}
            verified.append("employee_count_corrected")
        else:
            verified.append("employee_count_accurate")

    # ── AI readiness vs actual signals ────────────────────────
    # Normalised so that "High" / " high " are checked like "high".
    claimed_readiness = str(profile.get("ai_readiness") or "").strip().lower()
    ai_jobs = _as_number(evidence.get("ai_job_count"), default=0)
    tech_stack = evidence.get("tech_stack") or []
    if claimed_readiness == "high" and ai_jobs == 0 and not tech_stack:
        corrections["ai_readiness"] = {"claimed": "high", "actual": "low"}
        verified.append("ai_readiness_corrected")
    elif claimed_readiness == "low" and ai_jobs >= 3:
        corrections["ai_readiness"] = {"claimed": "low", "actual": "high"}
        verified.append("ai_readiness_corrected")
    else:
        verified.append("ai_readiness_plausible")

    # ── Company name in summary ───────────────────────────────
    known_name = str(evidence.get("name") or "")
    if len(known_name) > 3:
        summary_lower = summary.lower()
        # Very short words ("of", "&") are ignored to limit accidental hits.
        name_words = [w for w in known_name.lower().split() if len(w) > 2]
        if any(w in summary_lower for w in name_words):
            verified.append("company_name_present")
        else:
            unverified.append("company_name_may_differ")

    # ── Evidence claims ───────────────────────────────────────
    evidence_used = profile.get("evidence_used", [])
    if isinstance(evidence_used, list):
        all_evidence_text = " ".join(
            _as_text(evidence.get(key))
            for key in ("website_text", "tech_stack", "pain_signals", "description")
        ).lower()
        for claim in evidence_used:
            claim_text = str(claim)
            # Only the first four words of a claim are compared (substring match).
            claim_words = claim_text.lower().split()[:4]
            if any(w in all_evidence_text for w in claim_words if len(w) > 3):
                verified.append(f"evidence_grounded: {claim_text[:30]}")
            else:
                unverified.append(f"evidence_unverifiable: {claim_text[:30]}")

    # ── PII check ─────────────────────────────────────────────
    output_str = str(profile)
    if _EMAIL_RE.search(output_str):
        unverified.append("pii_email_in_output")
    if _PHONE_RE.search(output_str):
        unverified.append("pii_phone_in_output")

    # ── Grounding score ───────────────────────────────────────
    total = len(verified) + len(unverified)
    grounding_score = len(verified) / total if total > 0 else 0.5
    result = {
        "is_grounded": grounding_score >= 0.6,
        "grounding_score": round(grounding_score, 2),
        "verified_claims": verified,
        "unverified_claims": unverified,
        "corrections": corrections,
    }
    if not result["is_grounded"]:
        logger.warning(
            "Profile failed grounding: score=%.2f, corrections=%d",
            grounding_score,
            len(corrections),
        )
    return result
def validate_score_grounded(score: dict, profile: dict) -> dict:
    """
    Validate scoring output for internal consistency.

    Checks that ``score["total_score"]`` is a number in 0..100, that
    ``score["tier"]`` is one of hot/warm/nurture/archive, and that the tier
    matches the score thresholds (>=85 hot, >=70 warm, >=50 nurture, else
    archive). A missing or non-numeric total is reported as invalid and
    treated as -1 for the tier cross-check instead of raising ``TypeError``.

    Side effect: when the tier does not match the score, ``score["tier"]`` is
    overwritten in place with the expected tier.

    ``profile`` is accepted for interface compatibility and is not read.

    Returns a dict with ``is_valid`` (True when no issues) and ``issues``.
    """
    issues = []

    raw_total = score.get("total_score", -1)
    # Non-numeric values (None, "85", ...) cannot be range-compared; fall back
    # to the same sentinel used for a missing score.
    total = raw_total if isinstance(raw_total, (int, float)) else -1
    if not (0 <= total <= 100):
        issues.append(f"invalid_total_score:{raw_total}")

    tier = score.get("tier")
    if tier not in ("hot", "warm", "nurture", "archive"):
        issues.append(f"invalid_tier:{tier}")

    # Cross-check tier vs score
    expected_tier = (
        "hot" if total >= 85 else
        "warm" if total >= 70 else
        "nurture" if total >= 50 else
        "archive"
    )
    if tier != expected_tier:
        issues.append(
            f"tier_score_mismatch: score={raw_total} tier={tier} expected={expected_tier}"
        )
        score["tier"] = expected_tier  # auto-correct

    return {
        "is_valid": len(issues) == 0,
        "issues": issues,
    }