import secrets from datetime import datetime, timedelta from typing import Optional from jose import jwt, JWTError import bcrypt from fastapi import Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer from sqlalchemy.orm import Session from src.core.config import settings from src.infrastructure.database import get_db from src.core.domain.db_models import User, RefreshToken, UserRole oauth2_scheme = OAuth2PasswordBearer(tokenUrl=f"{settings.API_V1_STR}/auth/login") ALGORITHM = "HS256" REFRESH_TOKEN_EXPIRE_DAYS = 30 # ── Password helpers ────────────────────────────────────────────────────────── def verify_password(plain: str, hashed: str) -> bool: return bcrypt.checkpw(plain.encode(), hashed.encode()) def get_password_hash(password: str) -> str: return bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode() # ── JWT access token ────────────────────────────────────────────────────────── def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str: to_encode = data.copy() expire = datetime.utcnow() + (expires_delta or timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)) to_encode.update({"exp": expire, "type": "access"}) return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=ALGORITHM) # ── Refresh token ───────────────────────────────────────────────────────────── def create_refresh_token(user_id: int, db: Session) -> str: token = secrets.token_urlsafe(64) expires_at = datetime.utcnow() + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS) db_token = RefreshToken(token=token, user_id=user_id, expires_at=expires_at) db.add(db_token) db.commit() return token def rotate_refresh_token(old_token: str, db: Session) -> tuple[str, "User"]: """Validate old refresh token, revoke it, issue a new one.""" record = db.query(RefreshToken).filter( RefreshToken.token == old_token, RefreshToken.revoked == False ).first() if not record or record.expires_at < datetime.utcnow(): raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid or expired refresh token") record.revoked = True db.commit() new_token = create_refresh_token(record.user_id, db) return new_token, record.user def revoke_all_refresh_tokens(user_id: int, db: Session): db.query(RefreshToken).filter( RefreshToken.user_id == user_id, RefreshToken.revoked == False ).update({"revoked": True}) db.commit() # ── Current user dependency ─────────────────────────────────────────────────── def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)) -> User: exc = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[ALGORITHM]) if payload.get("type") != "access": raise exc email: str = payload.get("sub") if not email: raise exc except JWTError: raise exc user = db.query(User).filter(User.email == email).first() if not user or not user.is_active: raise exc return user def get_current_active_user(current_user: User = Depends(get_current_user)) -> User: if not current_user.is_active: raise HTTPException(status_code=400, detail="Inactive user") return current_user # ── Role guards ─────────────────────────────────────────────────────────────── def require_super_admin(current_user: User = Depends(get_current_user)) -> User: if current_user.role != UserRole.super_admin: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Super admin access required") return current_user