# proofly / fastapi_server.py
# Pragthedon's picture
# Initial backend API deployment
# 4f48a4e
"""
fastapi_server.py — Production-grade FastAPI server for Proofly
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Run with:
uvicorn fastapi_server:app --host 0.0.0.0 --port 8000 --workers 4
For development (auto-reload):
uvicorn fastapi_server:app --reload --port 8000
Dependencies (add to requirements.txt):
fastapi
uvicorn[standard]
python-jose[cryptography]
motor # Async MongoDB
passlib[bcrypt]
python-multipart
jinja2
slowapi # Rate limiting
python-dotenv
certifi
Architecture Notes
──────────────────
• Routes mirror Flask app.py 1-for-1 so HTML templates are reused unchanged.
• Motor (async pymongo) replaces the sync pymongo driver.
• JSON Web Tokens issued as HttpOnly cookies via python-jose.
• Rate limiting via slowapi (identical semantics to Flask-Limiter).
• Lifespan event handles DB index creation at startup.
• Static files served by FastAPI StaticFiles mount.
"""
import os
import logging
import re as _re
from contextlib import asynccontextmanager
from datetime import datetime, timedelta, timezone
from functools import wraps
from typing import Optional
from fastapi import (
FastAPI, Request, Response, Form, Cookie, UploadFile, File,
Depends, HTTPException, status
)
from fastapi.responses import JSONResponse, RedirectResponse, HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from jose import JWTError, jwt
from passlib.context import CryptContext
from dotenv import load_dotenv
# Optional dependency: slowapi supplies the rate limiter. When it cannot be
# imported the module still loads, with `limiter` left as None and
# HAS_LIMITER False so later code can skip the limiter wiring.
limiter = None
HAS_LIMITER = False
try:
    from slowapi import Limiter, _rate_limit_exceeded_handler
    from slowapi.util import get_remote_address
    from slowapi.errors import RateLimitExceeded
    # The limiter keys requests with slowapi's get_remote_address helper.
    limiter = Limiter(key_func=get_remote_address)
    HAS_LIMITER = True
except ImportError:
    print("[WARN] slowapi not installed β€” rate limiting disabled. pip install slowapi")
load_dotenv()
# ── Config ─────────────────────────────────────────────────────────────────────
# Placeholder used only when JWT_SECRET_KEY is absent from the environment.
_DEFAULT_JWT_SECRET = "change-this-jwt-secret"
SECRET_KEY = os.getenv("JWT_SECRET_KEY", _DEFAULT_JWT_SECRET)
if SECRET_KEY == _DEFAULT_JWT_SECRET:
    # Tokens signed with a publicly known secret can be forged by anyone, so
    # make the misconfiguration visible. print() is used (like the other
    # start-up warnings in this file) because logging is not configured yet
    # at this point of the module.
    print("[WARN] JWT_SECRET_KEY is not set; using the built-in placeholder secret. "
          "Set JWT_SECRET_KEY before deploying.")
ALGORITHM = "HS256"
# Token lifetimes, overridable through the environment.
ACCESS_TOKEN_MINS = int(os.getenv("JWT_ACCESS_TOKEN_MINS", "15"))
REFRESH_TOKEN_DAYS = int(os.getenv("JWT_REFRESH_TOKEN_DAYS", "7"))
# Extra string appended to every password before hashing (empty by default).
BCRYPT_PEPPER = os.getenv("BCRYPT_PEPPER", "")
MONGO_URI = os.getenv("MONGO_URI", "mongodb://localhost:27017/")
MONGO_DB_NAME = os.getenv("MONGO_DB_NAME", "factcheck")
# passlib context used for hashing and verifying passwords with bcrypt.
pwd_ctx = CryptContext(schemes=["bcrypt"], deprecated="auto")
# ── Logging ────────────────────────────────────────────────────────────────────
class _PrivacyFilter(logging.Filter):
    """Logging filter that masks e-mail addresses and credential-like pairs.

    Every match of one of the patterns below is replaced with the literal
    text ``[REDACTED]`` in the fully formatted log message.
    """

    _PATTERNS = [
        # Text shaped like an e-mail address.
        _re.compile(r'[\w.+-]+@[\w-]+\.[a-z]{2,}', _re.I),
        # "<sensitive word> = value" or "<sensitive word>: value".
        _re.compile(r'(?i)(password|passwd|secret|token|pepper)\s*[=:]\s*\S+'),
    ]

    def filter(self, record):
        """Scrub *record* in place and always let it through (returns True)."""
        scrubbed = str(record.getMessage())
        for pattern in self._PATTERNS:
            scrubbed = pattern.sub('[REDACTED]', scrubbed)
        # The message is already interpolated at this point, so the args are
        # cleared to keep the logging machinery from formatting it again.
        record.msg, record.args = scrubbed, ()
        return True
logging.basicConfig(filename='app.log', level=logging.INFO)
_root = logging.getLogger()
_privacy_filter = _PrivacyFilter()
# A filter attached to a logger is only consulted for records logged directly
# on that logger; records propagated from child loggers bypass it. Handler
# filters apply to every record the handler emits, so the filter is attached
# to the root handlers as well. Root-logged records pass through it twice,
# which is harmless because the second pass finds nothing left to redact.
_root.addFilter(_privacy_filter)
for _handler in _root.handlers:
    _handler.addFilter(_privacy_filter)
# ── Motor (async MongoDB) ──────────────────────────────────────────────────────
try:
    from motor.motor_asyncio import AsyncIOMotorClient
    import certifi
    # Certificate validation stays ON unless explicitly disabled. The original
    # code always passed tlsAllowInvalidCertificates=True, which makes the CA
    # bundle below pointless and allows man-in-the-middle attacks. Set
    # MONGO_TLS_ALLOW_INVALID_CERTS=true only for servers with self-signed
    # certificates (e.g. local testing).
    _allow_invalid_certs = os.getenv(
        "MONGO_TLS_ALLOW_INVALID_CERTS", "false"
    ).strip().lower() in ("1", "true", "yes", "on")
    _motor_client = AsyncIOMotorClient(
        MONGO_URI,
        serverSelectionTimeoutMS=5000,
        tlsCAFile=certifi.where(),
        tlsAllowInvalidCertificates=_allow_invalid_certs,
    )
    _adb = _motor_client[MONGO_DB_NAME]
    HAS_MOTOR = True
except ImportError:
    # Keep every module-level name defined so later references cannot raise
    # NameError when the driver is missing.
    _motor_client = None
    _adb = None
    HAS_MOTOR = False
    print("[WARN] motor not installed β€” DB calls will fail. pip install motor")
# ── Lifespan (startup/shutdown) ────────────────────────────────────────────────
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Ensure MongoDB indexes at startup and close the client at shutdown.

    Index creation is best-effort: any failure is logged as a warning and the
    application still starts.
    """
    # Database objects must be compared with None: PyMongo databases raise
    # NotImplementedError on truth-value testing, which would abort startup.
    if HAS_MOTOR and _adb is not None:
        try:
            from pymongo import ASCENDING, DESCENDING
            await _adb.users.create_index([("email", ASCENDING)], unique=True, name="email_unique")
            await _adb.history.create_index([("user_id", ASCENDING), ("created_at", DESCENDING)], name="user_history_idx")
            # TTL index: documents are removed once their "exp" date has passed.
            await _adb.revoked_tokens.create_index([("exp", ASCENDING)], expireAfterSeconds=0, name="token_ttl")
            await _adb.cached_results.create_index([("normalized_claim", ASCENDING)], unique=True, name="claim_cache_idx")
            logging.info("[DB] MongoDB indexes ensured.")
        except Exception as e:
            logging.warning("[DB] Index creation warning: %s", e)
    try:
        yield
    finally:
        # Shutdown: close the Motor connection even if the application exits
        # with an error.
        if HAS_MOTOR:
            _motor_client.close()
# ── App ────────────────────────────────────────────────────────────────────────
# The application object; `lifespan` runs the startup/shutdown logic.
app = FastAPI(title="Proofly API", lifespan=lifespan)

# Jinja2 templates are loaded from the local "templates" directory.
templates = Jinja2Templates(directory="templates")

# Files under the local "static" directory are served at /static.
app.mount("/static", StaticFiles(directory="static"), name="static")

# Rate-limiter wiring, only when slowapi could be imported.
if HAS_LIMITER:
    app.state.limiter = limiter
    app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
# ── JWT Helpers ────────────────────────────────────────────────────────────────
def _create_token(data: dict, expires_delta: timedelta) -> str:
    """Return a signed JWT holding *data* plus ``exp`` and ``jti`` claims.

    Args:
        data: Claims to embed; the dict is copied, never mutated.
        expires_delta: Lifetime of the token, counted from now (UTC).

    Returns:
        The encoded token, signed with SECRET_KEY using ALGORITHM.
    """
    import uuid  # local import keeps this block self-contained

    payload = data.copy()
    payload["exp"] = datetime.now(timezone.utc) + expires_delta
    # The revocation check in get_current_user looks tokens up by "jti";
    # without a unique id every token would be looked up as jti=None.
    # A caller-supplied jti is respected.
    payload.setdefault("jti", uuid.uuid4().hex)
    return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
def create_access_token(user_id: str, username: str, is_admin: bool) -> str:
    """Build an access token valid for ACCESS_TOKEN_MINS minutes.

    The token carries the user id as ``sub`` together with the ``username``
    and ``is_admin`` claims.
    """
    claims = {"sub": user_id, "username": username, "is_admin": is_admin}
    lifetime = timedelta(minutes=ACCESS_TOKEN_MINS)
    return _create_token(claims, lifetime)
def create_refresh_token(user_id: str) -> str:
    """Build a refresh token (``type`` = "refresh") valid for REFRESH_TOKEN_DAYS days."""
    claims = {"sub": user_id, "type": "refresh"}
    lifetime = timedelta(days=REFRESH_TOKEN_DAYS)
    return _create_token(claims, lifetime)
def _decode_token(token: str) -> Optional[dict]:
    """Decode *token* with SECRET_KEY; return its claims, or None on JWTError."""
    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        return None
    return claims
def _set_token_cookies(response: Response, access_token: str, refresh_token: str):
    """Attach the access and refresh tokens to *response* as HttpOnly cookies.

    Both cookies use SameSite=strict. The ``Secure`` flag is controlled by
    the COOKIE_SECURE environment variable ("1", "true", "yes" or "on"
    enable it) instead of being hard-coded, so production deployments can
    turn it on without editing code. It defaults to off, as before.
    """
    secure = os.getenv("COOKIE_SECURE", "false").strip().lower() in ("1", "true", "yes", "on")
    opts = dict(httponly=True, samesite="strict", secure=secure)
    # Cookie lifetimes mirror the lifetimes of the tokens they carry.
    response.set_cookie("access_token_cookie", access_token, max_age=ACCESS_TOKEN_MINS * 60, **opts)
    response.set_cookie("refresh_token_cookie", refresh_token, max_age=REFRESH_TOKEN_DAYS * 86400, **opts)
def _unset_token_cookies(response: Response):
    """Ask the client to drop both token cookies."""
    for cookie_name in ("access_token_cookie", "refresh_token_cookie"):
        response.delete_cookie(cookie_name)
# ── Current User Dependency ────────────────────────────────────────────────────
class CurrentUser:
    """Identity attached to a request.

    An instance created without arguments represents an anonymous visitor:
    ``user_id`` and ``username`` are None and ``is_admin`` is False.
    """

    def __init__(self, user_id=None, username=None, is_admin=False):
        self.user_id, self.username, self.is_admin = user_id, username, is_admin
async def get_current_user(
    access_token_cookie: Optional[str] = Cookie(default=None)
) -> CurrentUser:
    """Resolve the request's access-token cookie into a CurrentUser.

    Returns an anonymous ``CurrentUser()`` when the cookie is missing, cannot
    be decoded, is a refresh token, or has been revoked.
    """
    if not access_token_cookie:
        return CurrentUser()
    payload = _decode_token(access_token_cookie)
    if not payload:
        return CurrentUser()
    # Refresh tokens are signed with the same key; they must not be usable
    # as access tokens.
    if payload.get("type") == "refresh":
        return CurrentUser()
    # Revocation check. Database objects must be compared with None (PyMongo
    # databases raise NotImplementedError on truth-value testing), and the
    # lookup is skipped for tokens without a jti because {"jti": None} would
    # match any revocation record that lacks the field.
    jti = payload.get("jti")
    if jti is not None and HAS_MOTOR and _adb is not None:
        revoked = await _adb.revoked_tokens.find_one({"jti": jti})
        if revoked:
            return CurrentUser()
    return CurrentUser(
        user_id=payload.get("sub"),
        username=payload.get("username", "User"),
        is_admin=payload.get("is_admin", False),
    )
def require_auth(user: CurrentUser = Depends(get_current_user)) -> CurrentUser:
    """Dependency that lets authenticated users through.

    Raises:
        HTTPException: status 302 with a ``Location: /login`` header when the
            resolved user has no ``user_id``.
    """
    if user.user_id:
        return user
    raise HTTPException(status_code=302, headers={"Location": "/login"})
def require_admin(user: CurrentUser = Depends(get_current_user)) -> CurrentUser:
    """Dependency that lets only authenticated admin users through.

    Raises:
        HTTPException: status 403 when the user is anonymous or not an admin.
    """
    if user.user_id and user.is_admin:
        return user
    raise HTTPException(status_code=403, detail="Admin access required.")
# ── Password Helpers ───────────────────────────────────────────────────────────
def _pepper(pw: str) -> str:
    """Return *pw* with the configured BCRYPT_PEPPER appended."""
    return pw + BCRYPT_PEPPER
def hash_pw(pw: str) -> str:
    """Hash the peppered password with the module's passlib context."""
    peppered = _pepper(pw)
    return pwd_ctx.hash(peppered)
def verify_pw(pw: str, hashed: str) -> bool:
    """Check the peppered password against *hashed* via the passlib context."""
    peppered = _pepper(pw)
    return pwd_ctx.verify(peppered, hashed)
# ── Auth Routes ────────────────────────────────────────────────────────────────
@app.get("/register", response_class=HTMLResponse)
async def register_page(request: Request):
    """Render the registration form."""
    context = {"request": request}
    return templates.TemplateResponse("register.html", context)
@app.post("/register", response_class=HTMLResponse)
async def register(
    request: Request,
    username: str = Form(...),
    email: str = Form(...),
    password: str = Form(...),
    confirm_password: str = Form(...),
):
    """Validate the registration form and create the user account.

    On failure the registration page is re-rendered with an ``errors`` list;
    on success the browser is redirected (303) to ``/login?registered=1``.
    The account is flagged as admin when the users collection is empty at
    the time of registration.
    """
    def _render_errors(messages):
        # Re-render the form with the given list of error messages.
        return templates.TemplateResponse(
            "register.html", {"request": request, "errors": messages}
        )

    duplicate_msg = "An account with that email already exists."
    email_key = email.lower()

    errs = []
    if not all([username, email, password]):
        errs.append("All fields are required.")
    if password != confirm_password:
        errs.append("Passwords do not match.")
    if len(password) < 6:
        errs.append("Password must be at least 6 characters.")
    if errs:
        return _render_errors(errs)

    # Without a database nothing can be stored; report that instead of
    # pretending the registration succeeded.
    if not HAS_MOTOR:
        return _render_errors(["DB unavailable."])

    from pymongo.errors import DuplicateKeyError

    if await _adb.users.find_one({"email": email_key}):
        return _render_errors([duplicate_msg])

    is_admin = (await _adb.users.count_documents({})) == 0
    pw_hash = hash_pw(password)
    try:
        await _adb.users.insert_one({
            "username": username, "email": email_key,
            "password_hash": pw_hash, "is_admin": is_admin,
            "created_at": datetime.now(timezone.utc)
        })
    except DuplicateKeyError:
        # A concurrent request registered the same e-mail between the lookup
        # above and this insert; the unique index on "email" rejected it.
        return _render_errors([duplicate_msg])
    return RedirectResponse("/login?registered=1", status_code=303)
@app.get("/login", response_class=HTMLResponse)
async def login_page(request: Request):
    """Render the login form."""
    context = {"request": request}
    return templates.TemplateResponse("login.html", context)
@app.post("/login")
async def login(
    request: Request,
    email: str = Form(...),
    password: str = Form(...),
):
    """Authenticate a user and issue the access/refresh token cookies.

    On failure the login page is re-rendered with an ``error`` message; on
    success the browser is redirected (303) to ``/`` with both cookies set.
    """
    def _fail(message):
        # Re-render the login form with a single error message.
        return templates.TemplateResponse(
            "login.html", {"request": request, "error": message}
        )

    if not HAS_MOTOR:
        return _fail("DB unavailable.")

    user = await _adb.users.find_one({"email": email.lower()})
    stored_hash = user.get("password_hash") if user else None

    authenticated = False
    if stored_hash:
        try:
            authenticated = verify_pw(password, stored_hash)
        except (ValueError, TypeError):
            # A malformed stored hash is treated as a failed login rather
            # than surfacing as a 500 error.
            logging.warning("[AUTH] Stored credential for a user record could not be verified.")
    if not authenticated:
        return _fail("Invalid credentials.")

    uid = str(user["_id"])
    # Same fallbacks get_current_user applies when reading the token back.
    at = create_access_token(uid, user.get("username", "User"), user.get("is_admin", False))
    rt = create_refresh_token(uid)
    resp = RedirectResponse("/", status_code=303)
    _set_token_cookies(resp, at, rt)
    return resp
@app.get("/logout")
@app.post("/logout")
async def logout():
    """Clear both token cookies and redirect (303) to the login page."""
    response = RedirectResponse("/login", status_code=303)
    _unset_token_cookies(response)
    return response
# ── Main Routes ────────────────────────────────────────────────────────────────
@app.get("/", response_class=HTMLResponse)
async def index(request: Request, user: CurrentUser = Depends(require_auth)):
    """Render the main page for an authenticated user (exposed to the template as ``g``)."""
    context = {"request": request, "g": user}
    return templates.TemplateResponse("index.html", context)
@app.post("/check")
async def check_claim(
    request: Request,
    claim: str = Form(...),
    user: CurrentUser = Depends(require_auth),
):
    """Fact-check *claim*, using the result cache where possible.

    A cached result (looked up by the lower-cased, stripped claim) is reused
    when present; otherwise the fact-check pipeline is run. Successful
    results are cached and recorded in the user's history. The result dict
    is returned as JSON; an empty claim yields a 400 response.
    """
    from api_wrapper import run_fact_check_api
    from fastapi.concurrency import run_in_threadpool

    claim = claim.strip()
    if not claim:
        return JSONResponse({"success": False, "error": "Claim cannot be empty"}, status_code=400)

    # Cache lookup (claim is already stripped above).
    norm = claim.lower()
    cached = None
    if HAS_MOTOR:
        doc = await _adb.cached_results.find_one({"normalized_claim": norm})
        if doc:
            cached = doc.get("result")

    # run_fact_check_api is a plain synchronous call; running it in the
    # thread pool keeps the event loop free for other requests meanwhile.
    result = cached or await run_in_threadpool(run_fact_check_api, claim)

    if result.get("success"):
        now = datetime.now(timezone.utc)
        if not cached and HAS_MOTOR:
            await _adb.cached_results.update_one(
                {"normalized_claim": norm},
                {"$set": {"result": result, "updated_at": now},
                 "$setOnInsert": {"created_at": now}},
                upsert=True
            )
        if HAS_MOTOR:
            await _adb.history.insert_one({
                "user_id": user.user_id,
                "claim": claim,
                "verdict": result.get("verdict", "Unknown"),
                "confidence": result.get("confidence", 0.0),
                "evidence_count": result.get("total_evidence", 0),
                "created_at": now,
            })
    return JSONResponse(result)
@app.get("/history", response_class=HTMLResponse)
async def history(request: Request, user: CurrentUser = Depends(require_auth)):
    """Render the current user's 50 most recent history records."""
    records = []
    if HAS_MOTOR:
        from pymongo import DESCENDING
        cursor = _adb.history.find({"user_id": user.user_id})
        # Newest first, capped at 50 entries.
        cursor = cursor.sort("created_at", DESCENDING).limit(50)
        records = await cursor.to_list(50)
    context = {"request": request, "g": user, "records": records}
    return templates.TemplateResponse("history.html", context)
@app.get("/results", response_class=HTMLResponse)
async def results(request: Request, user: CurrentUser = Depends(require_auth)):
    """Redirect authenticated users to the main page.

    The Flask version kept results in the session; this server has no such
    store, so the route only redirects to ``/``.
    """
    destination = "/"
    return RedirectResponse(destination)
import threading

# Shared EasyOCR reader, built lazily on first use. The lock serialises both
# construction and use, because the reader is now shared between worker threads.
_OCR_READER = None
_OCR_LOCK = threading.Lock()


def _extract_text(image_bytes: bytes) -> str:
    """Run OCR on raw image bytes and return the recognised text joined by spaces."""
    global _OCR_READER
    import io
    import easyocr, numpy as np
    from PIL import Image

    img = Image.open(io.BytesIO(image_bytes)).convert('RGB')
    with _OCR_LOCK:
        if _OCR_READER is None:
            # Building the reader is expensive, so it is created once and
            # reused instead of once per request.
            _OCR_READER = easyocr.Reader(['en'], gpu=False)
        detections = _OCR_READER.readtext(np.array(img))
    return ' '.join(r[1] for r in detections).strip()


@app.post("/ocr")
async def ocr_image(image: UploadFile = File(...), user: CurrentUser = Depends(require_auth)):
    """Extract text from an uploaded image.

    Returns ``{"success": True, "text": ...}`` on success. A missing OCR
    dependency or any processing failure yields a 500 JSON error response.
    """
    from fastapi.concurrency import run_in_threadpool
    try:
        image_bytes = await image.read()
        # OCR is CPU-bound and synchronous; keep it off the event loop.
        text = await run_in_threadpool(_extract_text, image_bytes)
        return JSONResponse({"success": True, "text": text})
    except ImportError:
        return JSONResponse({"success": False, "error": "OCR library not installed."}, status_code=500)
    except Exception:
        # Record the cause instead of discarding it silently.
        logging.exception("[OCR] Could not process uploaded image")
        return JSONResponse({"success": False, "error": "Could not process image."}, status_code=500)
# ── Admin Routes ───────────────────────────────────────────────────────────────
@app.get("/admin", response_class=HTMLResponse)
async def admin_dashboard(request: Request, user: CurrentUser = Depends(require_admin)):
    """Render the admin dashboard: system stats, 20 history entries, 10 users."""
    from project.database import get_system_stats, get_global_history, list_all_users
    context = {"request": request, "g": user}
    context["stats"] = get_system_stats()
    context["history"] = get_global_history(limit=20)
    context["users"] = list_all_users(limit=10)
    return templates.TemplateResponse("admin.html", context)
@app.get("/admin/users", response_class=HTMLResponse)
async def admin_users(request: Request, user: CurrentUser = Depends(require_admin)):
    """Render the admin user list (up to 200 users)."""
    from project.database import list_all_users
    context = {"request": request, "g": user, "users": list_all_users(limit=200)}
    return templates.TemplateResponse("admin_users.html", context)
@app.get("/admin/logs", response_class=HTMLResponse)
async def admin_logs(request: Request, user: CurrentUser = Depends(require_admin)):
    """Render the admin log view with up to 500 global history entries."""
    from project.database import get_global_history
    context = {"request": request, "g": user, "history": get_global_history(limit=500)}
    return templates.TemplateResponse("admin_logs.html", context)
# ── API Misc ───────────────────────────────────────────────────────────────────
@app.get("/api/suggested_facts")
async def suggested_facts():
    """Return the ``text`` of up to three randomly sampled knowledge-base entries."""
    import random
    from knowledge_base import KNOWLEDGE_BASE
    sample_size = min(3, len(KNOWLEDGE_BASE))
    picked = random.sample(KNOWLEDGE_BASE, sample_size)
    texts = [entry["text"] for entry in picked]
    return JSONResponse({"success": True, "facts": texts})
# ── Error Handlers ─────────────────────────────────────────────────────────────
@app.exception_handler(404)
async def not_found(request: Request, exc):
    """Answer 404 errors with a JSON body."""
    body = {"error": "Not found"}
    return JSONResponse(body, status_code=404)
@app.exception_handler(500)
async def server_error(request: Request, exc):
    """Answer 500 errors with a generic JSON body."""
    body = {"error": "Internal server error"}
    return JSONResponse(body, status_code=500)
# ── Dev Server Entry Point (python fastapi_server.py) ─────────────────────────
if __name__ == "__main__":
    # Development entry point: `python fastapi_server.py` starts uvicorn with
    # auto-reload, listening on all interfaces at port 8000.
    import uvicorn
    server_options = dict(host="0.0.0.0", port=8000, reload=True)
    uvicorn.run("fastapi_server:app", **server_options)