# backend/agents.py
"""
agents.py β€” Prefect-orchestrated multi-agent evaluation layer.
Two concurrent agents evaluate each claim:
1. misinformation_task β†’ Groq mixtral-8x7b-32768
Given: claim + top-3 RAG evidence chunks + trust score
Output: color (red|yellow|green), confidence, explanation, sources
2. hallucination_task β†’ Claude Haiku (runs ONLY on AI chat platforms)
Given: claim text
Output: color (purple|green), confidence, explanation
Checks for: fabricated citations, statistical impossibilities,
internal contradictions, LLM-specific failure patterns
Both tasks run concurrently via asyncio.gather. Prefect merges results,
picks higher-severity color, returns the final AnalysisResult.
Why Prefect over Celery:
- Dynamic DAG-based orchestration (no pre-declared task graph)
- Native async support β€” no gevent hacks needed
- Built-in retry with exponential backoff per task
- Far better observability: every flow run gets a full execution trace
- Deployable without a separate worker process (embedded server mode)
"""
import asyncio
import json
import time
from typing import Literal

import structlog
from litellm import acompletion
from prefect import flow, task
from prefect.tasks import task_input_hash

from core.config import HighlightColor, Platform, Settings, get_settings
from core.models import AnalysisResult, EvidenceChunk, GrokSensorResult, RAGResult, SourceRef, TrustScore
# Module-level structured logger, bound to this module's import path.
log = structlog.get_logger(__name__)
# ---------------------------------------------------------------------------
# Color severity ordering (higher index = more severe)
# ---------------------------------------------------------------------------
# Used when merging the two agents' verdicts: the color with the larger
# severity value wins. PURPLE (AI hallucination) outranks RED here.
SEVERITY: dict[HighlightColor, int] = {
    HighlightColor.GREEN: 0,
    HighlightColor.YELLOW: 1,
    HighlightColor.RED: 2,
    HighlightColor.PURPLE: 3,
}
# ---------------------------------------------------------------------------
# LiteLLM prompts
# ---------------------------------------------------------------------------
# System prompt for the misinformation agent. The "≥" characters were
# previously mojibake ("β‰₯") from an encoding round-trip; fixed so the model
# sees the intended threshold notation.
MISINFO_SYSTEM = """You are a professional fact-checker with access to recent evidence.
Analyze the claim against the evidence chunks and trust score. Output ONLY valid JSON.
Output schema (no markdown, no preamble):
{
"color": "red" | "yellow" | "green",
"confidence": <integer 0-100>,
"explanation": "<2-3 sentence explanation for the hover card>",
"verdict_label": "<8 words max, e.g. 'Debunked by Reuters and AP'>",
"sources": ["<url1>", "<url2>", "<url3>"]
}
Color logic:
- "green": Claim is factually accurate, corroborated by ≥2 independent sources, trust score ≥ 0.65
- "yellow": Claim is unverified, breaking news, or evidence is weak/contradictory
- "red": Claim is demonstrably false, debunked by ≥2 sources, OR trust score < 0.25, OR community note active"""
# User-prompt template for the misinformation agent; filled in by
# misinformation_task() with claim, trust signals, and evidence chunks.
MISINFO_USER_TMPL = """Claim: {claim}
Trust score: {trust_score:.2f} (0=untrustworthy, 1=highly trusted)
Author verified: {verified}
Active Community Note: {has_note}{note_text_part}
Corroborating sources in database: {source_count}
Evidence chunks (cosine similarity descending):
{evidence_text}
Analyze and output JSON."""
# System prompt for the hallucination agent (AI-chat platforms only).
HALLUCINATION_SYSTEM = """You are an LLM output auditor specializing in detecting AI hallucinations.
Analyze the following text that was generated by an AI system. Output ONLY valid JSON.
Output schema:
{
"color": "purple" | "green",
"confidence": <integer 0-100>,
"explanation": "<specific explanation of what's wrong, or confirmation it's accurate>"
}
Check for:
1. Fabricated citations: URLs, paper titles, author names that don't exist
2. Statistical impossibilities: numbers that exceed known bounds (e.g., "500% of people")
3. Internal contradictions: statements that contradict each other within the text
4. Temporal paradoxes: referencing future events as past, or anachronistic details
5. Entity confusion: mixing attributes of different real-world entities
Color "purple" only if you find a clear, specific hallucination pattern.
Color "green" if the text appears factually coherent (you cannot verify external facts)."""
# ---------------------------------------------------------------------------
# Prefect tasks — each is independently retried with exponential backoff
# ---------------------------------------------------------------------------
@task(
    name="misinformation-agent",
    retries=2,
    retry_delay_seconds=[1, 3],
    cache_key_fn=task_input_hash,  # cache keyed on all task inputs
    cache_expiration=None,  # cached results never expire
    log_prints=False,
)
async def misinformation_task(
    claim: str,
    evidence: list[EvidenceChunk],
    trust: TrustScore,
    grok: GrokSensorResult,  # unused in the body, but part of the cache key
    settings: Settings,
) -> dict:
    """
    Evaluate *claim* against RAG evidence via the configured LLM
    (``settings.misinformation_model``, e.g. Groq mixtral-8x7b-32768).

    Only the top-3 evidence chunks by cosine score go into the prompt;
    lower-ranked chunks are dropped to keep the prompt compact. The claim
    is capped at 500 chars and each chunk at 400 chars.

    Returns:
        The model's parsed JSON verdict: color / confidence / explanation /
        verdict_label / sources.

    Raises:
        json.JSONDecodeError: if the model returns malformed JSON; this
            deliberately propagates so Prefect's retry policy re-runs the task.
    """
    # Build evidence text block (top-3 by cosine score for the prompt)
    top_evidence = sorted(evidence, key=lambda e: e.score, reverse=True)[:3]
    evidence_text = "\n\n".join(
        f"[{i+1}] Source: {e.domain} (similarity: {e.score:.3f})\n{e.text[:400]}"
        for i, e in enumerate(top_evidence)
    ) or "No evidence chunks retrieved (claim may be too recent or niche)."
    note_part = f"\nCommunity Note: {trust.community_note_text}" if trust.community_note_text else ""
    user_prompt = MISINFO_USER_TMPL.format(
        claim=claim[:500],
        trust_score=trust.score,
        verified=trust.author_verified,
        has_note=trust.has_community_note,
        note_text_part=note_part,
        source_count=trust.corroborating_sources,
        evidence_text=evidence_text,
    )
    # LiteLLM routes to Groq — swap to "openai/gpt-4o" or "groq/llama3-70b-8192"
    # by changing a single string, zero code changes elsewhere
    response = await acompletion(
        model=settings.misinformation_model,
        messages=[
            {"role": "system", "content": MISINFO_SYSTEM},
            {"role": "user", "content": user_prompt},
        ],
        response_format={"type": "json_object"},  # ask for strict JSON output
        temperature=0.1,
        max_tokens=400,
        api_key=settings.groq_api_key or None,
    )
    raw = response.choices[0].message.content or "{}"
    # json is imported at module level (was a per-call function-scope import)
    return json.loads(raw)
@task(
    name="hallucination-agent",
    retries=2,
    retry_delay_seconds=[1, 3],
    log_prints=False,
)
async def hallucination_task(claim: str, settings: Settings) -> dict:
    """
    Audit AI-generated text for hallucination patterns.

    Uses the model named by ``settings.hallucination_model``
    (groq/llama3-70b-8192; previously Claude Haiku — same prompt, same
    output schema). Only invoked when the source platform is an AI chat
    interface.

    Returns:
        The model's parsed JSON verdict: color / confidence / explanation.

    Raises:
        json.JSONDecodeError: if the model returns malformed JSON; this
            deliberately propagates so Prefect's retry policy re-runs the task.
    """
    response = await acompletion(
        model=settings.hallucination_model,  # groq/llama3-70b-8192
        messages=[
            {"role": "system", "content": HALLUCINATION_SYSTEM},
            # Audited text is capped at 1000 chars to bound token usage.
            {"role": "user", "content": f"Audit this AI-generated text:\n\n{claim[:1000]}"},
        ],
        response_format={"type": "json_object"},
        temperature=0.0,  # deterministic audit
        max_tokens=300,
        api_key=settings.groq_api_key or None,
    )
    raw = response.choices[0].message.content or "{}"
    # json is imported at module level (was a per-call function-scope import)
    return json.loads(raw)
def _demo_misinfo_result(trust_score: float, has_note: bool) -> dict:
"""Deterministic demo result when LLM keys are absent."""
if has_note or trust_score < 0.25:
return {
"color": "red", "confidence": 82,
"explanation": "Demo mode: trust score below threshold and/or active community note detected.",
"verdict_label": "Low trust signal detected",
"sources": [],
}
elif trust_score < 0.55:
return {
"color": "yellow", "confidence": 61,
"explanation": "Demo mode: insufficient corroboration to confirm or deny this claim.",
"verdict_label": "Unverified β€” insufficient evidence",
"sources": [],
}
return {
"color": "green", "confidence": 78,
"explanation": "Demo mode: claim appears well-corroborated based on trust graph signals.",
"verdict_label": "Appears credible",
"sources": [],
}
def _demo_hallucination_result() -> dict:
return {
"color": "purple", "confidence": 71,
"explanation": "Demo mode: AI-generated content detected. Unable to verify external citations without live API.",
}
# ---------------------------------------------------------------------------
# Main Prefect flow
# ---------------------------------------------------------------------------
@flow(name="fact-intelligence-pipeline", log_prints=False)
async def evaluate_claim(
    claim: str,
    claim_hash: str,
    element_id: str,
    platform: Platform,
    rag_result: RAGResult,
    grok_result: GrokSensorResult,
    settings: Settings | None = None,
) -> AnalysisResult:
    """
    Orchestrate the full multi-agent evaluation as a Prefect flow.

    Concurrent execution:
    - misinformation_task always runs
    - hallucination_task runs only for AI chat platforms
    Results are merged by taking the higher-severity color (see SEVERITY).
    The final AnalysisResult is returned directly (no Celery queue needed).
    """
    cfg = settings or get_settings()
    t0 = time.perf_counter()
    is_ai_platform = platform in (Platform.CHATGPT, Platform.CLAUDE, Platform.GEMINI)
    # Demo mode when explicitly enabled or no Groq key is configured.
    use_demo = cfg.demo_mode or not cfg.has_groq
    if use_demo:
        misinfo_raw = _demo_misinfo_result(rag_result.trust.score, grok_result.community_note)
        halluc_raw = _demo_hallucination_result() if is_ai_platform else None
    else:
        # Concurrently run both agents when applicable
        # Both agents now use Groq (free) — no Anthropic key needed
        if is_ai_platform and cfg.has_groq:
            misinfo_raw, halluc_raw = await asyncio.gather(
                misinformation_task(claim, rag_result.evidence, rag_result.trust, grok_result, cfg),
                hallucination_task(claim, cfg),
            )
        else:
            misinfo_raw = await misinformation_task(
                claim, rag_result.evidence, rag_result.trust, grok_result, cfg
            )
            halluc_raw = None
    # --- Merge results: pick higher-severity color ---
    # Guard the enum conversion: an unrecognized color string from the model
    # would otherwise raise ValueError and fail the whole flow.
    try:
        misinfo_color = HighlightColor(misinfo_raw.get("color", "yellow"))
    except ValueError:
        misinfo_color = HighlightColor.YELLOW
    final_color = misinfo_color
    final_confidence = misinfo_raw.get("confidence", 50)
    final_explanation = misinfo_raw.get("explanation", "")
    final_verdict = misinfo_raw.get("verdict_label", "Under review")
    if halluc_raw:
        try:
            halluc_color = HighlightColor(halluc_raw.get("color", "green"))
        except ValueError:
            halluc_color = HighlightColor.GREEN
        if SEVERITY[halluc_color] > SEVERITY[final_color]:
            final_color = halluc_color
            final_confidence = halluc_raw.get("confidence", final_confidence)
            final_explanation = halluc_raw.get("explanation", final_explanation)
            final_verdict = "AI hallucination detected"
    # Build SourceRef list from evidence + misinfo agent sources.
    # `or []` guards against the model emitting "sources": null, where
    # .get("sources", []) would return None and break the concatenation.
    raw_sources: list[str] = misinfo_raw.get("sources") or []
    evidence_sources = [e.source_url for e in rag_result.evidence[:3] if e.source_url]
    combined = list(dict.fromkeys(raw_sources + evidence_sources))[:3]  # deduplicated, max 3
    source_refs = [
        SourceRef(
            url=url,
            domain=_extract_domain(url),
            favicon_url=f"https://www.google.com/s2/favicons?domain={_extract_domain(url)}&sz=16",
            snippet="",
        )
        for url in combined
    ]
    latency_ms = round((time.perf_counter() - t0) * 1000, 2)
    log.info(
        "agents.flow.complete",
        color=final_color,
        confidence=final_confidence,
        platform=platform,
        latency_ms=latency_ms,
        demo=use_demo,
    )
    return AnalysisResult(
        element_id=element_id,
        content_hash=claim_hash,
        platform=platform,
        color=final_color,
        confidence=final_confidence,
        verdict_label=final_verdict,
        explanation=final_explanation,
        sources=source_refs,
        gatekeeper_label="fact",
        trust_score=rag_result.trust.score,
        velocity=grok_result.velocity,
        has_community_note=grok_result.community_note,
        latency_ms=latency_ms,
    )
def _extract_domain(url: str) -> str:
try:
from urllib.parse import urlparse
return urlparse(url).netloc.lstrip("www.")
except Exception:
return url