NeighbourAid / app /core /security.py
Parth Kansal
commit
49e9f9d
from datetime import datetime, timedelta, timezone
import bcrypt
from jose import JWTError, jwt
from fastapi import HTTPException, Depends, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from .config import settings
bearer_scheme = HTTPBearer()
_BCRYPT_MAX = 72
def _clip(password: str) -> bytes:
return password.encode("utf-8")[:_BCRYPT_MAX]
def hash_password(password: str) -> str:
return bcrypt.hashpw(_clip(password), bcrypt.gensalt()).decode("utf-8")
def verify_password(plain: str, hashed: str) -> bool:
try:
return bcrypt.checkpw(_clip(plain), hashed.encode("utf-8"))
except ValueError:
return False
def create_token(data: dict) -> str:
payload = data.copy()
payload["exp"] = datetime.now(timezone.utc) + timedelta(
minutes=settings.JWT_EXPIRE_MINUTES
)
return jwt.encode(payload, settings.JWT_SECRET, algorithm=settings.JWT_ALGORITHM)
def decode_token(token: str) -> dict:
try:
return jwt.decode(token, settings.JWT_SECRET, algorithms=[settings.JWT_ALGORITHM])
except JWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid or expired token",
)
def decode_token_safe(token: str) -> dict | None:
try:
return jwt.decode(token, settings.JWT_SECRET, algorithms=[settings.JWT_ALGORITHM])
except JWTError:
return None
async def get_current_user(
creds: HTTPAuthorizationCredentials = Depends(bearer_scheme),
) -> dict:
return decode_token(creds.credentials)
def require_role(role: str):
async def _dep(payload: dict = Depends(get_current_user)) -> dict:
if payload.get("role") != role:
raise HTTPException(status_code=403, detail=f"Requires {role} role")
return payload
return _dep