"""
app.py - Single entry point for HuggingFace Spaces.

Run with:
    uv run python app.py                  - HuggingFace Spaces / production
    uv run uvicorn app:app --reload       - local dev

Lifecycle on startup:
  1. Configures structured logging
  2. Waits for Redis / Qdrant / Memgraph to be healthy
  3. Initialises Qdrant collection + Memgraph schema
  4. Seeds demo evidence chunks into Qdrant
     (steps 2-4 are skipped in DEMO_MODE, where Redis is only probed and the cache is optional)
  5. Warms up BGE-M3 embedder in the background
  6. Serves FastAPI on port 7860 (HuggingFace default)

WebSocket message lifecycle (per text segment):
  1. Extension sends TextBatch -> Redis cache check (key: verdict:{content_hash}, hash sent with each segment)
  2. Cache miss -> Gatekeeper (Groq llama3-8b, <120 ms p95)
  3. Noise -> dropped. Fact -> continue
  4. Concurrent: RAG pipeline (BGE-M3 + Qdrant + Memgraph) + Grok sensor
  5. Prefect flow: misinformation agent + hallucination agent (both Groq, free)
  6. AnalysisResult cached in Redis (TTL: 6 h green/red, 15 min yellow, no-cache purple)
  7. Result streamed back over WebSocket -> extension applies DOM highlight + hover card
"""
|
|
| import asyncio |
| import os |
| import sys |
| import time |
| from contextlib import asynccontextmanager |
| from typing import Any |
|
|
| import orjson |
| import redis.asyncio as aioredis |
| import structlog |
| import xxhash |
| from fastapi import FastAPI, WebSocket, WebSocketDisconnect |
| from fastapi.middleware.cors import CORSMiddleware |
| from fastapi.responses import HTMLResponse |
| from pydantic import ValidationError |
|
|
| |
| |
| |
| from core.logging import configure_logging |
| from core.config import HighlightColor, Platform, get_settings |
|
|
settings = get_settings()

# Structured logging is configured here, before the pipeline/agent modules are
# imported further down. JSON output is opt-in via JSON_LOGS=true (case-insensitive).
_json_logs = os.environ.get("JSON_LOGS", "false").lower() == "true"
configure_logging(log_level=settings.log_level, json_output=_json_logs)
log = structlog.get_logger("app")
|
|
| |
| |
| |
| from agents import evaluate_claim |
| from core.models import AnalysisResult, GatekeeperResult, TextBatch, WSInbound, WSOutbound |
| from gatekeeper import classify_claim |
| from grok_sensor import query_grok_sensor |
| from rag_pipeline import run_rag_pipeline |
|
|
| |
| |
| |
|
|
async def _wait_for_redis(url: str, timeout: int = 30) -> bool:
    """Poll Redis until it answers PING or *timeout* seconds elapse.

    Args:
        url: Redis connection URL.
        timeout: Overall time budget in seconds.

    Returns:
        True as soon as a PING succeeds, False if the deadline passes first.

    Every probe client is closed, including ones whose PING failed, so repeated
    retries do not leak connection pools.
    """
    import contextlib

    # Monotonic clock: a wall-clock jump must not shorten or extend the wait.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        client = None
        try:
            # Bound each attempt so a hung connect/read cannot overrun the deadline.
            client = await aioredis.from_url(
                url,
                decode_responses=True,
                socket_connect_timeout=2.0,
                socket_timeout=2.0,
            )
            await client.ping()
            return True
        except Exception:
            # Any failure just means "not ready yet"; back off and retry.
            await asyncio.sleep(1)
        finally:
            if client is not None:
                with contextlib.suppress(Exception):
                    await client.aclose()
    return False
|
|
|
|
async def _wait_for_qdrant(host: str, port: int, timeout: int = 30) -> bool:
    """Poll Qdrant's ``/readyz`` endpoint until it returns HTTP 200 or *timeout* seconds elapse.

    Args:
        host: Qdrant host name.
        port: Qdrant HTTP port.
        timeout: Overall time budget in seconds.

    Returns:
        True once Qdrant reports ready, False if the deadline passes first.

    The loop sleeps after every unsuccessful probe, including non-200 responses,
    so a starting Qdrant is not polled in a tight loop.
    """
    import httpx

    deadline = time.monotonic() + timeout
    ready_url = f"http://{host}:{port}/readyz"
    # One client for the whole wait; each request is capped at 2 s.
    async with httpx.AsyncClient(timeout=2.0) as client:
        while time.monotonic() < deadline:
            try:
                resp = await client.get(ready_url)
                if resp.status_code == 200:
                    return True
            except Exception:
                # Connection refused, timeouts, etc. just mean "not ready yet".
                pass
            await asyncio.sleep(1)
    return False
|
|
|
|
async def _wait_for_memgraph(host: str, port: int, timeout: int = 30) -> bool:
    """Poll Memgraph over Bolt until ``RETURN 1`` succeeds or *timeout* seconds elapse.

    Args:
        host: Memgraph host name.
        port: Memgraph Bolt port.
        timeout: Overall time budget in seconds.

    Returns:
        True once the probe query succeeds, False if the deadline passes first.

    A driver is created per attempt and is always closed, including when the
    probe fails, so retries do not leak driver connection pools.
    """
    from neo4j import AsyncGraphDatabase

    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        driver = None
        try:
            driver = AsyncGraphDatabase.driver(
                f"bolt://{host}:{port}",
                auth=("", settings.memgraph_password),
                encrypted=False,
            )
            async with driver.session() as session:
                result = await session.run("RETURN 1;")
                # Drain the result so the query has fully executed server-side.
                await result.consume()
            return True
        except Exception:
            # Any failure just means "not ready yet"; back off and retry.
            await asyncio.sleep(2)
        finally:
            if driver is not None:
                try:
                    await driver.close()
                except Exception:
                    pass  # best-effort cleanup of a probe driver
    return False
|
|
|
|
| |
| |
| |
|
|
# Demo evidence chunks used by _seed_demo_data() to populate Qdrant.
# Each row is (text, source URL, source domain).
_DEMO_EVIDENCE_ROWS = (
    (
        "mRNA vaccines demonstrated sustained immune responses lasting 18-24 months across multiple peer-reviewed studies.",
        "https://www.nejm.org/doi/10.1056/NEJMoa2034577",
        "nejm.org",
    ),
    (
        "The Federal Reserve raised interest rates by 75 basis points in June 2022, the largest single hike since 1994.",
        "https://reuters.com/markets/us/fed-hikes-rates-2022-06-15",
        "reuters.com",
    ),
    (
        "Amazon deforestation data showed over 11,000 sq km lost in a single year at record levels.",
        "https://apnews.com/article/amazon-deforestation-record",
        "apnews.com",
    ),
    (
        "The United Nations projects global population will peak around 10.4 billion in the 2080s based on current demographic trends.",
        "https://www.un.org/development/desa/pd/",
        "un.org",
    ),
    (
        "Renewable energy accounted for 30% of global electricity generation in 2023 according to the International Energy Agency.",
        "https://www.iea.org/reports/renewables-2023",
        "iea.org",
    ),
    (
        "Social media use exceeding 3 hours daily correlates with higher anxiety rates in adolescents per multiple longitudinal studies.",
        "https://jamanetwork.com/journals/jamapediatrics/fullarticle/2767581",
        "jamanetwork.com",
    ),
)

_DEMO_EVIDENCE = [
    {"text": text, "url": url, "domain": domain}
    for text, url, domain in _DEMO_EVIDENCE_ROWS
]
|
|
|
|
async def _seed_demo_data() -> None:
    """Upsert demo evidence chunks into Qdrant so the demo UI returns real RAG results.

    Point IDs are derived deterministically from each chunk's source URL
    (UUIDv5). Re-running the seed on every startup therefore overwrites the same
    points instead of inserting a fresh duplicate set each time.

    Raises:
        Whatever ``get_qdrant`` / ``embed_texts`` / ``upsert`` raise, and
        ``ValueError`` if the embedder returns a different number of vectors
        than texts (instead of silently dropping chunks).
    """
    import uuid

    from qdrant_client.models import PointStruct
    from rag_pipeline import embed_texts, get_qdrant

    log.info("demo.seed.start", count=len(_DEMO_EVIDENCE))
    client = await get_qdrant(settings)
    texts = [e["text"] for e in _DEMO_EVIDENCE]
    vectors = await embed_texts(texts)
    ingested_at = time.time()

    points = [
        PointStruct(
            # Stable ID per source so repeated seeding is idempotent.
            id=str(uuid.uuid5(uuid.NAMESPACE_URL, ev["url"])),
            vector=vec,
            payload={
                "text": ev["text"],
                "source_url": ev["url"],
                "domain": ev["domain"],
                "platform": "news",
                "content_hash": f"demo_{i:04d}",
                "ingested_at_ts": ingested_at,
                "author_handle": "demo_seed",
                "bias_rating": "center",
            },
        )
        for i, (ev, vec) in enumerate(zip(_DEMO_EVIDENCE, vectors, strict=True))
    ]
    await client.upsert(collection_name=settings.qdrant_collection, points=points)
    log.info("demo.seed.complete", count=len(points))
|
|
|
|
| |
| |
| |
|
|
# Process-wide Redis client, created lazily by get_redis() and closed by lifespan() on shutdown.
_redis: aioredis.Redis | None = None


async def get_redis() -> aioredis.Redis:
    """Return the shared Redis client, constructing it from settings.redis_url on first call."""
    global _redis
    if _redis is not None:
        return _redis
    _redis = await aioredis.from_url(settings.redis_url, decode_responses=True)
    return _redis
|
|
|
|
| |
| |
| |
|
|
class ConnectionManager:
    """Registry of live WebSocket connections keyed by the client-chosen session id."""

    def __init__(self) -> None:
        # session_id -> accepted WebSocket; connecting again with the same id replaces the entry.
        self.active: dict[str, WebSocket] = {}

    async def connect(self, session_id: str, ws: WebSocket) -> None:
        """Accept the WebSocket handshake and register *ws* under *session_id*."""
        await ws.accept()
        self.active[session_id] = ws
        log.info("ws.connected", session_id=session_id, total=len(self.active))

    def disconnect(self, session_id: str, ws: WebSocket | None = None) -> None:
        """Forget the connection registered under *session_id*.

        Args:
            session_id: Session whose entry should be removed.
            ws: Optional socket identity check. When given, the entry is removed
                only if it still refers to this exact socket. A stale connection
                closing late then cannot evict a newer connection that reused
                the same session id.
        """
        if ws is None or self.active.get(session_id) is ws:
            self.active.pop(session_id, None)
        log.info("ws.disconnected", session_id=session_id, total=len(self.active))

    async def send(self, session_id: str, payload: Any) -> None:
        """Send *payload* as a ``result`` message to *session_id*; no-op if it is not connected."""
        ws = self.active.get(session_id)
        # Explicit None check: WebSocket objects are Mappings, so truthiness is not a presence test.
        if ws is None:
            return
        msg = WSOutbound(type="result", payload=payload)
        await ws.send_bytes(orjson.dumps(msg.model_dump(mode="json")))


manager = ConnectionManager()
|
|
|
|
| |
| |
| |
|
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Bring up backing services before serving requests and release them on shutdown.

    Full mode:
        Wait for Redis and Qdrant; the process exits if either never becomes
        ready. Wait for Memgraph, which is optional. Initialise the schemas,
        then seed the demo evidence (best-effort).

    Demo mode:
        Only probe Redis; the cache is optional.

    In both modes the embedder is warmed up in a background task. Shutdown
    cleanup runs in ``finally`` so it also happens if the app exits with an error.
    """
    log.info("startup.begin", demo_mode=settings.demo_mode, port=settings.port)

    if not settings.demo_mode:
        log.info("startup.waiting_for_services")

        if not await _wait_for_redis(settings.redis_url):
            log.error("startup.redis.timeout")
            sys.exit(1)
        log.info("startup.redis.ok")

        if not await _wait_for_qdrant(settings.qdrant_host, settings.qdrant_port):
            log.error("startup.qdrant.timeout")
            sys.exit(1)
        log.info("startup.qdrant.ok")

        # Memgraph is optional: startup continues without it.
        if not await _wait_for_memgraph(settings.memgraph_host, settings.memgraph_port):
            log.warning("startup.memgraph.timeout β trust scores will use neutral 0.5 fallback")
        else:
            log.info("startup.memgraph.ok")

        from core.db_init import init_all
        await init_all(settings)

        # Seeding is best-effort; a failure must not prevent the API from starting.
        try:
            await _seed_demo_data()
        except Exception as exc:
            log.warning("startup.seed.failed", error=str(exc))
    else:
        try:
            r = await get_redis()
            await r.ping()
            log.info("startup.redis.ok")
        except Exception:
            log.warning("startup.redis.unavailable β cache disabled in demo mode")

    async def _warm() -> None:
        # Embed a throwaway string so the embedder is warmed up before real traffic.
        try:
            from rag_pipeline import embed_texts
            await embed_texts(["warm up"])
            log.info("startup.embedder.warm")
        except Exception as exc:
            log.warning("startup.embedder.warn", error=str(exc))

    # Keep a strong reference: the event loop only holds weak references to
    # tasks, so an unreferenced task may be garbage-collected mid-execution.
    warmup_task = asyncio.create_task(_warm())
    log.info("startup.complete")

    try:
        yield
    finally:
        if not warmup_task.done():
            warmup_task.cancel()
        if _redis is not None:
            await _redis.aclose()
        log.info("shutdown.complete")
|
|
|
|
| |
| |
| |
|
|
# The FastAPI application; startup/shutdown is delegated to lifespan().
app = FastAPI(
    lifespan=lifespan,
    title="Omnichannel Fact & Hallucination Intelligence API",
    description="Near-zero-latency fact-checking and hallucination detection via WebSocket",
    version="1.0.0",
)

# CORS is left fully open: any origin, any method, any header.
app.add_middleware(
    CORSMiddleware,
    allow_headers=["*"],
    allow_methods=["*"],
    allow_origins=["*"],
)
|
|
|
|
| |
| |
| |
|
|
async def _read_cached_verdict(content_hash: str, element_id: str) -> AnalysisResult | None:
    """Return the cached verdict for *content_hash*, re-targeted at *element_id*, or None.

    Any cache problem (Redis unavailable, undecodable entry) is logged and
    treated as a miss: the cache is an optimisation and must never block analysis.
    """
    try:
        r = await get_redis()
        cached_json = await r.get(f"verdict:{content_hash}")
        if not cached_json:
            return None
        result = AnalysisResult.model_validate_json(cached_json)
        result.cached = True
        # The key depends only on the content hash, so the entry may have been
        # produced for a different DOM element showing the same text.
        result.element_id = element_id
    except Exception as exc:
        log.debug("cache.read_failed", hash=content_hash[:8], error=str(exc))
        return None
    log.debug("cache.hit", hash=content_hash[:8])
    return result


async def _store_verdict(content_hash: str, result: AnalysisResult) -> None:
    """Cache *result* under ``verdict:{content_hash}`` with a colour-dependent TTL.

    Purple (hallucination) verdicts are never cached. Write failures are logged
    and otherwise ignored.
    """
    if result.color == HighlightColor.PURPLE:
        return
    try:
        ttl = (
            settings.cache_ttl_green_red
            if result.color in (HighlightColor.GREEN, HighlightColor.RED)
            else settings.cache_ttl_yellow
        )
        r = await get_redis()
        await r.setex(f"verdict:{content_hash}", ttl, result.model_dump_json())
    except Exception as exc:
        log.debug("cache.write_failed", hash=content_hash[:8], error=str(exc))


async def process_segment(
    text: str,
    content_hash: str,
    element_id: str,
    platform: Platform,
) -> AnalysisResult | None:
    """
    Full pipeline for a single text segment. Returns None if noise.

    Steps: cache lookup -> gatekeeper (errors are logged and treated as
    "no result") -> RAG pipeline and Grok sensor concurrently -> claim
    evaluation -> cache write.

    Cache key: verdict:{content_hash}
    TTL:  6 h  - green / red
          15 m - yellow
          none - purple (hallucination results are context-specific)

    Exceptions from the RAG pipeline, Grok sensor or evaluate_claim propagate
    to the caller.

    NOTE(review): content_hash arrives from the client and is trusted as a
    cache key shared across sessions; a client could attach a verdict to
    another text's hash. Consider deriving or verifying the key server-side.
    """
    cached = await _read_cached_verdict(content_hash, element_id)
    if cached is not None:
        return cached

    try:
        gate: GatekeeperResult = await classify_claim(text, settings)
    except Exception as exc:
        log.error("gatekeeper.error", error=str(exc))
        return None

    if gate.label == "noise":
        log.debug("gatekeeper.noise_dropped", hash=content_hash[:8])
        return None

    # Evidence retrieval and the Grok sensor are independent; run them concurrently.
    rag_result, grok_result = await asyncio.gather(
        run_rag_pipeline(text, content_hash, settings),
        query_grok_sensor(text, content_hash, settings),
    )

    result: AnalysisResult = await evaluate_claim(
        claim=text,
        claim_hash=content_hash,
        element_id=element_id,
        platform=platform,
        rag_result=rag_result,
        grok_result=grok_result,
        settings=settings,
    )

    await _store_verdict(content_hash, result)
    return result
|
|
|
|
| |
| |
| |
|
|
@app.websocket("/ws/{session_id}")
async def websocket_endpoint(ws: WebSocket, session_id: str):
    """
    Persistent WebSocket connection from the browser extension.

    Inbound:  { type: "batch", payload: TextBatch }
            | { type: "ping" }
    Outbound: { type: "result", payload: AnalysisResult }
            | { type: "pong" }
            | { type: "error", payload: { message: str, element_id?: str } }
            | { type: "status", payload: { connected: bool, demo_mode: bool, ... } }

    Inbound frames may be binary or text; both carry the JSON envelope. An
    invalid message, or a failure while analysing one segment, is reported as
    an ``error`` message and does not close the connection.
    """
    await manager.connect(session_id, ws)

    async def _send(msg_type: str, payload: Any) -> None:
        await ws.send_bytes(orjson.dumps(
            WSOutbound(type=msg_type, payload=payload).model_dump(mode="json")
        ))

    async def _process_and_send(segment: Any, platform: Platform) -> None:
        t0 = time.perf_counter()
        try:
            result = await process_segment(
                text=segment.text,
                content_hash=segment.content_hash,
                element_id=segment.element_id,
                platform=platform,
            )
        except Exception as exc:
            # Isolate per-segment failures so one bad segment cannot end the session.
            log.error(
                "segment.failed",
                session_id=session_id,
                hash=segment.content_hash[:8],
                error=str(exc),
            )
            await _send("error", {"message": "analysis failed", "element_id": segment.element_id})
            return
        if result:
            result.latency_ms = round((time.perf_counter() - t0) * 1000, 2)
            await manager.send(session_id, result.model_dump(mode="json"))

    try:
        # Inside the try so a client that vanishes immediately is still unregistered.
        await _send("status", {
            "connected": True,
            "demo_mode": settings.demo_mode,
            "has_groq": settings.has_groq,
            "has_x_api": settings.has_x_api,
        })

        while True:
            message = await ws.receive()
            if message["type"] == "websocket.disconnect":
                break

            # Accept binary and text frames alike.
            raw = message.get("bytes")
            if raw is None:
                raw = message.get("text") or ""

            try:
                envelope = WSInbound.model_validate_json(raw)
            except ValidationError as exc:
                await _send("error", {"message": str(exc)})
                continue

            if envelope.type == "ping":
                await _send("pong", None)
                continue

            if envelope.type != "batch" or not envelope.payload:
                continue

            try:
                batch = TextBatch.model_validate(envelope.payload)
            except ValidationError as exc:
                await _send("error", {"message": str(exc)})
                continue

            await asyncio.gather(*[_process_and_send(seg, batch.platform) for seg in batch.segments])

    except WebSocketDisconnect:
        pass
    except Exception as exc:
        log.error("ws.unexpected_error", session_id=session_id, error=str(exc))
    finally:
        # Unregister only if this socket is still the registered one: a newer
        # connection may have reused the same session id in the meantime.
        if manager.active.get(session_id) is ws:
            manager.disconnect(session_id)
|
|
|
|
| |
| |
| |
|
|
@app.get("/health")
async def health():
    """Health probe.

    Always reports ``status: ok``, together with whether Redis answered PING,
    the demo-mode flag and the API version. The PING is bounded by a short
    timeout so an unreachable Redis cannot stall the health check.
    """
    try:
        r = await get_redis()
        redis_ok = bool(await asyncio.wait_for(r.ping(), timeout=2.0))
    except Exception:
        # Includes asyncio.TimeoutError from wait_for.
        redis_ok = False
    return {
        "status": "ok",
        "redis": redis_ok,
        "demo_mode": settings.demo_mode,
        "version": "1.0.0",
    }
|
|
|
|
@app.get("/metrics")
async def metrics():
    """Report the number of live WebSocket connections and the Redis key count.

    The Redis call is bounded by a short timeout. If it fails or times out,
    the count is reported as 0.
    """
    try:
        r = await get_redis()
        # NOTE(review): DBSIZE counts every key in the selected Redis database,
        # not only ``verdict:*`` entries; this is exact only if nothing else
        # writes to this database.
        cached_verdicts = await asyncio.wait_for(r.dbsize(), timeout=2.0)
    except Exception:
        cached_verdicts = 0
    return {
        "active_connections": len(manager.active),
        "cached_verdicts": cached_verdicts,
    }
|
|
|
|
@app.get("/", response_class=HTMLResponse)
async def demo_ui():
    """Serves the interactive demo UI at the root path (HuggingFace Spaces landing page).

    Returns ``static/index.html`` (next to this file) decoded as UTF-8, or a
    minimal fallback page if that file does not exist.
    """
    ui_path = os.path.join(os.path.dirname(__file__), "static", "index.html")
    # EAFP: open directly rather than exists()-then-open, and decode explicitly
    # as UTF-8 instead of relying on the platform default encoding.
    try:
        with open(ui_path, encoding="utf-8") as f:
            return HTMLResponse(f.read())
    except FileNotFoundError:
        pass
    return HTMLResponse(
        "<h1>Fact Intelligence API</h1>"
        "<p>Connect via WebSocket at <code>/ws/{session_id}</code></p>"
    )
|
|
|
|
| |
| |
| |
|
|
if __name__ == "__main__":
    import uvicorn

    # The app is handed to uvicorn as the import string "app:app", so uvicorn
    # loads this file as the module "app" and serves the instance defined there.
    server_options = {
        "host": "0.0.0.0",
        "port": settings.port,
        "log_level": settings.log_level.lower(),
        "access_log": False,
        "ws_ping_interval": 20,
        "ws_ping_timeout": 60,
    }
    uvicorn.run("app:app", **server_options)
|
|