FactCheckAI / main.py
vishwjeet71's picture
Fix HTTPS proxy headers for static files
1f04329
import asyncio
import logging
from concurrent.futures import ThreadPoolExecutor
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel, field_validator
from uvicorn.middleware.proxy_headers import ProxyHeadersMiddleware
from prototype import Prototype
# ── Logging ───────────────────────────────────────────────────────────────────
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s | %(levelname)s | %(message)s"
)
logger = logging.getLogger(__name__)
# ── Constants ─────────────────────────────────────────────────────────────────
# Pipeline is I/O-heavy (scraping + LLM calls). 150 s is generous but not infinite.
# If it hasn't returned by then, tell the user to retry rather than hanging forever.
PIPELINE_TIMEOUT_SECONDS = 150
# ThreadPoolExecutor lets us run the blocking Prototype.run() without freezing
# FastAPI's async event loop. max_workers=4 handles 4 simultaneous requests.
THREAD_POOL = ThreadPoolExecutor(max_workers=4)
# ── App ───────────────────────────────────────────────────────────────────────
app = FastAPI(
title="FactCheck AI",
description="AI-powered news fact-checking and trust analysis",
version="1.1.0",
)
app.add_middleware(ProxyHeadersMiddleware, trusted_hosts="*")
app.mount("/static", StaticFiles(directory="static"), name="static")
templates = Jinja2Templates(directory="templates")
# ── Request / Response Models ─────────────────────────────────────────────────
class ChatRequest(BaseModel):
text: str
inputType: str # "query" | "article_link"
searchApiKey: str # user's search API key
searchProvider: str # "serper.dev" | "gnews" | "publicapi.dev"
llmApiKey: str # user's LLM API key
llmProvider: str # "anthropic" | "openai" | "google" | "groq"
@field_validator("text")
@classmethod
def text_not_empty(cls, v: str) -> str:
v = v.strip()
if not v:
raise ValueError("text must not be empty")
if len(v) > 5000:
raise ValueError("text exceeds 5 000 characters")
return v
@field_validator("inputType")
@classmethod
def valid_input_type(cls, v: str) -> str:
allowed = {"query", "article_link"}
if v not in allowed:
raise ValueError(f"inputType must be one of {allowed}")
return v
@field_validator("llmProvider")
@classmethod
def valid_llm_provider(cls, v: str) -> str:
allowed = {"anthropic", "openai", "google", "groq"}
if v not in allowed:
raise ValueError(f"llmProvider must be one of {allowed}")
return v
class ChatResponse(BaseModel):
response: str
inputType: str
# ── Routes ────────────────────────────────────────────────────────────────────
@app.get("/", response_class=HTMLResponse)
async def homepage(request: Request):
return templates.TemplateResponse("index.html", {"request": request})
@app.post("/chat", response_model=ChatResponse)
async def chat(payload: ChatRequest):
"""
Runs the Prototype pipeline end-to-end and streams the LLM explanation
back to the frontend as a plain string inside ChatResponse.
Flow:
1. Build Prototype with the user-supplied search API key.
key_provider is always "superdev" β€” that is the internal name used by
Apicaller regardless of which search provider the user picked in the UI.
2. Run the blocking pipeline in a thread-pool so FastAPI stays responsive.
3. Enforce a 150-second timeout. If it fires, return a user-friendly
"please try again" message instead of hanging the request forever.
4. Catch any unexpected exception and return a readable error message.
The pipeline itself already converts most internal errors into LLM
explanations, so this outer catch is only a safety net.
"""
logger.info(
"Chat request | type=%s | llm=%s | search=%s | text=%.120s",
payload.inputType,
payload.llmProvider,
payload.searchProvider,
payload.text,
)
# Prototype's internal search provider name is always "superdev".
# The UI label ("serper.dev", "gnews", etc.) is only for display;
# the actual API call routing is done inside Apicaller.
KEY_PROVIDER = "superdev"
pipeline = Prototype(payload.searchApiKey, KEY_PROVIDER)
def _run_pipeline() -> str:
"""Blocking call β€” executed in the thread pool."""
return pipeline.run(
payload.text,
payload.inputType,
llm_api_key=payload.llmApiKey,
llm_provider=payload.llmProvider,
)
loop = asyncio.get_event_loop()
try:
result: str = await asyncio.wait_for(
loop.run_in_executor(THREAD_POOL, _run_pipeline),
timeout=PIPELINE_TIMEOUT_SECONDS,
)
logger.info("Pipeline finished successfully | type=%s", payload.inputType)
return ChatResponse(response=result, inputType=payload.inputType)
except asyncio.TimeoutError:
logger.warning(
"Pipeline timed out after %ds | type=%s | text=%.80s",
PIPELINE_TIMEOUT_SECONDS,
payload.inputType,
payload.text,
)
return ChatResponse(
response=(
"⏱️ Analysis timed out.\n\n"
"The pipeline ran for over 2.5 minutes without finishing. "
"This usually happens when sources are slow to load or the claim is very broad.\n\n"
"Please try again with a shorter or more specific query."
),
inputType=payload.inputType,
)
except Exception as exc:
# This fires only if something completely unexpected blows up β€”
# the pipeline's own error handling converts most failures into
# user-readable LLM explanations before reaching this point.
logger.exception("Unhandled exception in pipeline: %s", exc)
return ChatResponse(
response=(
"⚠️ Something unexpected went wrong.\n\n"
"The analysis could not complete. Please check that your API keys "
"are correct and try again.\n\n"
f"Technical detail: {str(exc)[:200]}"
),
inputType=payload.inputType,
)
# ── Health / Info ─────────────────────────────────────────────────────────────
@app.get("/health")
async def health():
return {"status": "ok", "service": "FactCheck AI", "version": "1.1.0"}
@app.get("/about")
async def about():
return {
"name": "FactCheck AI",
"description": "AI-powered news verification and trust analysis",
"version": "1.1.0",
"modes": ["query verification", "article URL analysis"],
}