| | """ |
| | Critic Agent |
| | |
| | Validates generated answers for hallucination and factual accuracy. |
| | Follows FAANG best practices for production RAG systems. |
| | |
| | Key Features: |
| | - Hallucination detection |
| | - Citation verification |
| | - Factual consistency checking |
| | - Confidence scoring |
| | - Actionable feedback for self-correction |
| | """ |
| |
|
| | from typing import List, Optional, Dict, Any, Tuple |
| | from pydantic import BaseModel, Field |
| | from loguru import logger |
| | from enum import Enum |
| | import json |
| | import re |
| |
|
# Optional dependency guard: httpx powers the LLM-backed validation calls.
# When it is not installed, the agent relies on its heuristic code paths.
try:
    import httpx
    HTTPX_AVAILABLE = True
except ImportError:
    HTTPX_AVAILABLE = False
| |
|
| | from .synthesizer import SynthesisResult, Citation |
| | from .reranker import RankedResult |
| |
|
| |
|
class IssueType(str, Enum):
    """Types of validation issues."""
    HALLUCINATION = "hallucination"            # claim with no support in the sources
    UNSUPPORTED_CLAIM = "unsupported_claim"    # partially supported claim, or missing citations
    INCORRECT_CITATION = "incorrect_citation"  # [N] marker does not match source N
    CONTRADICTION = "contradiction"            # answer contradicts itself
    INCOMPLETE = "incomplete"                  # answer too brief given available sources
    FACTUAL_ERROR = "factual_error"            # reserved; not emitted by the checks in this module
| |
|
| |
|
class ValidationIssue(BaseModel):
    """A single validation issue found in a generated answer."""
    # Category of the problem (see IssueType).
    issue_type: IssueType
    # Seriousness in [0.0, 1.0]; higher means more severe.
    severity: float = Field(ge=0.0, le=1.0)
    # Human-readable explanation of the problem.
    description: str
    # The offending span of answer text, when identifiable.
    problematic_text: Optional[str] = None
    # Actionable fix hint; feeds into revision suggestions.
    suggestion: Optional[str] = None
    # 1-based index of the citation involved, for citation issues.
    citation_index: Optional[int] = None
| |
|
| |
|
class CriticResult(BaseModel):
    """Result of answer validation."""
    # Overall verdict: True only when every threshold check passes.
    is_valid: bool
    # Weighted blend of the three per-dimension scores below.
    confidence: float
    # All issues detected across the individual checks.
    issues: List[ValidationIssue]

    # Per-dimension scores, each constrained to [0, 1].
    # Fraction of the answer judged unsupported (higher = worse).
    hallucination_score: float = Field(ge=0.0, le=1.0)
    # How accurately [N] citations match their sources (higher = better).
    citation_accuracy: float = Field(ge=0.0, le=1.0)
    # Internal/external consistency of the answer (higher = better).
    factual_consistency: float = Field(ge=0.0, le=1.0)

    # Set when validation fails and issues were found; suggestions are
    # capped at five actionable items.
    needs_revision: bool = False
    revision_suggestions: List[str] = Field(default_factory=list)
| |
|
| |
|
class CriticConfig(BaseModel):
    """Configuration for critic agent."""

    # Ollama model and endpoint used for LLM-backed checks.
    model: str = Field(default="llama3.2:3b")
    base_url: str = Field(default="http://localhost:11434")
    # Low temperature keeps validation output near-deterministic.
    temperature: float = Field(default=0.1)

    # Validity thresholds: an answer is valid only if
    # hallucination_score < hallucination_threshold,
    # citation_accuracy >= citation_accuracy_threshold, and
    # blended confidence >= overall_confidence_threshold.
    hallucination_threshold: float = Field(default=0.3)
    citation_accuracy_threshold: float = Field(default=0.7)
    overall_confidence_threshold: float = Field(default=0.6)

    # Feature toggles for the individual checks.
    check_hallucination: bool = Field(default=True)
    check_citations: bool = Field(default=True)
    check_consistency: bool = Field(default=True)
| |
|
| |
|
class CriticAgent:
    """
    Validates generated answers for quality and accuracy.

    Capabilities:
    1. Hallucination detection
    2. Citation verification
    3. Factual consistency checking
    4. Actionable revision suggestions

    LLM-backed checks POST to an Ollama-compatible ``/api/generate``
    endpoint. When httpx is unavailable, or a call fails, the agent falls
    back to cheap heuristics so validation never hard-fails.
    """

    # NOTE: prompt text is part of the runtime contract with the LLM;
    # doubled braces escape literal JSON braces for str.format().
    HALLUCINATION_PROMPT = """Analyze this answer for hallucination - information NOT supported by the provided sources.

SOURCES:
{sources}

ANSWER:
{answer}

For each claim in the answer, determine if it is:
1. SUPPORTED - Directly supported by the sources
2. PARTIALLY_SUPPORTED - Somewhat supported but with additions
3. UNSUPPORTED - Not found in sources (hallucination)

Respond with JSON:
{{
"claims": [
{{"text": "claim text", "status": "SUPPORTED|PARTIALLY_SUPPORTED|UNSUPPORTED", "source_index": 1 or null}}
],
"hallucination_score": 0.0-1.0,
"issues": ["list of specific issues found"]
}}"""

    CITATION_PROMPT = """Verify that each citation in this answer correctly references the source material.

SOURCES:
{sources}

ANSWER WITH CITATIONS:
{answer}

For each citation [N], check if the claim it supports is actually in source N.

Respond with JSON:
{{
"citation_checks": [
{{"citation_index": 1, "is_accurate": true/false, "reason": "explanation"}}
],
"overall_accuracy": 0.0-1.0
}}"""

    def __init__(self, config: Optional[CriticConfig] = None):
        """
        Initialize Critic Agent.

        Args:
            config: Critic configuration; defaults to ``CriticConfig()``.
        """
        self.config = config or CriticConfig()
        logger.info(f"CriticAgent initialized (model={self.config.model})")

    def validate(
        self,
        synthesis_result: SynthesisResult,
        sources: List[RankedResult],
    ) -> CriticResult:
        """
        Validate a synthesized answer.

        Args:
            synthesis_result: The generated answer with citations
            sources: Source chunks used for generation

        Returns:
            CriticResult with per-dimension scores, all issues found, and
            revision suggestions when the answer fails validation.
        """
        issues: List[ValidationIssue] = []
        hallucination_score = 0.0
        citation_accuracy = 1.0
        factual_consistency = 1.0

        # An abstained answer makes no claims, so there is nothing to
        # hallucinate or mis-cite: accept it with full confidence.
        if synthesis_result.abstained:
            return CriticResult(
                is_valid=True,
                confidence=1.0,
                issues=[],
                hallucination_score=0.0,
                citation_accuracy=1.0,
                factual_consistency=1.0,
            )

        if self.config.check_hallucination:
            if HTTPX_AVAILABLE:
                hallucination_score, h_issues = self._check_hallucination(
                    synthesis_result.answer,
                    sources,
                )
            else:
                # Fix: previously this check was skipped entirely when httpx
                # was missing; use the heuristic so the score stays meaningful.
                hallucination_score, h_issues = self._heuristic_hallucination_check(
                    synthesis_result.answer,
                    sources,
                )
            issues.extend(h_issues)

        if self.config.check_citations and synthesis_result.citations:
            citation_accuracy, c_issues = self._check_citations(
                synthesis_result.answer,
                synthesis_result.citations,
                sources,
            )
            issues.extend(c_issues)

        if self.config.check_consistency:
            factual_consistency, f_issues = self._check_consistency(
                synthesis_result.answer,
                sources,
            )
            issues.extend(f_issues)

        # Weighted blend: hallucination and citations dominate (0.4 each),
        # consistency contributes the remaining 0.2.
        confidence = (
            0.4 * (1 - hallucination_score) +
            0.4 * citation_accuracy +
            0.2 * factual_consistency
        )

        is_valid = (
            hallucination_score < self.config.hallucination_threshold and
            citation_accuracy >= self.config.citation_accuracy_threshold and
            confidence >= self.config.overall_confidence_threshold
        )

        needs_revision = not is_valid and len(issues) > 0
        revision_suggestions = (
            self._generate_revision_suggestions(issues) if needs_revision else []
        )

        return CriticResult(
            is_valid=is_valid,
            confidence=confidence,
            issues=issues,
            hallucination_score=hallucination_score,
            citation_accuracy=citation_accuracy,
            factual_consistency=factual_consistency,
            needs_revision=needs_revision,
            revision_suggestions=revision_suggestions,
        )

    @staticmethod
    def _clamp01(value: Any, default: float) -> float:
        """Coerce an LLM-supplied score to a float in [0, 1].

        Model output is untrusted: a string, None, or out-of-range number
        here would otherwise break CriticResult's ge/le field validation.
        """
        try:
            return min(max(float(value), 0.0), 1.0)
        except (TypeError, ValueError):
            return default

    def _call_llm(self, prompt: str, num_predict: int) -> Dict[str, Any]:
        """POST a prompt to the Ollama generate endpoint and parse its JSON.

        Shared plumbing for the hallucination and citation checks.
        Raises httpx transport/HTTP errors; callers handle the fallback.
        """
        with httpx.Client(timeout=30.0) as client:
            response = client.post(
                f"{self.config.base_url}/api/generate",
                json={
                    "model": self.config.model,
                    "prompt": prompt,
                    "stream": False,
                    "options": {
                        "temperature": self.config.temperature,
                        "num_predict": num_predict,
                    },
                },
            )
            response.raise_for_status()
            result = response.json()
        return self._parse_json_response(result.get("response", ""))

    def _check_hallucination(
        self,
        answer: str,
        sources: List[RankedResult],
    ) -> Tuple[float, List[ValidationIssue]]:
        """Check for hallucination using the LLM.

        Returns (hallucination_score, issues). Any failure — transport,
        HTTP, or unparseable model output — falls back to the heuristic
        check rather than raising.
        """
        prompt = self.HALLUCINATION_PROMPT.format(
            sources=self._format_sources(sources),
            answer=answer,
        )

        try:
            data = self._call_llm(prompt, num_predict=1024)

            hallucination_score = self._clamp01(
                data.get("hallucination_score", 0.0), 0.0
            )

            issues: List[ValidationIssue] = []
            for claim in data.get("claims", []):
                status = claim.get("status")
                if status == "UNSUPPORTED":
                    issues.append(ValidationIssue(
                        issue_type=IssueType.HALLUCINATION,
                        severity=0.8,
                        description=f"Unsupported claim: {claim.get('text', '')}",
                        problematic_text=claim.get("text"),
                        suggestion="Remove or find supporting source",
                    ))
                elif status == "PARTIALLY_SUPPORTED":
                    issues.append(ValidationIssue(
                        issue_type=IssueType.UNSUPPORTED_CLAIM,
                        severity=0.4,
                        description=f"Partially supported: {claim.get('text', '')}",
                        problematic_text=claim.get("text"),
                        suggestion="Verify claim against source",
                    ))

            return hallucination_score, issues

        except Exception as e:
            # Deliberate best-effort: degrade to the heuristic, never crash.
            logger.warning(f"Hallucination check failed: {e}")
            return self._heuristic_hallucination_check(answer, sources)

    def _heuristic_hallucination_check(
        self,
        answer: str,
        sources: List[RankedResult],
    ) -> Tuple[float, List[ValidationIssue]]:
        """Simple heuristic hallucination check.

        Compares capitalized tokens (a rough proxy for named entities)
        between answer and sources; entities absent from the sources count
        as potential hallucinations.
        """
        answer_words = set(re.findall(r'\b[A-Z][a-z]+\b', answer))
        source_words = set(
            re.findall(r'\b[A-Z][a-z]+\b', " ".join(s.text for s in sources))
        )

        # Discard sentence-initial function words that are capitalized by
        # position, not because they are entities.
        common_words = {"The", "This", "That", "However", "Therefore", "Additionally", "Based", "According"}
        unsupported_entities = (answer_words - source_words) - common_words

        # Report at most three offenders to keep feedback actionable.
        issues = [
            ValidationIssue(
                issue_type=IssueType.HALLUCINATION,
                severity=0.5,
                description=f"Entity '{entity}' not found in sources",
                problematic_text=entity,
            )
            for entity in list(unsupported_entities)[:3]
        ]

        # Score = share of answer entities that lack source support.
        if answer_words:
            score = len(unsupported_entities) / len(answer_words)
        else:
            score = 0.0

        return min(score, 1.0), issues

    def _check_citations(
        self,
        answer: str,
        citations: List[Citation],
        sources: List[RankedResult],
    ) -> Tuple[float, List[ValidationIssue]]:
        """Verify citation accuracy.

        Tries the LLM first; on failure (or without httpx) falls back to a
        regex check that citation indices reference existing sources.
        """
        if not citations:
            # No citations at all is itself an issue for a sourced answer.
            return 0.0, [ValidationIssue(
                issue_type=IssueType.UNSUPPORTED_CLAIM,
                severity=0.6,
                description="Answer contains no citations",
                suggestion="Add citations to support claims",
            )]

        if HTTPX_AVAILABLE:
            try:
                prompt = self.CITATION_PROMPT.format(
                    sources=self._format_sources(sources),
                    answer=answer,
                )
                data = self._call_llm(prompt, num_predict=512)

                accuracy = self._clamp01(data.get("overall_accuracy", 1.0), 1.0)

                issues: List[ValidationIssue] = []
                for check in data.get("citation_checks", []):
                    if not check.get("is_accurate", True):
                        issues.append(ValidationIssue(
                            issue_type=IssueType.INCORRECT_CITATION,
                            severity=0.6,
                            description=f"Citation [{check.get('citation_index')}]: {check.get('reason', 'Inaccurate')}",
                            citation_index=check.get("citation_index"),
                            suggestion="Verify citation matches source",
                        ))

                return accuracy, issues

            except Exception as e:
                logger.warning(f"Citation check failed: {e}")

        # Heuristic fallback: every [N] in the answer must point at a
        # source that actually exists (1-based indexing).
        used_citations = set(int(m) for m in re.findall(r'\[(\d+)\]', answer))

        if not used_citations:
            # Citations were declared but none appear inline: uncertain.
            return 0.5, []

        valid_indices = set(range(1, len(sources) + 1))
        invalid = used_citations - valid_indices

        issues = [
            ValidationIssue(
                issue_type=IssueType.INCORRECT_CITATION,
                severity=0.7,
                description=f"Citation [{idx}] references non-existent source",
                citation_index=idx,
            )
            for idx in invalid
        ]

        accuracy = 1.0 - (len(invalid) / len(used_citations))
        return accuracy, issues

    def _check_consistency(
        self,
        answer: str,
        sources: List[RankedResult],
    ) -> Tuple[float, List[ValidationIssue]]:
        """Check for internal and external consistency.

        Returns (score, issues) where each detected issue deducts 0.2
        from a starting score of 1.0 (floored at 0.0).
        """
        issues: List[ValidationIssue] = []

        for contradiction in self._detect_contradictions(answer):
            issues.append(ValidationIssue(
                issue_type=IssueType.CONTRADICTION,
                severity=0.7,
                description=contradiction,
            ))

        # A very short answer despite available sources suggests the
        # synthesizer under-used its material.
        if len(answer) < 50 and len(sources) > 0:
            issues.append(ValidationIssue(
                issue_type=IssueType.INCOMPLETE,
                severity=0.4,
                description="Answer may be incomplete given available sources",
                suggestion="Expand answer to include more relevant information",
            ))

        score = 1.0 - (0.2 * len(issues))
        return max(score, 0.0), issues

    def _detect_contradictions(self, text: str) -> List[str]:
        """Detect self-contradictions within the answer.

        Placeholder: the previous scan over contrast words ("however",
        "but", "although") never produced a finding, so this reports none.
        TODO: implement real detection (e.g. sentence-pair NLI).
        """
        return []

    def _format_sources(self, sources: List[RankedResult]) -> str:
        """Format sources as numbered [N] snippets for a prompt.

        Each source is truncated to 500 characters to bound prompt size.
        """
        parts = [f"[{i}] {source.text[:500]}" for i, source in enumerate(sources, 1)]
        return "\n\n".join(parts)

    def _parse_json_response(self, text: str) -> Dict[str, Any]:
        """Extract and parse the first-to-last-brace JSON object from LLM text.

        Greedy span tolerates chatter around the JSON; returns {} when no
        parseable object is found.
        """
        try:
            json_match = re.search(r'\{[\s\S]*\}', text)
            if json_match:
                return json.loads(json_match.group())
        except json.JSONDecodeError:
            pass
        return {}

    def _generate_revision_suggestions(
        self,
        issues: List[ValidationIssue],
    ) -> List[str]:
        """Generate actionable revision suggestions (deduplicated, max 5).

        Uses dict.fromkeys instead of set() so output order is
        deterministic and follows issue order.
        """
        suggestions: List[str] = []

        for issue in issues:
            if issue.suggestion:
                suggestions.append(issue.suggestion)
            elif issue.issue_type == IssueType.HALLUCINATION:
                suggestions.append(
                    f"Remove or verify: {issue.problematic_text or 'unsupported claim'}"
                )
            elif issue.issue_type == IssueType.INCORRECT_CITATION:
                suggestions.append(
                    f"Fix citation [{issue.citation_index}] to match source"
                )

        return list(dict.fromkeys(suggestions))[:5]
| |
|