""" JWT authentication — register, login, and token verification. """ from datetime import datetime, timedelta, timezone from typing import Optional import jwt import bcrypt from fastapi import Depends, HTTPException, status from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from sqlalchemy.orm import Session from app.config import get_settings from app.database import get_db from app.models import User settings = get_settings() security = HTTPBearer() # ── Password Hashing ───────────────────────────────── def hash_password(password: str) -> str: return bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8") def verify_password(plain: str, hashed: str) -> bool: return bcrypt.checkpw(plain.encode("utf-8"), hashed.encode("utf-8")) # ── JWT Token ──────────────────────────────────────── def create_access_token(user_id: str) -> str: """Create a JWT access token with user_id as the subject.""" payload = { "sub": user_id, "type": "access", "exp": datetime.now(timezone.utc) + timedelta(minutes=settings.JWT_ACCESS_EXPIRY_MINUTES), "iat": datetime.now(timezone.utc), } return jwt.encode(payload, settings.SECRET_KEY, algorithm=settings.JWT_ALGORITHM) def create_refresh_token(user_id: str) -> str: """Create a JWT refresh token with user_id as the subject.""" payload = { "sub": user_id, "type": "refresh", "exp": datetime.now(timezone.utc) + timedelta(days=settings.JWT_REFRESH_EXPIRY_DAYS), "iat": datetime.now(timezone.utc), } return jwt.encode(payload, settings.SECRET_KEY, algorithm=settings.JWT_ALGORITHM) def decode_token(token: str, token_type: str = "access") -> Optional[str]: """Decode JWT and return user_id, or None if invalid.""" try: payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.JWT_ALGORITHM]) if payload.get("type") != token_type: return None return payload.get("sub") except jwt.ExpiredSignatureError: return None except jwt.InvalidTokenError: return None # ── FastAPI Dependencies ───────────────────────────── 
def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
    db: Session = Depends(get_db),
) -> User:
    """Dependency: extract and validate user from JWT bearer token.

    Args:
        credentials: Bearer credentials supplied by the ``security`` scheme.
        db: Database session supplied by ``get_db``.

    Returns:
        The ``User`` row whose ``id`` equals the user id decoded from the token.

    Raises:
        HTTPException: 401 when ``decode_token`` yields no user id for the
            token, or when no user with that id exists.
    """
    user_id = decode_token(credentials.credentials)
    if not user_id:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid or expired token",
            headers={"WWW-Authenticate": "Bearer"},
        )

    user = db.query(User).filter(User.id == user_id).first()
    if not user:
        # Send the same challenge header as the invalid-token branch so every
        # 401 from this dependency tells the client which scheme to use.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="User not found",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return user


def get_admin_user(user: User = Depends(get_current_user)) -> User:
    """Dependency: require admin privileges.

    Args:
        user: The authenticated user resolved by ``get_current_user``.

    Returns:
        The same ``user`` when ``user.is_admin`` is truthy.

    Raises:
        HTTPException: 403 when ``user.is_admin`` is falsy.
    """
    if not user.is_admin:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Admin access required",
        )
    return user