from datetime import datetime, timedelta, timezone import bcrypt from jose import JWTError, jwt from fastapi import HTTPException, Depends, status from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from .config import settings bearer_scheme = HTTPBearer() _BCRYPT_MAX = 72 def _clip(password: str) -> bytes: return password.encode("utf-8")[:_BCRYPT_MAX] def hash_password(password: str) -> str: return bcrypt.hashpw(_clip(password), bcrypt.gensalt()).decode("utf-8") def verify_password(plain: str, hashed: str) -> bool: try: return bcrypt.checkpw(_clip(plain), hashed.encode("utf-8")) except ValueError: return False def create_token(data: dict) -> str: payload = data.copy() payload["exp"] = datetime.now(timezone.utc) + timedelta( minutes=settings.JWT_EXPIRE_MINUTES ) return jwt.encode(payload, settings.JWT_SECRET, algorithm=settings.JWT_ALGORITHM) def decode_token(token: str) -> dict: try: return jwt.decode(token, settings.JWT_SECRET, algorithms=[settings.JWT_ALGORITHM]) except JWTError: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid or expired token", ) def decode_token_safe(token: str) -> dict | None: try: return jwt.decode(token, settings.JWT_SECRET, algorithms=[settings.JWT_ALGORITHM]) except JWTError: return None async def get_current_user( creds: HTTPAuthorizationCredentials = Depends(bearer_scheme), ) -> dict: return decode_token(creds.credentials) def require_role(role: str): async def _dep(payload: dict = Depends(get_current_user)) -> dict: if payload.get("role") != role: raise HTTPException(status_code=403, detail=f"Requires {role} role") return payload return _dep