""" main.py — FastAPI WebSocket hub with Redis-backed claim caching. Architecture: Browser extension → WS connection → ConnectionManager ↓ Redis cache check (xxhash key) ↓ miss Gatekeeper (Groq) ↓ fact RAG pipeline + Trust graph ↓ Prefect multi-agent flow ↓ AnalysisResult → WS push to extension """ from __future__ import annotations import asyncio import hashlib import json import os import time from contextlib import asynccontextmanager from typing import Any import orjson import redis.asyncio as aioredis import structlog import xxhash from fastapi import FastAPI, WebSocket, WebSocketDisconnect from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import HTMLResponse from pydantic import BaseModel, Field from pydantic_settings import BaseSettings from agents import run_analysis_flow from gatekeeper import classify_claim from rag_pipeline import build_rag_context # ── Logging ────────────────────────────────────────────────────────────────── structlog.configure( processors=[ structlog.stdlib.add_log_level, structlog.processors.TimeStamper(fmt="iso"), structlog.dev.ConsoleRenderer(colors=False), ], wrapper_class=structlog.make_filtering_bound_logger(20), context_class=dict, logger_factory=structlog.PrintLoggerFactory(), ) log = structlog.get_logger(__name__) # ── Settings ────────────────────────────────────────────────────────────────── class Settings(BaseSettings): groq_api_key: str = os.getenv("GROQ_API_KEY", "") anthropic_api_key: str = os.getenv("ANTHROPIC_API_KEY", "") openai_api_key: str = os.getenv("OPENAI_API_KEY", "") redis_url: str = os.getenv("REDIS_URL", "redis://localhost:6379") qdrant_url: str = os.getenv("QDRANT_URL", "http://localhost:6333") memgraph_host: str = os.getenv("MEMGRAPH_HOST", "localhost") memgraph_port: int = int(os.getenv("MEMGRAPH_PORT", "7687")) redpanda_brokers: str = os.getenv("REDPANDA_BROKERS", "localhost:9092") x_bearer_token: str = os.getenv("X_BEARER_TOKEN", "") # TTL seconds: Green/Red = 6h, Yellow = 15min, Purple = no cache cache_ttl_green_red: int = 21600 cache_ttl_yellow: int = 900 class Config: env_file = ".env" settings = Settings() # ── Redis client ────────────────────────────────────────────────────────────── redis_client: aioredis.Redis | None = None async def get_redis() -> aioredis.Redis: global redis_client if redis_client is None: try: redis_client = aioredis.from_url( settings.redis_url, encoding="utf-8", decode_responses=True, socket_connect_timeout=2, ) await redis_client.ping() log.info("redis.connected", url=settings.redis_url) except Exception as exc: log.warning("redis.unavailable", error=str(exc)) redis_client = None return redis_client # type: ignore[return-value] def claim_cache_key(claim_hash: str) -> str: return f"fact:v1:{claim_hash}" async def cache_get(claim_hash: str) -> dict[str, Any] | None: try: r = await get_redis() if r is None: return None raw = await r.get(claim_cache_key(claim_hash)) return orjson.loads(raw) if raw else None except Exception: return None async def cache_set(claim_hash: str, result: dict[str, Any]) -> None: try: r = await get_redis() if r is None: return color = result.get("color", "yellow") ttl = ( settings.cache_ttl_green_red if color in ("green", "red") else settings.cache_ttl_yellow if color == "yellow" else None # purple — never cache ) if ttl is not None: await r.setex( claim_cache_key(claim_hash), ttl, orjson.dumps(result).decode(), ) except Exception as exc: log.warning("cache.set_failed", error=str(exc)) # ── WebSocket Connection Manager ────────────────────────────────────────────── class ConnectionManager: """Thread-safe registry of active WebSocket connections.""" def __init__(self) -> None: self._connections: dict[str, WebSocket] = {} self._lock = asyncio.Lock() async def connect(self, ws: WebSocket, client_id: str) -> None: await ws.accept() async with self._lock: self._connections[client_id] = ws log.info("ws.connected", client_id=client_id, total=len(self._connections)) async def disconnect(self, client_id: str) -> None: async with self._lock: self._connections.pop(client_id, None) log.info("ws.disconnected", client_id=client_id, total=len(self._connections)) async def send(self, client_id: str, payload: dict[str, Any]) -> None: async with self._lock: ws = self._connections.get(client_id) if ws: try: await ws.send_text(orjson.dumps(payload).decode()) except Exception as exc: log.warning("ws.send_failed", client_id=client_id, error=str(exc)) await self.disconnect(client_id) async def broadcast(self, payload: dict[str, Any]) -> None: async with self._lock: targets = list(self._connections.items()) await asyncio.gather( *[ws.send_text(orjson.dumps(payload).decode()) for _, ws in targets], return_exceptions=True, ) @property def count(self) -> int: return len(self._connections) manager = ConnectionManager() # ── Request/Response models ─────────────────────────────────────────────────── class AnalysisBatch(BaseModel): """Incoming batch from the browser extension.""" client_id: str claims: list[str] = Field(..., min_length=1, max_length=20) platform: str = Field(default="web") # x, instagram, youtube, chatgpt, claude, gemini, web timestamp: float = Field(default_factory=time.time) class AnalysisResult(BaseModel): """Outgoing result per-claim.""" claim_hash: str claim_text: str color: str # green | yellow | red | purple confidence: int # 0–100 verdict: str explanation: str sources: list[str] trust_score: float cached: bool = False processing_ms: float = 0.0 # ── Lifespan ────────────────────────────────────────────────────────────────── @asynccontextmanager async def lifespan(_app: FastAPI): log.info("startup", version="1.0.0") await get_redis() yield log.info("shutdown") if redis_client: await redis_client.aclose() # ── FastAPI app ─────────────────────────────────────────────────────────────── app = FastAPI( title="Fact & Hallucination Intelligence Engine", version="1.0.0", lifespan=lifespan, ) app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) @app.get("/", response_class=HTMLResponse) async def root(): return HTMLResponse("""
System Online
WebSocket: ws://[host]/ws/{client_id}
Health: /health