File size: 2,998 Bytes
69be42f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
"""Dependency injection for database sessions and JWT authentication.

[Task]: T013, T014
[From]: specs/001-user-auth/quickstart.md
"""
import uuid
from typing import Annotated, Optional
from sqlmodel import Session
from fastapi import Depends, HTTPException, status
from starlette.requests import Request as StarletteRequest
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

from core.database import get_session as db_get_session
from core.security import decode_access_token

# HTTP Bearer scheme for Authorization header
security = HTTPBearer(auto_error=False)


def get_session():
    """Yield a database session with automatic cleanup.

    Uses the get_session function from core.database for consistency.
    """
    yield from db_get_session()


# Type alias for dependency injection
SessionDep = Annotated[Session, Depends(get_session)]


async def get_current_user_id(
    credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
    request: StarletteRequest = None
) -> uuid.UUID:
    """Get current user ID from JWT token.

    Extracts JWT token from Authorization header or httpOnly cookie,
    verifies it, and returns user_id as UUID.

    Args:
        credentials: HTTP Bearer credentials from Authorization header
        request: Starlette request object to access cookies

    Returns:
        Current authenticated user's ID as UUID

    Raises:
        HTTPException: If token is invalid, expired, or missing
    """
    # Extract token from Authorization header or cookie
    token = None

    # Try Authorization header first
    if credentials:
        token = credentials.credentials

    # If no token in header, try httpOnly cookie
    if not token and request:
        auth_token = request.cookies.get("auth_token")
        if auth_token:
            token = auth_token

    if not token:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )

    try:
        # Decode and verify token
        payload = decode_access_token(token)
        user_id_str = payload.get("sub")

        if not user_id_str:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid token: user_id missing"
            )

        # Convert string to UUID for database comparison
        user_id = uuid.UUID(user_id_str)

        return user_id
    except HTTPException:
        raise
    except ValueError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token: malformed user_id"
        )
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials"
        )


# Type alias for JWT authentication dependency
CurrentUserDep = Annotated[uuid.UUID, Depends(get_current_user_id)]