Spaces:
Running
Running
| import secrets | |
| from datetime import datetime, timedelta | |
| from typing import Optional | |
| from jose import jwt, JWTError | |
| import bcrypt | |
| from fastapi import Depends, HTTPException, status | |
| from fastapi.security import OAuth2PasswordBearer | |
| from sqlalchemy.orm import Session | |
| from src.core.config import settings | |
| from src.infrastructure.database import get_db | |
| from src.core.domain.db_models import User, RefreshToken, UserRole | |
| oauth2_scheme = OAuth2PasswordBearer(tokenUrl=f"{settings.API_V1_STR}/auth/login") | |
| ALGORITHM = "HS256" | |
| REFRESH_TOKEN_EXPIRE_DAYS = 30 | |
| # ββ Password helpers ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def verify_password(plain: str, hashed: str) -> bool: | |
| return bcrypt.checkpw(plain.encode(), hashed.encode()) | |
| def get_password_hash(password: str) -> str: | |
| return bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode() | |
| # ββ JWT access token ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str: | |
| to_encode = data.copy() | |
| expire = datetime.utcnow() + (expires_delta or timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)) | |
| to_encode.update({"exp": expire, "type": "access"}) | |
| return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=ALGORITHM) | |
| # ββ Refresh token βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def create_refresh_token(user_id: int, db: Session) -> str: | |
| token = secrets.token_urlsafe(64) | |
| expires_at = datetime.utcnow() + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS) | |
| db_token = RefreshToken(token=token, user_id=user_id, expires_at=expires_at) | |
| db.add(db_token) | |
| db.commit() | |
| return token | |
| def rotate_refresh_token(old_token: str, db: Session) -> tuple[str, "User"]: | |
| """Validate old refresh token, revoke it, issue a new one.""" | |
| record = db.query(RefreshToken).filter( | |
| RefreshToken.token == old_token, | |
| RefreshToken.revoked == False | |
| ).first() | |
| if not record or record.expires_at < datetime.utcnow(): | |
| raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid or expired refresh token") | |
| record.revoked = True | |
| db.commit() | |
| new_token = create_refresh_token(record.user_id, db) | |
| return new_token, record.user | |
| def revoke_all_refresh_tokens(user_id: int, db: Session): | |
| db.query(RefreshToken).filter( | |
| RefreshToken.user_id == user_id, | |
| RefreshToken.revoked == False | |
| ).update({"revoked": True}) | |
| db.commit() | |
| # ββ Current user dependency βββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)) -> User: | |
| exc = HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail="Could not validate credentials", | |
| headers={"WWW-Authenticate": "Bearer"}, | |
| ) | |
| try: | |
| payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[ALGORITHM]) | |
| if payload.get("type") != "access": | |
| raise exc | |
| email: str = payload.get("sub") | |
| if not email: | |
| raise exc | |
| except JWTError: | |
| raise exc | |
| user = db.query(User).filter(User.email == email).first() | |
| if not user or not user.is_active: | |
| raise exc | |
| return user | |
| def get_current_active_user(current_user: User = Depends(get_current_user)) -> User: | |
| if not current_user.is_active: | |
| raise HTTPException(status_code=400, detail="Inactive user") | |
| return current_user | |
| # ββ Role guards βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def require_super_admin(current_user: User = Depends(get_current_user)) -> User: | |
| if current_user.role != UserRole.super_admin: | |
| raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Super admin access required") | |
| return current_user | |