from fastapi import Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer from sqlalchemy.orm import Session from database import get_db from models.user import User from core.security import decode_access_token oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login") def get_current_user( token: str = Depends(oauth2_scheme), db: Session = Depends(get_db) ) -> User: credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid or expired token", headers={"WWW-Authenticate": "Bearer"}, ) email = decode_access_token(token) if email is None: raise credentials_exception user = db.query(User).filter(User.email == email).first() if user is None: raise credentials_exception if not user.is_active: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="User account is inactive" ) return user def require_role(*roles: str): def role_checker(current_user: User = Depends(get_current_user)) -> User: if current_user.role not in roles: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail=f"Access denied. Required roles: {list(roles)}" ) return current_user return role_checker # Ready to use dependencies require_admin = require_role("admin") require_analyst_or_admin = require_role("analyst", "admin") require_any = require_role("viewer", "analyst", "admin")