joblin / backend /auth.py
Britzzy's picture
Add super admin user role with view toggle
f61335f
Raw
History Blame Contribute Delete
1.88 kB
from datetime import datetime, timedelta, timezone
from jose import JWTError, jwt
import bcrypt as _bcrypt
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from backend.config import JWT_SECRET, JWT_ALGORITHM, JWT_EXPIRATION_HOURS
# Bearer-token extractor used as a FastAPI dependency by the functions below.
# auto_error=False lets a request without credentials reach those functions
# as ``None`` (they each check ``credentials is None``) so that they decide
# whether a missing token is an error or simply an anonymous request.
security = HTTPBearer(auto_error=False)
def hash_password(password: str) -> str:
    """Return a bcrypt hash of *password* as text.

    The password is UTF-8 encoded, hashed with a freshly generated salt,
    and the resulting hash bytes are decoded back to ``str``.
    """
    secret = password.encode("utf-8")
    salt = _bcrypt.gensalt()
    digest = _bcrypt.hashpw(secret, salt)
    return digest.decode("utf-8")
def verify_password(plain: str, hashed: str) -> bool:
    """Check a plaintext password against a stored bcrypt hash.

    Args:
        plain: The password supplied by the user.
        hashed: The stored bcrypt hash, as text.

    Returns:
        ``True`` when *plain* matches *hashed*; ``False`` when it does not,
        or when *hashed* is empty or is not a usable bcrypt hash.
    """
    if not hashed:
        # Nothing to compare against (e.g. no password was ever stored):
        # treat as a failed check rather than handing bcrypt an empty salt.
        return False
    try:
        return _bcrypt.checkpw(plain.encode("utf-8"), hashed.encode("utf-8"))
    except ValueError:
        # bcrypt signals input it cannot process (such as a malformed stored
        # hash, "Invalid salt") with ValueError; a predicate should answer
        # "no match" instead of crashing the request.
        return False
def create_access_token(user_id: int, email: str, is_admin: bool = False) -> str:
    """Create a signed JWT access token for a user.

    Args:
        user_id: Stored in the ``sub`` claim as a string.
        email: Stored in the ``email`` claim.
        is_admin: Stored in the ``is_admin`` claim.

    Returns:
        The token encoded with ``JWT_SECRET`` using ``JWT_ALGORITHM``. It
        carries ``iat`` (issue time) and ``exp`` (issue time plus
        ``JWT_EXPIRATION_HOURS`` hours) claims.
    """
    # Read the clock once so that exp - iat is exactly the configured
    # lifetime, instead of depending on the gap between two separate reads.
    issued_at = datetime.now(timezone.utc)
    expires_at = issued_at + timedelta(hours=JWT_EXPIRATION_HOURS)
    claims = {
        "sub": str(user_id),
        "email": email,
        "is_admin": is_admin,
        "exp": expires_at,
        "iat": issued_at,
    }
    return jwt.encode(claims, JWT_SECRET, algorithm=JWT_ALGORITHM)
def decode_token(token: str) -> dict:
    """Decode a JWT and return the identity claims it carries.

    Args:
        token: The encoded JWT string.

    Returns:
        A dict with ``user_id`` (the ``sub`` claim converted to ``int``),
        ``email``, and ``is_admin`` (``False`` when the claim is absent).

    Raises:
        HTTPException: 401 when the token cannot be decoded, a required
            claim is missing, or ``sub`` is not an integer value.
    """
    try:
        payload = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
        return {
            "user_id": int(payload["sub"]),
            "email": payload["email"],
            "is_admin": payload.get("is_admin", False),
        }
    except (JWTError, KeyError, ValueError, TypeError) as exc:
        # ValueError/TypeError cover a ``sub`` that int() cannot convert;
        # such a token is invalid and must yield 401, not an unhandled error.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid or expired token",
        ) from exc
async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)) -> dict:
    """FastAPI dependency that requires an authenticated user.

    Returns the dict produced by ``decode_token`` for the supplied bearer
    token. Raises a 401 ``HTTPException`` with detail "Not authenticated"
    when no credentials were provided; any exception raised by
    ``decode_token`` propagates unchanged.
    """
    if credentials is not None:
        return decode_token(credentials.credentials)
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Not authenticated",
    )
async def optional_user(credentials: HTTPAuthorizationCredentials = Depends(security)) -> dict | None:
    """FastAPI dependency that resolves the user when possible.

    Returns the dict produced by ``decode_token`` when credentials are
    present and decoding succeeds. Returns ``None`` when no credentials
    were supplied or when decoding raises any ``Exception``.
    """
    user = None
    if credentials is not None:
        try:
            user = decode_token(credentials.credentials)
        except Exception:
            # Deliberately best-effort: an unusable token is treated the
            # same as no token at all.
            user = None
    return user