File size: 1,654 Bytes
cdd38c8 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 | from fastapi import Depends, HTTPException, status, Cookie
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from typing import Optional
from app.database import get_db
from app.db_models import User
from app.auth import verify_token
# Bearer-token extractor shared by the dependencies below.
# auto_error=False: a missing/invalid Authorization header should resolve to
# None rather than an automatic error response, so get_current_user can fall
# back to the "access_token" cookie.
security = HTTPBearer(auto_error=False)
async def get_current_user(
    credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
    token: Optional[str] = Cookie(None, alias="access_token"),
    session: AsyncSession = Depends(get_db),
) -> Optional[User]:
    """Resolve the user for the current request, or None if unauthenticated.

    The JWT is taken from the bearer credentials when they are present,
    otherwise from the ``access_token`` cookie.

    Args:
        credentials: Bearer credentials produced by the ``security`` scheme,
            or None when no usable Authorization header was sent.
        token: Value of the ``access_token`` cookie, if any.
        session: Async database session used to look up the user.

    Returns:
        The matching ``User`` row, or None when no token is supplied,
        ``verify_token`` returns a falsy payload, the ``sub`` claim is
        missing or not an integer, or no user has that id.
    """
    # Bearer credentials win over the cookie whenever they are present.
    jwt_token = credentials.credentials if credentials else token
    if not jwt_token:
        return None

    payload = verify_token(jwt_token)
    if not payload:
        return None

    subject = payload.get("sub")
    if not subject:
        return None

    # A non-numeric "sub" claim means the token does not identify a user;
    # treat it as unauthenticated instead of letting int() raise (HTTP 500).
    try:
        user_id = int(subject)
    except (TypeError, ValueError):
        return None

    result = await session.execute(select(User).where(User.id == user_id))
    return result.scalar_one_or_none()
async def require_user(
    current_user: Optional[User] = Depends(get_current_user),
) -> User:
    """Return the authenticated user or reject the request.

    Args:
        current_user: Result of ``get_current_user``; None (or any falsy
            value) means the request carries no valid identity.

    Returns:
        The authenticated ``User``.

    Raises:
        HTTPException: 401 "Not authenticated" when there is no current user.
    """
    if current_user:
        return current_user

    raise HTTPException(
        detail="Not authenticated",
        status_code=status.HTTP_401_UNAUTHORIZED,
    )
async def require_admin(current_user: User = Depends(require_user)) -> User:
    """Return the authenticated user only if their role is ``"admin"``.

    Args:
        current_user: User supplied by the ``require_user`` dependency.

    Returns:
        The same ``User`` when ``current_user.role`` equals ``"admin"``.

    Raises:
        HTTPException: 403 "Admin access required" for any other role.
    """
    if current_user.role == "admin":
        return current_user

    raise HTTPException(
        detail="Admin access required",
        status_code=status.HTTP_403_FORBIDDEN,
    )
|