# Soma / app / auth / auth.py
# Komalpreet Kaur
# refactor: optimize auth flow and improve API error handling
# 7c00f2a unverified
from datetime import datetime, timedelta, timezone
from typing import Optional

import bcrypt
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import JWTError, jwt

from app.core.config import settings
# Module-level HTTP Bearer security scheme instance; get_current_user receives
# its result through Depends(bearer_scheme).
bearer_scheme = HTTPBearer()
# ── Password helpers ──────────────────────────────────────────────
def hash_password(plain: str) -> str:
    """Hash a plaintext password with bcrypt and return the hash as text.

    A fresh salt is obtained from ``bcrypt.gensalt()`` on every call, and the
    bytes returned by ``bcrypt.hashpw`` are decoded to ``str`` before being
    returned.
    """
    password_bytes = plain.encode()
    salt = bcrypt.gensalt()
    digest = bcrypt.hashpw(password_bytes, salt)
    return digest.decode()
def verify_password(plain: str, hashed: str) -> bool:
    """Check a plaintext password against a stored bcrypt hash.

    Both arguments are encoded to bytes and handed to ``bcrypt.checkpw``;
    its result is returned unchanged.
    """
    candidate = plain.encode()
    stored = hashed.encode()
    return bcrypt.checkpw(candidate, stored)
# ── JWT helpers ───────────────────────────────────────────────────
def create_token(username: str) -> str:
    """Create a signed JWT for ``username``.

    The payload stores the username under ``sub`` and an expiry under ``exp``
    that lies ``settings.JWT_EXPIRE_DAYS`` days after the current UTC time.
    The token is signed with ``settings.JWT_SECRET_KEY`` using
    ``settings.JWT_ALGORITHM``.

    Returns:
        The value produced by ``jwt.encode`` for that payload.
    """
    # Use a timezone-aware UTC timestamp: datetime.utcnow() is deprecated
    # since Python 3.12 and yields a naive datetime that is easy to misread
    # as local time.
    # NOTE(review): python-jose is expected to serialise datetime claims via
    # their UTC time tuple, so the encoded ``exp`` value should be unchanged
    # — confirm against the installed jose version.
    expire = datetime.now(timezone.utc) + timedelta(days=settings.JWT_EXPIRE_DAYS)
    payload = {"sub": username, "exp": expire}
    return jwt.encode(payload, settings.JWT_SECRET_KEY, algorithm=settings.JWT_ALGORITHM)
def decode_token(token: str) -> Optional[str]:
    """Extract the ``sub`` claim from a JWT.

    The token is decoded with ``settings.JWT_SECRET_KEY`` and
    ``settings.JWT_ALGORITHM``.

    Returns:
        The ``sub`` value of the decoded payload, or ``None`` when decoding
        raises ``JWTError`` or the payload has no ``sub`` entry.
    """
    try:
        claims = jwt.decode(
            token,
            settings.JWT_SECRET_KEY,
            algorithms=[settings.JWT_ALGORITHM],
        )
    except JWTError:
        # Any decoding failure is reported to the caller as "no username".
        return None
    return claims.get("sub")
# ── FastAPI dependency ────────────────────────────────────────────
def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(bearer_scheme)) -> str:
    """FastAPI dependency that resolves the Bearer token to a username.

    The raw token is taken from ``credentials.credentials`` and passed to
    ``decode_token``.

    Returns:
        The username carried by the token (used as user_id).

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when
            ``decode_token`` yields ``None`` or an empty value, i.e. the
            token is invalid or expired.
    """
    subject = decode_token(credentials.credentials)
    if subject:
        return subject

    # Falsy result: no usable username could be extracted from the token.
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid or expired token",
        headers={"WWW-Authenticate": "Bearer"},
    )