"""
agents.py — Prefect-orchestrated multi-agent evaluation layer.

Two agents evaluate each claim (concurrently when both apply):

1. misinformation_task → model from ``settings.misinformation_model``
   (routed through LiteLLM; originally Groq mixtral-8x7b-32768)
   Given:  claim + top-3 RAG evidence chunks + trust score
   Output: color (red|yellow|green), confidence, explanation,
           verdict label, sources

2. hallucination_task → model from ``settings.hallucination_model``
   (routed through LiteLLM; previously Claude Haiku, now Groq-hosted)
   Runs ONLY when the claim comes from an AI chat platform.
   Given:  claim text
   Output: color (purple|green), confidence, explanation
   Checks for: fabricated citations, statistical impossibilities,
   internal contradictions, temporal paradoxes, entity confusion

Both tasks run concurrently via asyncio.gather. The flow merges their
results, keeps the higher-severity color, and returns the final
AnalysisResult.

Why Prefect over Celery:
- Dynamic DAG-based orchestration (no pre-declared task graph)
- Native async support — no gevent hacks needed
- Built-in per-task retries (configured here as delays of 1s, then 3s)
- Far better observability: every flow run gets a full execution trace
- Deployable without a separate worker process (embedded server mode)
"""

import asyncio
import json
import math
import time
from typing import Any, Literal
from urllib.parse import urlparse

import structlog
from litellm import acompletion
from prefect import flow, task
from prefect.tasks import task_input_hash

from core.config import HighlightColor, Platform, Settings, get_settings
from core.models import (
    AnalysisResult,
    EvidenceChunk,
    GrokSensorResult,
    RAGResult,
    SourceRef,
    TrustScore,
)

log = structlog.get_logger(__name__)

# ---------------------------------------------------------------------------
# Color severity ordering (higher index = more severe)
# ---------------------------------------------------------------------------

SEVERITY: dict[HighlightColor, int] = {
    HighlightColor.GREEN: 0,
    HighlightColor.YELLOW: 1,
    HighlightColor.RED: 2,
    HighlightColor.PURPLE: 3,
}

# Colors each agent is allowed to emit (mirrors the prompt schemas below).
# Any other value returned by a model falls back to a default instead of
# crashing the flow.
_MISINFO_COLORS = frozenset({HighlightColor.RED, HighlightColor.YELLOW, HighlightColor.GREEN})
_HALLUC_COLORS = frozenset({HighlightColor.PURPLE, HighlightColor.GREEN})

# Platforms whose content is AI-generated chat output; only these get the
# hallucination audit.
_AI_CHAT_PLATFORMS = (Platform.CHATGPT, Platform.CLAUDE, Platform.GEMINI)

# Number of evidence chunks placed in the prompt, and the cap on source links
# attached to the final result.
_TOP_EVIDENCE_K = 3

# ---------------------------------------------------------------------------
# LiteLLM prompts
# ---------------------------------------------------------------------------

MISINFO_SYSTEM = """You are a professional fact-checker with access to
recent evidence. Analyze the claim against the evidence chunks and trust score.
Output ONLY valid JSON.

Output schema (no markdown, no preamble):
{
  "color": "red" | "yellow" | "green",
  "confidence": <integer 0-100>,
  "explanation": "<2-3 sentence explanation for the hover card>",
  "verdict_label": "<8 words max, e.g. 'Debunked by Reuters and AP'>",
  "sources": ["<source URL>", "<source URL>", "<source URL>"]
}

Color logic:
- "green": Claim is factually accurate, corroborated by ≥2 independent sources, trust score ≥ 0.65
- "yellow": Claim is unverified, breaking news, or evidence is weak/contradictory
- "red": Claim is demonstrably false, debunked by ≥2 sources, OR trust score < 0.25, OR community note active"""

MISINFO_USER_TMPL = """Claim: {claim}
Trust score: {trust_score:.2f} (0=untrustworthy, 1=highly trusted)
Author verified: {verified}
Active Community Note: {has_note}{note_text_part}
Corroborating sources in database: {source_count}

Evidence chunks (cosine similarity descending):
{evidence_text}

Analyze and output JSON."""

HALLUCINATION_SYSTEM = """You are an LLM output auditor specializing in detecting AI hallucinations.
Analyze the following text that was generated by an AI system. Output ONLY valid JSON.

Output schema:
{
  "color": "purple" | "green",
  "confidence": <integer 0-100>,
  "explanation": "<1-2 sentences naming the specific hallucination found, or stating that none was found>"
}

Check for:
1. Fabricated citations: URLs, paper titles, author names that don't exist
2. Statistical impossibilities: numbers that exceed known bounds (e.g., "500% of people")
3. Internal contradictions: statements that contradict each other within the text
4. Temporal paradoxes: referencing future events as past, or anachronistic details
5. Entity confusion: mixing attributes of different real-world entities

Color "purple" only if you find a clear, specific hallucination pattern.
Color "green" if the text appears factually coherent (you cannot verify external facts)."""


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------


def _top_evidence(evidence: list[EvidenceChunk], k: int = _TOP_EVIDENCE_K) -> list[EvidenceChunk]:
    """Return the ``k`` highest-scoring evidence chunks, best first.

    Shared by the prompt builder and the result's source list so both refer
    to the same chunks.
    """
    return sorted(evidence, key=lambda e: e.score, reverse=True)[:k]


def _parse_json_object(raw: str | None) -> dict[str, Any]:
    """Decode an LLM reply that must be a single JSON object.

    An empty/None reply decodes to ``{}`` (all fields then take their
    defaults during the merge).

    Raises:
        ValueError: malformed JSON (``json.JSONDecodeError`` subclasses it) or
            a top-level value that is not an object. Raising inside a task
            lets Prefect's retry policy re-ask the model.
    """
    data = json.loads(raw or "{}")
    if not isinstance(data, dict):
        raise ValueError(f"expected a JSON object from the model, got {type(data).__name__}")
    return data


def _coerce_color(
    value: Any,
    allowed: frozenset[HighlightColor],
    default: HighlightColor,
) -> HighlightColor:
    """Map a model-provided color string onto an allowed HighlightColor.

    Matching ignores case and surrounding whitespace. Unknown, missing, or
    disallowed values (e.g. "purple" from the misinformation agent) yield
    ``default`` rather than raising.
    """
    try:
        color = HighlightColor(str(value).strip().lower())
    except ValueError:
        return default
    return color if color in allowed else default


def _coerce_confidence(value: Any, default: int) -> int:
    """Convert a model-reported confidence to an int clamped to [0, 100].

    Non-numeric values and NaN fall back to ``default``.
    """
    try:
        number = float(value)
    except (TypeError, ValueError):
        return default
    if math.isnan(number):
        return default
    # Clamp before rounding so +/-inf cannot reach round() (OverflowError).
    return round(max(0.0, min(100.0, number)))


def _coerce_text(value: Any, default: str) -> str:
    """Return ``value`` stripped if it is a non-empty string, else ``default``."""
    if isinstance(value, str) and value.strip():
        return value.strip()
    return default


def _collect_source_urls(
    raw_sources: Any,
    evidence: list[EvidenceChunk],
    limit: int = _TOP_EVIDENCE_K,
) -> list[str]:
    """Merge agent-cited sources with top evidence URLs.

    Agent-cited sources come first, then evidence URLs. The result is
    deduplicated with order preserved and capped at ``limit``. Non-list
    ``raw_sources`` and non-string or blank entries are ignored.
    """
    llm_sources: list[str] = []
    if isinstance(raw_sources, list):
        llm_sources = [s.strip() for s in raw_sources if isinstance(s, str) and s.strip()]
    evidence_sources = [e.source_url for e in _top_evidence(evidence) if e.source_url]
    return list(dict.fromkeys(llm_sources + evidence_sources))[:limit]


def _build_source_ref(url: str) -> SourceRef:
    """Build a SourceRef for ``url`` with its domain and a Google favicon link."""
    domain = _extract_domain(url)
    return SourceRef(
        url=url,
        domain=domain,
        favicon_url=f"https://www.google.com/s2/favicons?domain={domain}&sz=16",
        snippet="",
    )


def _extract_domain(url: str) -> str:
    """Return the host of ``url`` with one leading ``www.`` removed.

    Scheme-less inputs such as ``"reuters.com/article"`` (which LLMs often
    cite) are parsed as network locations. Falls back to ``url`` itself when
    no host can be extracted.
    """
    try:
        parsed = urlparse(url if "://" in url else f"//{url}")
        host = parsed.hostname or ""
    except ValueError:  # e.g. a malformed IPv6 literal
        return url
    # removeprefix, not lstrip: lstrip("www.") strips any leading run of the
    # characters "w" and ".", turning "www.wsj.com" into "sj.com".
    return host.removeprefix("www.") or url


# ---------------------------------------------------------------------------
# Prefect tasks — each is independently retried (delays: 1s, then 3s)
# ---------------------------------------------------------------------------


@task(
    name="misinformation-agent",
    retries=2,
    retry_delay_seconds=[1, 3],
    # NOTE(review): cache_expiration=None means a cached verdict for identical
    # inputs never expires (e.g. a "yellow" on breaking news). Confirm intent.
    cache_key_fn=task_input_hash,
    cache_expiration=None,
    log_prints=False,
)
async def misinformation_task(
    claim: str,
    evidence: list[EvidenceChunk],
    trust: TrustScore,
    grok: GrokSensorResult,
    settings: Settings,
) -> dict:
    """
    Evaluate a claim against RAG evidence with ``settings.misinformation_model``.

    Only the top-3 evidence chunks (by similarity score, first 400 characters
    each) and the first 500 characters of the claim go into the prompt.
    ``grok`` is not read in the body, but it is still one of the task inputs
    hashed by ``task_input_hash`` for the cache key.

    Returns:
        The model's decoded JSON object (schema: see MISINFO_SYSTEM).

    Raises:
        ValueError: the reply is not a JSON object; Prefect retries the task.
    """
    top_evidence = _top_evidence(evidence)
    evidence_text = "\n\n".join(
        f"[{i+1}] Source: {e.domain} (similarity: {e.score:.3f})\n{e.text[:400]}"
        for i, e in enumerate(top_evidence)
    ) or "No evidence chunks retrieved (claim may be too recent or niche)."

    note_part = f"\nCommunity Note: {trust.community_note_text}" if trust.community_note_text else ""

    user_prompt = MISINFO_USER_TMPL.format(
        claim=claim[:500],
        trust_score=trust.score,
        verified=trust.author_verified,
        has_note=trust.has_community_note,
        note_text_part=note_part,
        source_count=trust.corroborating_sources,
        evidence_text=evidence_text,
    )

    # LiteLLM picks the provider from the model string (e.g. "groq/...",
    # "openai/gpt-4o"). NOTE: the key passed here is always the Groq key, so
    # switching to a non-Groq provider also requires changing api_key.
    response = await acompletion(
        model=settings.misinformation_model,
        messages=[
            {"role": "system", "content": MISINFO_SYSTEM},
            {"role": "user", "content": user_prompt},
        ],
        response_format={"type": "json_object"},
        temperature=0.1,
        max_tokens=400,
        api_key=settings.groq_api_key or None,
    )
    return _parse_json_object(response.choices[0].message.content)


@task(
    name="hallucination-agent",
    retries=2,
    retry_delay_seconds=[1, 3],
    log_prints=False,
)
async def hallucination_task(claim: str, settings: Settings) -> dict:
    """
    Audit AI-generated text for hallucination patterns.

    The audit uses ``settings.hallucination_model``, a Groq-hosted model
    (previously Claude Haiku); the prompt and output schema are unchanged.
    Only the first 1000 characters of the claim are sent. evaluate_claim
    invokes this only for AI chat platforms.

    Returns:
        The model's decoded JSON object (schema: see HALLUCINATION_SYSTEM).

    Raises:
        ValueError: the reply is not a JSON object; Prefect retries the task.
    """
    response = await acompletion(
        model=settings.hallucination_model,  # e.g. groq/llama3-70b-8192
        messages=[
            {"role": "system", "content": HALLUCINATION_SYSTEM},
            {"role": "user", "content": f"Audit this AI-generated text:\n\n{claim[:1000]}"},
        ],
        response_format={"type": "json_object"},
        temperature=0.0,
        max_tokens=300,
        api_key=settings.groq_api_key or None,
    )
    return _parse_json_object(response.choices[0].message.content)


def _demo_misinfo_result(trust_score: float, has_note: bool) -> dict:
    """Deterministic demo result when LLM keys are absent."""
    if has_note or trust_score < 0.25:
        return {
            "color": "red",
            "confidence": 82,
            "explanation": "Demo mode: trust score below threshold and/or active community note detected.",
            "verdict_label": "Low trust signal detected",
            "sources": [],
        }
    elif trust_score < 0.55:
        return {
            "color": "yellow",
            "confidence": 61,
            "explanation": "Demo mode: insufficient corroboration to confirm or deny this claim.",
            "verdict_label": "Unverified — insufficient evidence",
            "sources": [],
        }
    return {
        "color": "green",
        "confidence": 78,
        "explanation": "Demo mode: claim appears well-corroborated based on trust graph signals.",
        "verdict_label": "Appears credible",
        "sources": [],
    }


def _demo_hallucination_result() -> dict:
    """Deterministic demo hallucination result (always purple) for AI platforms."""
    return {
        "color": "purple",
        "confidence": 71,
        "explanation": "Demo mode: AI-generated content detected. Unable to verify external citations without live API.",
    }


# ---------------------------------------------------------------------------
# Main Prefect flow
# ---------------------------------------------------------------------------


@flow(name="fact-intelligence-pipeline", log_prints=False)
async def evaluate_claim(
    claim: str,
    claim_hash: str,
    element_id: str,
    platform: Platform,
    rag_result: RAGResult,
    grok_result: GrokSensorResult,
    settings: Settings | None = None,
) -> AnalysisResult:
    """
    Orchestrate the full multi-agent evaluation of one claim as a Prefect flow.

    Execution:
    - Demo mode (``settings.demo_mode`` set, or no Groq key) returns
      deterministic canned results and makes no LLM calls.
    - Otherwise misinformation_task always runs. On AI chat platforms
      (ChatGPT, Claude, Gemini), hallucination_task runs concurrently with it.
    - A failing hallucination agent is logged and ignored. A failing
      misinformation agent (after its retries) fails the flow.

    Results are merged by keeping the higher-severity color (see SEVERITY).
    Up to 3 deduplicated source links are attached: agent-cited sources
    first, then the top-scoring evidence URLs.

    Args:
        claim: The claim text to evaluate.
        claim_hash: Content hash, echoed back as ``content_hash``.
        element_id: Caller's element identifier, echoed back.
        platform: Source platform; decides whether the hallucination audit runs.
        rag_result: Retrieved evidence and trust score.
        grok_result: Velocity and community-note signals.
        settings: Optional settings override; defaults to ``get_settings()``.

    Returns:
        The final AnalysisResult, returned directly (no Celery queue needed).
    """
    cfg = settings or get_settings()
    t0 = time.perf_counter()

    is_ai_platform = platform in _AI_CHAT_PLATFORMS

    use_demo = cfg.demo_mode or not cfg.has_groq
    if use_demo:
        misinfo_raw = _demo_misinfo_result(rag_result.trust.score, grok_result.community_note)
        halluc_raw = _demo_hallucination_result() if is_ai_platform else None
    elif is_ai_platform:
        # Both agents route through Groq, so no Anthropic key is needed. They
        # run concurrently. Exceptions are collected instead of propagated so
        # a hallucination-agent failure cannot discard a good misinformation
        # verdict.
        misinfo_raw, halluc_raw = await asyncio.gather(
            misinformation_task(claim, rag_result.evidence, rag_result.trust, grok_result, cfg),
            hallucination_task(claim, cfg),
            return_exceptions=True,
        )
        if isinstance(misinfo_raw, BaseException):
            raise misinfo_raw
        if isinstance(halluc_raw, BaseException):
            if not isinstance(halluc_raw, Exception):
                raise halluc_raw  # e.g. CancelledError — never swallow
            log.warning(
                "agents.hallucination.failed",
                platform=platform,
                error=repr(halluc_raw),
            )
            halluc_raw = None
    else:
        misinfo_raw = await misinformation_task(
            claim, rag_result.evidence, rag_result.trust, grok_result, cfg
        )
        halluc_raw = None

    # --- Merge results: pick higher-severity color ---
    final_color = _coerce_color(misinfo_raw.get("color"), _MISINFO_COLORS, HighlightColor.YELLOW)
    final_confidence = _coerce_confidence(misinfo_raw.get("confidence"), 50)
    final_explanation = _coerce_text(misinfo_raw.get("explanation"), "")
    final_verdict = _coerce_text(misinfo_raw.get("verdict_label"), "Under review")

    if halluc_raw:
        halluc_color = _coerce_color(halluc_raw.get("color"), _HALLUC_COLORS, HighlightColor.GREEN)
        if SEVERITY[halluc_color] > SEVERITY[final_color]:
            final_color = halluc_color
            final_confidence = _coerce_confidence(halluc_raw.get("confidence"), final_confidence)
            final_explanation = _coerce_text(halluc_raw.get("explanation"), final_explanation)
            final_verdict = "AI hallucination detected"

    source_urls = _collect_source_urls(misinfo_raw.get("sources"), rag_result.evidence)
    source_refs = [_build_source_ref(url) for url in source_urls]

    latency_ms = round((time.perf_counter() - t0) * 1000, 2)
    log.info(
        "agents.flow.complete",
        color=final_color,
        confidence=final_confidence,
        platform=platform,
        latency_ms=latency_ms,
        demo=use_demo,
    )

    return AnalysisResult(
        element_id=element_id,
        content_hash=claim_hash,
        platform=platform,
        color=final_color,
        confidence=final_confidence,
        verdict_label=final_verdict,
        explanation=final_explanation,
        sources=source_refs,
        gatekeeper_label="fact",
        trust_score=rag_result.trust.score,
        velocity=grok_result.velocity,
        has_community_note=grok_result.community_note,
        latency_ms=latency_ms,
    )