""" Authentication utilities — password hashing, JWT creation / verification. """ import os import hashlib import secrets from datetime import datetime, timedelta, timezone from typing import Optional from fastapi import Depends, HTTPException, status from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from jose import JWTError, jwt # ── Configuration ──────────────────────────────────────────────────── SECRET_KEY = os.getenv("JWT_SECRET_KEY", "factcheck-thesis-secret-key-change-in-production") ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24 * 7 # 7 days # ── Password hashing (SHA-256 + salt) ──────────────────────────────── # ── Bearer token scheme ───────────────────────────────────────────── bearer_scheme = HTTPBearer(auto_error=False) def hash_password(password: str) -> str: """Hash a plaintext password using SHA-256 with a random salt.""" salt = secrets.token_hex(16) hashed = hashlib.sha256((salt + password).encode()).hexdigest() return f"{salt}${hashed}" def verify_password(plain_password: str, hashed_password: str) -> bool: """Verify a plaintext password against its salted SHA-256 hash.""" try: salt, stored_hash = hashed_password.split("$", 1) test_hash = hashlib.sha256((salt + plain_password).encode()).hexdigest() return test_hash == stored_hash except (ValueError, AttributeError): return False def create_access_token(user_id: int, email: str) -> str: """Create a signed JWT with the user's id and email in the payload.""" expire = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) payload = { "sub": str(user_id), "email": email, "exp": expire, } return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM) def decode_access_token(token: str) -> dict: """ Decode and validate a JWT. Returns the payload dict or raises an HTTPException. """ try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) return payload except JWTError: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid or expired token.", ) # ── FastAPI dependencies ───────────────────────────────────────────── async def get_current_user( credentials: Optional[HTTPAuthorizationCredentials] = Depends(bearer_scheme), ) -> Optional[dict]: """ Dependency that extracts the current user from the Authorization header. Returns the decoded payload dict if a valid token is present, or None if no token is provided or if the token is invalid/expired. """ if credentials is None: return None try: return decode_access_token(credentials.credentials) except HTTPException: return None async def require_current_user( credentials: HTTPAuthorizationCredentials = Depends(HTTPBearer()), ) -> dict: """ Stricter version — always requires a valid token. Raises 401 if missing or invalid. """ return decode_access_token(credentials.credentials)