""" fastapi_server.py — Production-grade FastAPI server for Proofly ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ Run with: uvicorn fastapi_server:app --host 0.0.0.0 --port 8000 --workers 4 For development (auto-reload): uvicorn fastapi_server:app --reload --port 8000 Dependencies (add to requirements.txt): fastapi uvicorn[standard] python-jose[cryptography] motor # Async MongoDB passlib[bcrypt] python-multipart jinja2 slowapi # Rate limiting python-dotenv certifi Architecture Notes ────────────────── • Routes mirror Flask app.py 1-for-1 so HTML templates are reused unchanged. • Motor (async pymongo) replaces the sync pymongo driver. • JSON Web Tokens issued as HttpOnly cookies via python-jose. • Rate limiting via slowapi (identical semantics to Flask-Limiter). • Lifespan event handles DB index creation at startup. • Static files served by FastAPI StaticFiles mount. """ import os import logging import re as _re from contextlib import asynccontextmanager from datetime import datetime, timedelta, timezone from functools import wraps from typing import Optional from fastapi import ( FastAPI, Request, Response, Form, Cookie, UploadFile, File, Depends, HTTPException, status ) from fastapi.responses import JSONResponse, RedirectResponse, HTMLResponse from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates from jose import JWTError, jwt from passlib.context import CryptContext from dotenv import load_dotenv # Optional — install slowapi for rate limiting try: from slowapi import Limiter, _rate_limit_exceeded_handler from slowapi.util import get_remote_address from slowapi.errors import RateLimitExceeded limiter = Limiter(key_func=get_remote_address) HAS_LIMITER = True except ImportError: limiter = None HAS_LIMITER = False print("[WARN] slowapi not installed — rate limiting disabled. pip install slowapi") load_dotenv() # ── Config ───────────────────────────────────────────────────────────────────── SECRET_KEY = os.getenv("JWT_SECRET_KEY", "change-this-jwt-secret") ALGORITHM = "HS256" ACCESS_TOKEN_MINS = int(os.getenv("JWT_ACCESS_TOKEN_MINS", "15")) REFRESH_TOKEN_DAYS = int(os.getenv("JWT_REFRESH_TOKEN_DAYS", "7")) BCRYPT_PEPPER = os.getenv("BCRYPT_PEPPER", "") MONGO_URI = os.getenv("MONGO_URI", "mongodb://localhost:27017/") MONGO_DB_NAME = os.getenv("MONGO_DB_NAME", "factcheck") pwd_ctx = CryptContext(schemes=["bcrypt"], deprecated="auto") # ── Logging ──────────────────────────────────────────────────────────────────── class _PrivacyFilter(logging.Filter): _PATTERNS = [ _re.compile(r'[\w.+-]+@[\w-]+\.[a-z]{2,}', _re.I), _re.compile(r'(?i)(password|passwd|secret|token|pepper)\s*[=:]\s*\S+'), ] def filter(self, record): msg = str(record.getMessage()) for pat in self._PATTERNS: msg = pat.sub('[REDACTED]', msg) record.msg = msg record.args = () return True logging.basicConfig(filename='app.log', level=logging.INFO) _root = logging.getLogger() _root.addFilter(_PrivacyFilter()) # ── Motor (async MongoDB) ────────────────────────────────────────────────────── try: from motor.motor_asyncio import AsyncIOMotorClient import certifi _motor_client = AsyncIOMotorClient( MONGO_URI, serverSelectionTimeoutMS=5000, tlsCAFile=certifi.where(), tlsAllowInvalidCertificates=True, ) _adb = _motor_client[MONGO_DB_NAME] HAS_MOTOR = True except ImportError: _adb = None HAS_MOTOR = False print("[WARN] motor not installed — DB calls will fail. pip install motor") # ── Lifespan (startup/shutdown) ──────────────────────────────────────────────── @asynccontextmanager async def lifespan(app: FastAPI): """Run DB index creation at startup.""" if HAS_MOTOR and _adb: try: from pymongo import ASCENDING, DESCENDING await _adb.users.create_index([("email", ASCENDING)], unique=True, name="email_unique") await _adb.history.create_index([("user_id", ASCENDING), ("created_at", DESCENDING)], name="user_history_idx") await _adb.revoked_tokens.create_index([("exp", ASCENDING)], expireAfterSeconds=0, name="token_ttl") await _adb.cached_results.create_index([("normalized_claim", ASCENDING)], unique=True, name="claim_cache_idx") logging.info("[DB] MongoDB indexes ensured.") except Exception as e: logging.warning(f"[DB] Index creation warning: {e}") yield # Shutdown — close motor connection if HAS_MOTOR: _motor_client.close() # ── App ──────────────────────────────────────────────────────────────────────── app = FastAPI(title="Proofly API", lifespan=lifespan) if HAS_LIMITER: app.state.limiter = limiter app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler) app.mount("/static", StaticFiles(directory="static"), name="static") templates = Jinja2Templates(directory="templates") # ── JWT Helpers ──────────────────────────────────────────────────────────────── def _create_token(data: dict, expires_delta: timedelta) -> str: payload = data.copy() payload["exp"] = datetime.now(timezone.utc) + expires_delta return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM) def create_access_token(user_id: str, username: str, is_admin: bool) -> str: return _create_token( {"sub": user_id, "username": username, "is_admin": is_admin}, timedelta(minutes=ACCESS_TOKEN_MINS) ) def create_refresh_token(user_id: str) -> str: return _create_token({"sub": user_id, "type": "refresh"}, timedelta(days=REFRESH_TOKEN_DAYS)) def _decode_token(token: str) -> Optional[dict]: try: return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) except JWTError: return None def _set_token_cookies(response: Response, access_token: str, refresh_token: str): opts = dict(httponly=True, samesite="strict", secure=False) # secure=True in production response.set_cookie("access_token_cookie", access_token, max_age=ACCESS_TOKEN_MINS * 60, **opts) response.set_cookie("refresh_token_cookie", refresh_token, max_age=REFRESH_TOKEN_DAYS * 86400, **opts) def _unset_token_cookies(response: Response): response.delete_cookie("access_token_cookie") response.delete_cookie("refresh_token_cookie") # ── Current User Dependency ──────────────────────────────────────────────────── class CurrentUser: def __init__(self, user_id=None, username=None, is_admin=False): self.user_id = user_id self.username = username self.is_admin = is_admin async def get_current_user( access_token_cookie: Optional[str] = Cookie(default=None) ) -> CurrentUser: if not access_token_cookie: return CurrentUser() payload = _decode_token(access_token_cookie) if not payload: return CurrentUser() # Check revoked if HAS_MOTOR and _adb: revoked = await _adb.revoked_tokens.find_one({"jti": payload.get("jti")}) if revoked: return CurrentUser() return CurrentUser( user_id = payload.get("sub"), username = payload.get("username", "User"), is_admin = payload.get("is_admin", False), ) def require_auth(user: CurrentUser = Depends(get_current_user)) -> CurrentUser: if not user.user_id: raise HTTPException(status_code=302, headers={"Location": "/login"}) return user def require_admin(user: CurrentUser = Depends(get_current_user)) -> CurrentUser: if not user.user_id or not user.is_admin: raise HTTPException(status_code=403, detail="Admin access required.") return user # ── Password Helpers ─────────────────────────────────────────────────────────── def _pepper(pw: str) -> str: return pw + BCRYPT_PEPPER def hash_pw(pw: str) -> str: return pwd_ctx.hash(_pepper(pw)) def verify_pw(pw: str, hashed: str) -> bool: return pwd_ctx.verify(_pepper(pw), hashed) # ── Auth Routes ──────────────────────────────────────────────────────────────── @app.get("/register", response_class=HTMLResponse) async def register_page(request: Request): return templates.TemplateResponse("register.html", {"request": request}) @app.post("/register", response_class=HTMLResponse) async def register( request: Request, username: str = Form(...), email: str = Form(...), password: str = Form(...), confirm_password: str = Form(...), ): errs = [] if not all([username, email, password]): errs.append("All fields are required.") if password != confirm_password: errs.append("Passwords do not match.") if len(password) < 6: errs.append("Password must be at least 6 characters.") if not errs and HAS_MOTOR: existing = await _adb.users.find_one({"email": email.lower()}) if existing: errs.append("An account with that email already exists.") if errs: return templates.TemplateResponse("register.html", {"request": request, "errors": errs}) is_admin = (await _adb.users.count_documents({})) == 0 if HAS_MOTOR else False pw_hash = hash_pw(password) if HAS_MOTOR: await _adb.users.insert_one({ "username": username, "email": email.lower(), "password_hash": pw_hash, "is_admin": is_admin, "created_at": datetime.now(timezone.utc) }) return RedirectResponse("/login?registered=1", status_code=303) @app.get("/login", response_class=HTMLResponse) async def login_page(request: Request): return templates.TemplateResponse("login.html", {"request": request}) @app.post("/login") async def login( request: Request, email: str = Form(...), password: str = Form(...), ): if not HAS_MOTOR: return templates.TemplateResponse("login.html", {"request": request, "error": "DB unavailable."}) user = await _adb.users.find_one({"email": email.lower()}) if not user or not verify_pw(password, user["password_hash"]): return templates.TemplateResponse("login.html", {"request": request, "error": "Invalid credentials."}) uid = str(user["_id"]) at = create_access_token(uid, user["username"], user.get("is_admin", False)) rt = create_refresh_token(uid) resp = RedirectResponse("/", status_code=303) _set_token_cookies(resp, at, rt) return resp @app.get("/logout") @app.post("/logout") async def logout(): resp = RedirectResponse("/login", status_code=303) _unset_token_cookies(resp) return resp # ── Main Routes ──────────────────────────────────────────────────────────────── @app.get("/", response_class=HTMLResponse) async def index(request: Request, user: CurrentUser = Depends(require_auth)): return templates.TemplateResponse("index.html", {"request": request, "g": user}) @app.post("/check") async def check_claim( request: Request, claim: str = Form(...), user: CurrentUser = Depends(require_auth), ): from api_wrapper import run_fact_check_api claim = claim.strip() if not claim: return JSONResponse({"success": False, "error": "Claim cannot be empty"}, status_code=400) # Cache lookup norm = claim.strip().lower() cached = None if HAS_MOTOR: doc = await _adb.cached_results.find_one({"normalized_claim": norm}) if doc: cached = doc.get("result") result = cached or run_fact_check_api(claim) if result.get("success"): if not cached and HAS_MOTOR: await _adb.cached_results.update_one( {"normalized_claim": norm}, {"$set": {"result": result, "updated_at": datetime.now(timezone.utc)}, "$setOnInsert": {"created_at": datetime.now(timezone.utc)}}, upsert=True ) if HAS_MOTOR: await _adb.history.insert_one({ "user_id": user.user_id, "claim": claim, "verdict": result.get("verdict", "Unknown"), "confidence": result.get("confidence", 0.0), "evidence_count": result.get("total_evidence", 0), "created_at": datetime.now(timezone.utc), }) return JSONResponse(result) @app.get("/history", response_class=HTMLResponse) async def history(request: Request, user: CurrentUser = Depends(require_auth)): records = [] if HAS_MOTOR: from pymongo import DESCENDING records = await _adb.history.find( {"user_id": user.user_id} ).sort("created_at", DESCENDING).limit(50).to_list(50) return templates.TemplateResponse("history.html", {"request": request, "g": user, "records": records}) @app.get("/results", response_class=HTMLResponse) async def results(request: Request, user: CurrentUser = Depends(require_auth)): # Results are stored in session in Flask; in FastAPI we redirect to / if empty return RedirectResponse("/") @app.post("/ocr") async def ocr_image(image: UploadFile = File(...), user: CurrentUser = Depends(require_auth)): try: import easyocr, numpy as np from PIL import Image import io image_bytes = await image.read() img = Image.open(io.BytesIO(image_bytes)).convert('RGB') reader = easyocr.Reader(['en'], gpu=False) text = ' '.join([r[1] for r in reader.readtext(np.array(img))]).strip() return JSONResponse({"success": True, "text": text}) except ImportError: return JSONResponse({"success": False, "error": "OCR library not installed."}, status_code=500) except Exception: return JSONResponse({"success": False, "error": "Could not process image."}, status_code=500) # ── Admin Routes ─────────────────────────────────────────────────────────────── @app.get("/admin", response_class=HTMLResponse) async def admin_dashboard(request: Request, user: CurrentUser = Depends(require_admin)): from project.database import get_system_stats, get_global_history, list_all_users stats = get_system_stats() history = get_global_history(limit=20) users = list_all_users(limit=10) return templates.TemplateResponse("admin.html", { "request": request, "g": user, "stats": stats, "history": history, "users": users }) @app.get("/admin/users", response_class=HTMLResponse) async def admin_users(request: Request, user: CurrentUser = Depends(require_admin)): from project.database import list_all_users users = list_all_users(limit=200) return templates.TemplateResponse("admin_users.html", {"request": request, "g": user, "users": users}) @app.get("/admin/logs", response_class=HTMLResponse) async def admin_logs(request: Request, user: CurrentUser = Depends(require_admin)): from project.database import get_global_history history = get_global_history(limit=500) return templates.TemplateResponse("admin_logs.html", {"request": request, "g": user, "history": history}) # ── API Misc ─────────────────────────────────────────────────────────────────── @app.get("/api/suggested_facts") async def suggested_facts(): import random from knowledge_base import KNOWLEDGE_BASE facts = random.sample(KNOWLEDGE_BASE, min(3, len(KNOWLEDGE_BASE))) return JSONResponse({"success": True, "facts": [f["text"] for f in facts]}) # ── Error Handlers ───────────────────────────────────────────────────────────── @app.exception_handler(404) async def not_found(request: Request, exc): return JSONResponse({"error": "Not found"}, status_code=404) @app.exception_handler(500) async def server_error(request: Request, exc): return JSONResponse({"error": "Internal server error"}, status_code=500) # ── Dev Server Entry Point (python fastapi_server.py) ───────────────────────── if __name__ == "__main__": import uvicorn uvicorn.run("fastapi_server:app", host="0.0.0.0", port=8000, reload=True)