""" main.py — FastAPI WebSocket hub with Redis-backed claim caching. Architecture: Browser extension → WS connection → ConnectionManager ↓ Redis cache check (xxhash key) ↓ miss Gatekeeper (Groq) ↓ fact RAG pipeline + Trust graph ↓ Prefect multi-agent flow ↓ AnalysisResult → WS push to extension """ from __future__ import annotations import asyncio import hashlib import json import os import time from contextlib import asynccontextmanager from typing import Any import orjson import redis.asyncio as aioredis import structlog import xxhash from fastapi import FastAPI, WebSocket, WebSocketDisconnect from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import HTMLResponse from pydantic import BaseModel, Field from pydantic_settings import BaseSettings from agents import run_analysis_flow from gatekeeper import classify_claim from rag_pipeline import build_rag_context # ── Logging ────────────────────────────────────────────────────────────────── structlog.configure( processors=[ structlog.stdlib.add_log_level, structlog.processors.TimeStamper(fmt="iso"), structlog.dev.ConsoleRenderer(colors=False), ], wrapper_class=structlog.make_filtering_bound_logger(20), context_class=dict, logger_factory=structlog.PrintLoggerFactory(), ) log = structlog.get_logger(__name__) # ── Settings ────────────────────────────────────────────────────────────────── class Settings(BaseSettings): groq_api_key: str = os.getenv("GROQ_API_KEY", "") anthropic_api_key: str = os.getenv("ANTHROPIC_API_KEY", "") openai_api_key: str = os.getenv("OPENAI_API_KEY", "") redis_url: str = os.getenv("REDIS_URL", "redis://localhost:6379") qdrant_url: str = os.getenv("QDRANT_URL", "http://localhost:6333") memgraph_host: str = os.getenv("MEMGRAPH_HOST", "localhost") memgraph_port: int = int(os.getenv("MEMGRAPH_PORT", "7687")) redpanda_brokers: str = os.getenv("REDPANDA_BROKERS", "localhost:9092") x_bearer_token: str = os.getenv("X_BEARER_TOKEN", "") # TTL seconds: Green/Red = 6h, Yellow = 15min, Purple = no cache cache_ttl_green_red: int = 21600 cache_ttl_yellow: int = 900 class Config: env_file = ".env" settings = Settings() # ── Redis client ────────────────────────────────────────────────────────────── redis_client: aioredis.Redis | None = None async def get_redis() -> aioredis.Redis: global redis_client if redis_client is None: try: redis_client = aioredis.from_url( settings.redis_url, encoding="utf-8", decode_responses=True, socket_connect_timeout=2, ) await redis_client.ping() log.info("redis.connected", url=settings.redis_url) except Exception as exc: log.warning("redis.unavailable", error=str(exc)) redis_client = None return redis_client # type: ignore[return-value] def claim_cache_key(claim_hash: str) -> str: return f"fact:v1:{claim_hash}" async def cache_get(claim_hash: str) -> dict[str, Any] | None: try: r = await get_redis() if r is None: return None raw = await r.get(claim_cache_key(claim_hash)) return orjson.loads(raw) if raw else None except Exception: return None async def cache_set(claim_hash: str, result: dict[str, Any]) -> None: try: r = await get_redis() if r is None: return color = result.get("color", "yellow") ttl = ( settings.cache_ttl_green_red if color in ("green", "red") else settings.cache_ttl_yellow if color == "yellow" else None # purple — never cache ) if ttl is not None: await r.setex( claim_cache_key(claim_hash), ttl, orjson.dumps(result).decode(), ) except Exception as exc: log.warning("cache.set_failed", error=str(exc)) # ── WebSocket Connection Manager ────────────────────────────────────────────── class ConnectionManager: """Thread-safe registry of active WebSocket connections.""" def __init__(self) -> None: self._connections: dict[str, WebSocket] = {} self._lock = asyncio.Lock() async def connect(self, ws: WebSocket, client_id: str) -> None: await ws.accept() async with self._lock: self._connections[client_id] = ws log.info("ws.connected", client_id=client_id, total=len(self._connections)) async def disconnect(self, client_id: str) -> None: async with self._lock: self._connections.pop(client_id, None) log.info("ws.disconnected", client_id=client_id, total=len(self._connections)) async def send(self, client_id: str, payload: dict[str, Any]) -> None: async with self._lock: ws = self._connections.get(client_id) if ws: try: await ws.send_text(orjson.dumps(payload).decode()) except Exception as exc: log.warning("ws.send_failed", client_id=client_id, error=str(exc)) await self.disconnect(client_id) async def broadcast(self, payload: dict[str, Any]) -> None: async with self._lock: targets = list(self._connections.items()) await asyncio.gather( *[ws.send_text(orjson.dumps(payload).decode()) for _, ws in targets], return_exceptions=True, ) @property def count(self) -> int: return len(self._connections) manager = ConnectionManager() # ── Request/Response models ─────────────────────────────────────────────────── class AnalysisBatch(BaseModel): """Incoming batch from the browser extension.""" client_id: str claims: list[str] = Field(..., min_length=1, max_length=20) platform: str = Field(default="web") # x, instagram, youtube, chatgpt, claude, gemini, web timestamp: float = Field(default_factory=time.time) class AnalysisResult(BaseModel): """Outgoing result per-claim.""" claim_hash: str claim_text: str color: str # green | yellow | red | purple confidence: int # 0–100 verdict: str explanation: str sources: list[str] trust_score: float cached: bool = False processing_ms: float = 0.0 # ── Lifespan ────────────────────────────────────────────────────────────────── @asynccontextmanager async def lifespan(_app: FastAPI): log.info("startup", version="1.0.0") await get_redis() yield log.info("shutdown") if redis_client: await redis_client.aclose() # ── FastAPI app ─────────────────────────────────────────────────────────────── app = FastAPI( title="Fact & Hallucination Intelligence Engine", version="1.0.0", lifespan=lifespan, ) app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) @app.get("/", response_class=HTMLResponse) async def root(): return HTMLResponse(""" Fact Engine — Status

⚡ FACT ENGINE

System Online

WebSocket: ws://[host]/ws/{client_id}

Health: /health

""") @app.get("/health") async def health(): redis_ok = False try: r = await get_redis() if r: await r.ping() redis_ok = True except Exception: pass return { "status": "ok", "connections": manager.count, "redis": redis_ok, "timestamp": time.time(), } @app.websocket("/ws/{client_id}") async def websocket_endpoint(ws: WebSocket, client_id: str): await manager.connect(ws, client_id) try: while True: raw = await ws.receive_text() try: data = orjson.loads(raw) except Exception: await manager.send(client_id, {"error": "invalid_json"}) continue batch = AnalysisBatch.model_validate(data) # Process claims concurrently (max 5 at once to avoid rate limits) sem = asyncio.Semaphore(5) tasks = [process_claim(sem, claim, batch.platform) for claim in batch.claims] results = await asyncio.gather(*tasks, return_exceptions=True) response_items = [] for res in results: if isinstance(res, Exception): log.error("claim.process_error", error=str(res)) else: response_items.append(res.model_dump()) await manager.send(client_id, { "type": "analysis_batch", "results": response_items, "request_timestamp": batch.timestamp, }) except WebSocketDisconnect: await manager.disconnect(client_id) except Exception as exc: log.error("ws.error", client_id=client_id, error=str(exc)) await manager.disconnect(client_id) async def process_claim( sem: asyncio.Semaphore, claim_text: str, platform: str, ) -> AnalysisResult: """ Full pipeline per claim: 1. xxhash → Redis cache check (skip pipeline on hit) 2. Gatekeeper (Groq): fact vs. noise filter 3. RAG pipeline: embed → Qdrant ANN → Memgraph trust score 4. Prefect multi-agent flow: misinformation + hallucination tasks 5. Cache result, return AnalysisResult """ async with sem: t0 = time.perf_counter() claim_hash = xxhash.xxh64(claim_text.encode()).hexdigest() # Step 1 — Cache check cached = await cache_get(claim_hash) if cached: return AnalysisResult(**{**cached, "cached": True}) # Step 2 — Gatekeeper gate = await classify_claim(claim_text) if gate.label == "noise": # Return a neutral result without running the expensive pipeline result = AnalysisResult( claim_hash=claim_hash, claim_text=claim_text, color="green", confidence=50, verdict="Opinion / Social noise", explanation=gate.reason, sources=[], trust_score=0.5, processing_ms=(time.perf_counter() - t0) * 1000, ) return result # Step 3 — RAG + trust scoring rag_ctx = await build_rag_context(claim_text, claim_hash) # Step 4 — Multi-agent Prefect flow analysis = await run_analysis_flow( claim_text=claim_text, claim_hash=claim_hash, platform=platform, rag_context=rag_ctx, ) result = AnalysisResult( claim_hash=claim_hash, claim_text=claim_text, color=analysis.color, confidence=analysis.confidence, verdict=analysis.verdict, explanation=analysis.explanation, sources=analysis.sources, trust_score=rag_ctx.trust_score, processing_ms=(time.perf_counter() - t0) * 1000, ) # Step 5 — Cache await cache_set(claim_hash, result.model_dump()) return result if __name__ == "__main__": import uvicorn uvicorn.run( "main:app", host="0.0.0.0", port=int(os.getenv("PORT", "7860")), # HF Spaces default port reload=False, log_level="info", )