from datetime import datetime, timedelta from typing import Optional import bcrypt from fastapi import Depends, HTTPException, status from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from jose import JWTError, jwt from app.core.config import settings bearer_scheme = HTTPBearer() # ── Password helpers ────────────────────────────────────────────── def hash_password(plain: str) -> str: return bcrypt.hashpw(plain.encode(), bcrypt.gensalt()).decode() def verify_password(plain: str, hashed: str) -> bool: return bcrypt.checkpw(plain.encode(), hashed.encode()) # ── JWT helpers ─────────────────────────────────────────────────── def create_token(username: str) -> str: expire = datetime.utcnow() + timedelta(days=settings.JWT_EXPIRE_DAYS) payload = {"sub": username, "exp": expire} return jwt.encode(payload, settings.JWT_SECRET_KEY, algorithm=settings.JWT_ALGORITHM) def decode_token(token: str) -> Optional[str]: """Return username from a valid token, or None.""" try: payload = jwt.decode(token, settings.JWT_SECRET_KEY, algorithms=[settings.JWT_ALGORITHM]) return payload.get("sub") except JWTError: return None # ── FastAPI dependency ──────────────────────────────────────────── def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(bearer_scheme)) -> str: """ Validates the Bearer token and returns the username (used as user_id). Raises 401 if the token is missing, expired, or invalid. """ username = decode_token(credentials.credentials) if not username: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid or expired token", headers={"WWW-Authenticate": "Bearer"}, ) return username