| from fastapi import Depends, HTTPException, status, Cookie | |
| from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials | |
| from sqlalchemy.ext.asyncio import AsyncSession | |
| from sqlalchemy import select | |
| from typing import Optional | |
| from app.database import get_db | |
| from app.db_models import User | |
| from app.auth import verify_token | |
| security = HTTPBearer(auto_error=False) | |
| async def get_current_user( | |
| credentials: Optional[HTTPAuthorizationCredentials] = Depends(security), | |
| token: Optional[str] = Cookie(None, alias="access_token"), | |
| session: AsyncSession = Depends(get_db), | |
| ) -> Optional[User]: | |
| jwt_token = None | |
| if credentials: | |
| jwt_token = credentials.credentials | |
| elif token: | |
| jwt_token = token | |
| if not jwt_token: | |
| return None | |
| payload = verify_token(jwt_token) | |
| if not payload: | |
| return None | |
| user_id = payload.get("sub") | |
| if not user_id: | |
| return None | |
| result = await session.execute(select(User).where(User.id == int(user_id))) | |
| user = result.scalar_one_or_none() | |
| return user | |
| async def require_user( | |
| current_user: Optional[User] = Depends(get_current_user), | |
| ) -> User: | |
| if not current_user: | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, detail="Not authenticated" | |
| ) | |
| return current_user | |
| async def require_admin(current_user: User = Depends(require_user)) -> User: | |
| if current_user.role != "admin": | |
| raise HTTPException( | |
| status_code=status.HTTP_403_FORBIDDEN, detail="Admin access required" | |
| ) | |
| return current_user | |