Spaces:
Sleeping
Sleeping
| from datetime import datetime, timedelta, timezone | |
| import bcrypt | |
| from jose import JWTError, jwt | |
| from fastapi import HTTPException, Depends, status | |
| from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials | |
| from .config import settings | |
# Upper bound, in bytes, that _clip applies to an encoded password before it
# is handed to bcrypt.
_BCRYPT_MAX: int = 72
# Shared HTTP Bearer security scheme; get_current_user depends on it to obtain
# the request's credentials.
bearer_scheme: HTTPBearer = HTTPBearer()
def _clip(password: str) -> bytes:
    """Encode *password* as UTF-8 and keep at most ``_BCRYPT_MAX`` bytes.

    The cut is made on the encoded bytes rather than on characters, so a
    multi-byte character that straddles the limit is split.
    """
    encoded = password.encode("utf-8")
    return encoded[:_BCRYPT_MAX]
def hash_password(password: str) -> str:
    """Return a bcrypt hash of *password* decoded to text.

    The password is first reduced by ``_clip``; a new salt is requested from
    ``bcrypt.gensalt()`` on every call.
    """
    secret = _clip(password)
    salt = bcrypt.gensalt()
    digest = bcrypt.hashpw(secret, salt)
    return digest.decode("utf-8")
def verify_password(plain: str, hashed: str) -> bool:
    """Check *plain* against the stored bcrypt hash *hashed*.

    Returns the result of ``bcrypt.checkpw``. A ``ValueError`` raised while
    encoding either argument or inside ``bcrypt.checkpw`` (presumably for a
    malformed hash) is reported as a failed match instead of propagating.
    """
    try:
        # Encoding stays inside the try: UnicodeEncodeError is a ValueError too.
        is_match = bcrypt.checkpw(_clip(plain), hashed.encode("utf-8"))
    except ValueError:
        is_match = False
    return is_match
def create_token(data: dict) -> str:
    """Encode a copy of *data* as a JWT with an ``exp`` claim added.

    ``exp`` is set to the current UTC time plus ``settings.JWT_EXPIRE_MINUTES``
    minutes, replacing any ``exp`` already present. The caller's dict is not
    modified (a shallow copy is taken). The token is encoded with
    ``settings.JWT_SECRET`` and ``settings.JWT_ALGORITHM``.
    """
    claims = data.copy()
    issued_at = datetime.now(timezone.utc)
    lifetime = timedelta(minutes=settings.JWT_EXPIRE_MINUTES)
    claims["exp"] = issued_at + lifetime
    return jwt.encode(
        claims,
        settings.JWT_SECRET,
        algorithm=settings.JWT_ALGORITHM,
    )
def decode_token(token: str) -> dict:
    """Decode *token* and return its claims.

    The token is decoded with ``settings.JWT_SECRET`` and restricted to
    ``settings.JWT_ALGORITHM``.

    Raises:
        HTTPException: 401 "Invalid or expired token", carrying a
            ``WWW-Authenticate: Bearer`` challenge, when ``jwt.decode``
            raises ``JWTError``. The original error is kept as the cause.
    """
    try:
        return jwt.decode(token, settings.JWT_SECRET, algorithms=[settings.JWT_ALGORITHM])
    except JWTError as exc:
        # A 401 response must include a WWW-Authenticate challenge so clients
        # know which authentication scheme to retry with.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid or expired token",
            headers={"WWW-Authenticate": "Bearer"},
        ) from exc
def decode_token_safe(token: str) -> dict | None:
    """Decode *token* without raising on an invalid token.

    Returns the decoded claims, or ``None`` when ``jwt.decode`` raises
    ``JWTError``. Uses the same secret and algorithm settings as
    ``decode_token``.
    """
    try:
        claims = jwt.decode(
            token,
            settings.JWT_SECRET,
            algorithms=[settings.JWT_ALGORITHM],
        )
    except JWTError:
        claims = None
    return claims
async def get_current_user(
    creds: HTTPAuthorizationCredentials = Depends(bearer_scheme),
) -> dict:
    """Dependency returning the decoded claims of the request's bearer token.

    The credentials are supplied by ``bearer_scheme``; decoding, and the error
    raised for a bad token, are delegated to ``decode_token``.
    """
    raw_token = creds.credentials
    return decode_token(raw_token)
def require_role(role: str):
    """Build a dependency that admits only tokens whose ``role`` claim is *role*.

    The returned coroutine depends on ``get_current_user``. It returns the
    token payload unchanged when the claim matches, and raises a 403
    ``HTTPException`` otherwise.
    """

    async def _role_guard(payload: dict = Depends(get_current_user)) -> dict:
        if payload.get("role") == role:
            return payload
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"Requires {role} role",
        )

    return _role_guard