Spaces:
Runtime error
Runtime error
| from typing import Annotated | |
| from fastapi import Depends, HTTPException, status | |
| from fastapi.security import OAuth2PasswordBearer | |
| from sqlalchemy.orm import Session | |
| from backend.database import get_db | |
| from backend.core.security import verify_token | |
| from backend.models import User | |
| oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/login") | |
def _unauthorized(detail: str) -> HTTPException:
    """Build a 401 error that carries the Bearer ``WWW-Authenticate`` challenge."""
    return HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail=detail,
        headers={"WWW-Authenticate": "Bearer"},
    )


def get_current_user(
    token: Annotated[str, Depends(oauth2_scheme)],
    db: Annotated[Session, Depends(get_db)],
) -> User:
    """Resolve the bearer token of the current request to a ``User`` row.

    Args:
        token: Bearer token extracted by ``oauth2_scheme``.
        db: SQLAlchemy session supplied by ``get_db``.

    Returns:
        The ``User`` whose id matches the token's ``sub`` claim.

    Raises:
        HTTPException: 401 (with a ``WWW-Authenticate: Bearer`` header) when
            ``verify_token`` returns ``None``, when the ``sub`` claim is
            missing, empty or not a string, when no user has that id, or when
            the token's ``token_version`` claim is missing or differs from the
            user's stored ``token_version``. 403 when the user is not active.
    """
    payload = verify_token(token)
    if payload is None:
        raise _unauthorized("Invalid or expired token")

    user_id = payload.get("sub")
    if not user_id or not isinstance(user_id, str):
        raise _unauthorized("Invalid token payload")

    user = db.query(User).filter(User.id == user_id).first()
    if user is None:
        # Previously this 401 was the only one sent without the Bearer
        # challenge header; it now matches the other 401 responses.
        raise _unauthorized("User not found")

    # Check token version - invalidates tokens after password change/logout.
    # A token without the claim is rejected as well.
    token_version = payload.get("token_version")
    if token_version is None or token_version != user.token_version:
        raise _unauthorized("Token has been revoked")

    # The caller is identified at this point, so a disabled account is
    # reported as 403 rather than 401.
    if not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="User account is disabled",
        )

    return user
def get_current_admin_user(
    current_user: Annotated[User, Depends(get_current_user)],
) -> User:
    """Return the authenticated user, but only if they are an admin.

    Args:
        current_user: User already resolved by ``get_current_user``.

    Returns:
        The same ``User`` object, unchanged.

    Raises:
        HTTPException: 403 when ``current_user.is_admin`` is falsy.
    """
    if current_user.is_admin:
        return current_user

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Admin access required",
    )
# Annotated aliases so route signatures can declare a dependency by type
# alone (e.g. ``user: CurrentUser``) instead of repeating ``Depends(...)``.

# Database session supplied by ``get_db``.
DbSession = Annotated[Session, Depends(get_db)]

# User resolved by ``get_current_user``.
CurrentUser = Annotated[User, Depends(get_current_user)]

# User resolved by ``get_current_admin_user``.
AdminUser = Annotated[User, Depends(get_current_admin_user)]
| # Re-export access control for convenience | |
| from backend.core.access_control import AccessContext, UserAccessContext | |