Spaces:
Running
Running
File size: 4,444 Bytes
a63c61f | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 | import secrets
from datetime import datetime, timedelta
from typing import Optional
from jose import jwt, JWTError
import bcrypt
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.orm import Session
from src.core.config import settings
from src.infrastructure.database import get_db
from src.core.domain.db_models import User, RefreshToken, UserRole
oauth2_scheme = OAuth2PasswordBearer(tokenUrl=f"{settings.API_V1_STR}/auth/login")
ALGORITHM = "HS256"
REFRESH_TOKEN_EXPIRE_DAYS = 30
# ββ Password helpers ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
def verify_password(plain: str, hashed: str) -> bool:
return bcrypt.checkpw(plain.encode(), hashed.encode())
def get_password_hash(password: str) -> str:
return bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode()
# ββ JWT access token ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
to_encode = data.copy()
expire = datetime.utcnow() + (expires_delta or timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES))
to_encode.update({"exp": expire, "type": "access"})
return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=ALGORITHM)
# ββ Refresh token βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
def create_refresh_token(user_id: int, db: Session) -> str:
token = secrets.token_urlsafe(64)
expires_at = datetime.utcnow() + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
db_token = RefreshToken(token=token, user_id=user_id, expires_at=expires_at)
db.add(db_token)
db.commit()
return token
def rotate_refresh_token(old_token: str, db: Session) -> tuple[str, "User"]:
"""Validate old refresh token, revoke it, issue a new one."""
record = db.query(RefreshToken).filter(
RefreshToken.token == old_token,
RefreshToken.revoked == False
).first()
if not record or record.expires_at < datetime.utcnow():
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid or expired refresh token")
record.revoked = True
db.commit()
new_token = create_refresh_token(record.user_id, db)
return new_token, record.user
def revoke_all_refresh_tokens(user_id: int, db: Session):
db.query(RefreshToken).filter(
RefreshToken.user_id == user_id,
RefreshToken.revoked == False
).update({"revoked": True})
db.commit()
# ββ Current user dependency βββββββββββββββββββββββββββββββββββββββββββββββββββ
def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)) -> User:
exc = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[ALGORITHM])
if payload.get("type") != "access":
raise exc
email: str = payload.get("sub")
if not email:
raise exc
except JWTError:
raise exc
user = db.query(User).filter(User.email == email).first()
if not user or not user.is_active:
raise exc
return user
def get_current_active_user(current_user: User = Depends(get_current_user)) -> User:
if not current_user.is_active:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
# ββ Role guards βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
def require_super_admin(current_user: User = Depends(get_current_user)) -> User:
if current_user.role != UserRole.super_admin:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Super admin access required")
return current_user
|