Spaces:
Running
Running
File size: 2,571 Bytes
84c328d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 |
"""Authentication dependencies for protected routes.
[Task]: T036, T037
[From]: specs/001-user-auth/plan.md
"""
from typing import Optional
from fastapi import Depends, HTTPException, status, Cookie
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlmodel import Session, select
from models.user import User
from core.database import get_session
from core.security import decode_access_token
# Optional: HTTP Bearer scheme for Authorization header
security = HTTPBearer(auto_error=False)
async def get_current_user(
response: Optional[str] = None,
credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
auth_token: Optional[str] = Cookie(None),
session: Session = Depends(get_session)
) -> User:
"""Get current authenticated user from JWT token.
Extracts JWT from Authorization header or httpOnly cookie,
verifies signature, queries database for user.
[Task]: T036, T037
[From]: specs/001-user-auth/plan.md
Args:
credentials: HTTP Bearer credentials from Authorization header
auth_token: JWT token from httpOnly cookie
session: Database session
Returns:
User: Authenticated user object
Raises:
HTTPException 401: If token is invalid, expired, or missing
"""
# Extract token from Authorization header or cookie
token = None
# Try Authorization header first
if credentials:
token = credentials.credentials
# If no token in header, try cookie
if not token and auth_token:
token = auth_token
if not token:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Not authenticated",
headers={"WWW-Authenticate": "Bearer"},
)
try:
# Decode and verify token
payload = decode_access_token(token)
user_id = payload.get("sub")
if not user_id:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token: user_id missing"
)
# Query user from database
user = session.get(User, user_id)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="User not found"
)
return user
except HTTPException:
raise
except Exception as e:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials"
)
|