| """ |
| agents.py β Prefect-orchestrated multi-agent evaluation layer. |
| |
| Two concurrent agents evaluate each claim: |
| |
| 1. misinformation_task β Groq mixtral-8x7b-32768 |
| Given: claim + top-3 RAG evidence chunks + trust score |
| Output: color (red|yellow|green), confidence, explanation, sources |
| |
| 2. hallucination_task β Claude Haiku (runs ONLY on AI chat platforms) |
| Given: claim text |
| Output: color (purple|green), confidence, explanation |
| Checks for: fabricated citations, statistical impossibilities, |
| internal contradictions, LLM-specific failure patterns |
| |
| Both tasks run concurrently via asyncio.gather. Prefect merges results, |
| picks higher-severity color, returns the final AnalysisResult. |
| |
| Why Prefect over Celery: |
| - Dynamic DAG-based orchestration (no pre-declared task graph) |
| - Native async support β no gevent hacks needed |
| - Built-in retry with exponential backoff per task |
| - Far better observability: every flow run gets a full execution trace |
| - Deployable without a separate worker process (embedded server mode) |
| """ |
|
|
import asyncio
import json
import time
from typing import Literal

import structlog
from litellm import acompletion
from prefect import flow, task
from prefect.tasks import task_input_hash

from core.config import HighlightColor, Platform, Settings, get_settings
from core.models import AnalysisResult, EvidenceChunk, GrokSensorResult, RAGResult, SourceRef, TrustScore
|
|
# Module-level structured logger bound to this module's name.
log = structlog.get_logger(__name__)
|
|
| |
| |
| |
# Ordering used when merging agent verdicts: the higher value wins on conflict
# (purple/hallucination outranks red/misinformation outranks yellow/green).
SEVERITY: dict[HighlightColor, int] = {
    HighlightColor.GREEN: 0,
    HighlightColor.YELLOW: 1,
    HighlightColor.RED: 2,
    HighlightColor.PURPLE: 3,
}
|
|
| |
| |
| |
|
|
# System prompt for the misinformation agent. The model must answer with a
# bare JSON object matching the schema below (enforced via response_format).
MISINFO_SYSTEM = """You are a professional fact-checker with access to recent evidence.
Analyze the claim against the evidence chunks and trust score. Output ONLY valid JSON.

Output schema (no markdown, no preamble):
{
  "color": "red" | "yellow" | "green",
  "confidence": <integer 0-100>,
  "explanation": "<2-3 sentence explanation for the hover card>",
  "verdict_label": "<8 words max, e.g. 'Debunked by Reuters and AP'>",
  "sources": ["<url1>", "<url2>", "<url3>"]
}

Color logic:
- "green": Claim is factually accurate, corroborated by ≥2 independent sources, trust score ≥ 0.65
- "yellow": Claim is unverified, breaking news, or evidence is weak/contradictory
- "red": Claim is demonstrably false, debunked by ≥2 sources, OR trust score < 0.25, OR community note active"""
|
|
# User-turn template for the misinformation agent. ``note_text_part`` is either
# "" or a "\nCommunity Note: ..." line; ``evidence_text`` is pre-joined upstream.
MISINFO_USER_TMPL = """Claim: {claim}

Trust score: {trust_score:.2f} (0=untrustworthy, 1=highly trusted)
Author verified: {verified}
Active Community Note: {has_note}{note_text_part}
Corroborating sources in database: {source_count}

Evidence chunks (cosine similarity descending):
{evidence_text}

Analyze and output JSON."""
|
|
# System prompt for the hallucination auditor. Binary verdict: "purple" only on
# a specific, identifiable hallucination pattern; "green" otherwise.
HALLUCINATION_SYSTEM = """You are an LLM output auditor specializing in detecting AI hallucinations.
Analyze the following text that was generated by an AI system. Output ONLY valid JSON.

Output schema:
{
  "color": "purple" | "green",
  "confidence": <integer 0-100>,
  "explanation": "<specific explanation of what's wrong, or confirmation it's accurate>"
}

Check for:
1. Fabricated citations: URLs, paper titles, author names that don't exist
2. Statistical impossibilities: numbers that exceed known bounds (e.g., "500% of people")
3. Internal contradictions: statements that contradict each other within the text
4. Temporal paradoxes: referencing future events as past, or anachronistic details
5. Entity confusion: mixing attributes of different real-world entities

Color "purple" only if you find a clear, specific hallucination pattern.
Color "green" if the text appears factually coherent (you cannot verify external facts)."""
|
|
|
|
| |
| |
| |
|
|
@task(
    name="misinformation-agent",
    retries=2,
    retry_delay_seconds=[1, 3],
    cache_key_fn=task_input_hash,
    cache_expiration=None,  # identical inputs hit the cache indefinitely
    log_prints=False,
)
async def misinformation_task(
    claim: str,
    evidence: list[EvidenceChunk],
    trust: TrustScore,
    grok: GrokSensorResult,
    settings: Settings,
) -> dict:
    """
    Evaluate a claim against RAG evidence using the configured Groq model.

    Args:
        claim: Raw claim text (truncated to 500 chars in the prompt).
        evidence: Retrieved evidence chunks; only the top-3 by score are sent.
        trust: Aggregated trust signals (score, verification, community note).
        grok: Velocity/community-note sensor result. NOTE(review): not read in
            the prompt body — it only feeds the Prefect cache key. Confirm
            whether it should be surfaced to the model.
        settings: App settings providing the model name and Groq API key.

    Returns:
        Parsed JSON dict with keys color / confidence / explanation /
        verdict_label / sources. Malformed model output raises
        json.JSONDecodeError, which Prefect retries (twice).
    """
    # Highest-similarity chunks first; cap at 3 to keep the prompt compact.
    top_evidence = sorted(evidence, key=lambda e: e.score, reverse=True)[:3]
    evidence_text = "\n\n".join(
        f"[{i+1}] Source: {e.domain} (similarity: {e.score:.3f})\n{e.text[:400]}"
        for i, e in enumerate(top_evidence)
    ) or "No evidence chunks retrieved (claim may be too recent or niche)."

    note_part = f"\nCommunity Note: {trust.community_note_text}" if trust.community_note_text else ""

    user_prompt = MISINFO_USER_TMPL.format(
        claim=claim[:500],
        trust_score=trust.score,
        verified=trust.author_verified,
        has_note=trust.has_community_note,
        note_text_part=note_part,
        source_count=trust.corroborating_sources,
        evidence_text=evidence_text,
    )

    response = await acompletion(
        model=settings.misinformation_model,
        messages=[
            {"role": "system", "content": MISINFO_SYSTEM},
            {"role": "user", "content": user_prompt},
        ],
        response_format={"type": "json_object"},  # provider-side strict JSON mode
        temperature=0.1,  # near-deterministic verdicts
        max_tokens=400,
        api_key=settings.groq_api_key or None,
    )

    raw = response.choices[0].message.content or "{}"
    return json.loads(raw)
|
|
|
|
@task(
    name="hallucination-agent",
    retries=2,
    retry_delay_seconds=[1, 3],
    log_prints=False,
)
async def hallucination_task(claim: str, settings: Settings) -> dict:
    """
    Audit AI-generated text for hallucination patterns.

    Uses the Groq model configured in ``settings.hallucination_model``
    (historically Claude Haiku — same prompt, same output schema). Only
    invoked when the source platform is an AI chat interface.

    Returns:
        Parsed JSON dict with keys color ("purple" | "green"), confidence,
        explanation. Malformed model output raises json.JSONDecodeError,
        which Prefect retries (twice).
    """
    response = await acompletion(
        model=settings.hallucination_model,
        messages=[
            {"role": "system", "content": HALLUCINATION_SYSTEM},
            {"role": "user", "content": f"Audit this AI-generated text:\n\n{claim[:1000]}"},
        ],
        response_format={"type": "json_object"},
        temperature=0.0,  # audits should be fully deterministic
        max_tokens=300,
        api_key=settings.groq_api_key or None,
    )

    raw = response.choices[0].message.content or "{}"
    return json.loads(raw)
|
|
|
|
| def _demo_misinfo_result(trust_score: float, has_note: bool) -> dict: |
| """Deterministic demo result when LLM keys are absent.""" |
| if has_note or trust_score < 0.25: |
| return { |
| "color": "red", "confidence": 82, |
| "explanation": "Demo mode: trust score below threshold and/or active community note detected.", |
| "verdict_label": "Low trust signal detected", |
| "sources": [], |
| } |
| elif trust_score < 0.55: |
| return { |
| "color": "yellow", "confidence": 61, |
| "explanation": "Demo mode: insufficient corroboration to confirm or deny this claim.", |
| "verdict_label": "Unverified β insufficient evidence", |
| "sources": [], |
| } |
| return { |
| "color": "green", "confidence": 78, |
| "explanation": "Demo mode: claim appears well-corroborated based on trust graph signals.", |
| "verdict_label": "Appears credible", |
| "sources": [], |
| } |
|
|
|
|
| def _demo_hallucination_result() -> dict: |
| return { |
| "color": "purple", "confidence": 71, |
| "explanation": "Demo mode: AI-generated content detected. Unable to verify external citations without live API.", |
| } |
|
|
|
|
| |
| |
| |
|
|
| @flow(name="fact-intelligence-pipeline", log_prints=False) |
| async def evaluate_claim( |
| claim: str, |
| claim_hash: str, |
| element_id: str, |
| platform: Platform, |
| rag_result: RAGResult, |
| grok_result: GrokSensorResult, |
| settings: Settings | None = None, |
| ) -> AnalysisResult: |
| """ |
| Orchestrates the full multi-agent evaluation as a Prefect flow. |
| |
| Concurrent execution: |
| - misinformation_task always runs |
| - hallucination_task runs only for AI chat platforms |
| |
| Results are merged by taking the higher-severity color. |
| The final AnalysisResult is returned directly (no Celery queue needed). |
| """ |
| cfg = settings or get_settings() |
| t0 = time.perf_counter() |
|
|
| is_ai_platform = platform in (Platform.CHATGPT, Platform.CLAUDE, Platform.GEMINI) |
|
|
| |
| use_demo = cfg.demo_mode or not cfg.has_groq |
|
|
| if use_demo: |
| misinfo_raw = _demo_misinfo_result(rag_result.trust.score, grok_result.community_note) |
| halluc_raw = _demo_hallucination_result() if is_ai_platform else None |
| else: |
| |
| |
| if is_ai_platform and cfg.has_groq: |
| misinfo_raw, halluc_raw = await asyncio.gather( |
| misinformation_task(claim, rag_result.evidence, rag_result.trust, grok_result, cfg), |
| hallucination_task(claim, cfg), |
| ) |
| else: |
| misinfo_raw = await misinformation_task( |
| claim, rag_result.evidence, rag_result.trust, grok_result, cfg |
| ) |
| halluc_raw = None |
|
|
| |
| misinfo_color = HighlightColor(misinfo_raw.get("color", "yellow")) |
| final_color = misinfo_color |
| final_confidence = misinfo_raw.get("confidence", 50) |
| final_explanation = misinfo_raw.get("explanation", "") |
| final_verdict = misinfo_raw.get("verdict_label", "Under review") |
|
|
| if halluc_raw: |
| halluc_color = HighlightColor(halluc_raw.get("color", "green")) |
| if SEVERITY[halluc_color] > SEVERITY[final_color]: |
| final_color = halluc_color |
| final_confidence = halluc_raw.get("confidence", final_confidence) |
| final_explanation = halluc_raw.get("explanation", final_explanation) |
| final_verdict = "AI hallucination detected" |
|
|
| |
| raw_sources: list[str] = misinfo_raw.get("sources", []) |
| evidence_sources = [e.source_url for e in rag_result.evidence[:3] if e.source_url] |
| combined = list(dict.fromkeys(raw_sources + evidence_sources))[:3] |
|
|
| source_refs = [ |
| SourceRef( |
| url=url, |
| domain=_extract_domain(url), |
| favicon_url=f"https://www.google.com/s2/favicons?domain={_extract_domain(url)}&sz=16", |
| snippet="", |
| ) |
| for url in combined |
| ] |
|
|
| latency_ms = round((time.perf_counter() - t0) * 1000, 2) |
|
|
| log.info( |
| "agents.flow.complete", |
| color=final_color, |
| confidence=final_confidence, |
| platform=platform, |
| latency_ms=latency_ms, |
| demo=use_demo, |
| ) |
|
|
| return AnalysisResult( |
| element_id=element_id, |
| content_hash=claim_hash, |
| platform=platform, |
| color=final_color, |
| confidence=final_confidence, |
| verdict_label=final_verdict, |
| explanation=final_explanation, |
| sources=source_refs, |
| gatekeeper_label="fact", |
| trust_score=rag_result.trust.score, |
| velocity=grok_result.velocity, |
| has_community_note=grok_result.community_note, |
| latency_ms=latency_ms, |
| ) |
|
|
|
|
| def _extract_domain(url: str) -> str: |
| try: |
| from urllib.parse import urlparse |
| return urlparse(url).netloc.lstrip("www.") |
| except Exception: |
| return url |
|
|