File size: 2,521 Bytes
82fcb44
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3d6ee40
82fcb44
 
 
 
 
 
3d6ee40
82fcb44
 
 
3d6ee40
82fcb44
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3d6ee40
82fcb44
 
 
 
 
 
 
 
 
 
 
3d6ee40
82fcb44
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
"""
Authentication dependencies for FastAPI.
Validates JWT tokens and provides user context.
"""
import logging
from typing import Optional

from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import JWTError, jwt
from pydantic import BaseModel

from app.core.config import settings

# Shared HTTP bearer scheme instance; get_current_user receives its result
# (an HTTPAuthorizationCredentials) through Depends(security).
security = HTTPBearer()


class TokenUser(BaseModel):
    """User information extracted from a JWT token."""

    # Required fields.
    user_id: str
    username: str
    role_id: str
    merchant_id: str
    # Optional fields; both default to None when not supplied.
    merchant_type: Optional[str] = None
    metadata: Optional[dict] = None

    def has_role(self, *roles: str) -> bool:
        """Return True when ``role_id`` equals at least one of ``roles``."""
        return any(candidate == self.role_id for candidate in roles)

    def is_admin(self) -> bool:
        """Return True when ``role_id`` contains "admin", ignoring case.

        NOTE(review): this is a substring match, so a role id such as
        "sysadmin" or "non-admin" also counts as admin -- confirm this is
        intended for the role naming scheme in use.
        """
        lowered_role = self.role_id.lower()
        return lowered_role.find("admin") >= 0


async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security)
) -> TokenUser:
    """
    Get the current authenticated user from the bearer JWT.

    Decodes and verifies the token with ``settings.SECRET_KEY`` and
    ``settings.ALGORITHM``, then builds a ``TokenUser`` from its claims.

    Args:
        credentials: Bearer credentials injected via ``Depends(security)``.

    Returns:
        A ``TokenUser`` populated from the ``sub``, ``username``,
        ``role_id``, ``merchant_id``, ``merchant_type`` and ``metadata``
        claims. ``role_id`` falls back to ``"user"`` when the claim is
        missing or empty.

    Raises:
        HTTPException: 401 when the token cannot be decoded, when ``sub``,
            ``username`` or ``merchant_id`` is absent, or when the claims
            do not fit the ``TokenUser`` model.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    # Phase 1: decode and verify the token.
    try:
        payload = jwt.decode(
            credentials.credentials,
            settings.SECRET_KEY,
            algorithms=[settings.ALGORITHM],
        )
    except JWTError as exc:
        # Expected failure mode for a bad token: reject without noise.
        raise credentials_exception from exc
    except Exception as exc:
        # Anything else is unexpected (e.g. a configuration problem). Keep
        # failing closed with a 401, but leave a trace so the real cause is
        # not silently hidden behind "invalid credentials".
        logging.getLogger(__name__).exception(
            "Unexpected error while decoding JWT"
        )
        raise credentials_exception from exc

    # Phase 2: pull out the claims. Every .get() may yield None.
    user_id: Optional[str] = payload.get("sub")
    username: Optional[str] = payload.get("username")
    role: Optional[str] = payload.get("role_id")
    merchant_id: Optional[str] = payload.get("merchant_id")
    merchant_type: Optional[str] = payload.get("merchant_type")
    metadata: Optional[dict] = payload.get("metadata")

    if user_id is None or username is None or merchant_id is None:
        raise credentials_exception

    # Phase 3: build the model. Claims of the wrong type make pydantic raise
    # ValidationError, which is a ValueError subclass; treat that as an
    # invalid token rather than a server error.
    try:
        return TokenUser(
            user_id=user_id,
            username=username,
            role_id=role or "user",
            merchant_id=merchant_id,
            merchant_type=merchant_type,
            metadata=metadata,
        )
    except ValueError as exc:
        raise credentials_exception from exc


async def get_current_active_user(
    current_user: TokenUser = Depends(get_current_user),
) -> TokenUser:
    """
    Return the user resolved by ``get_current_user`` unchanged.

    No additional "active" check is performed here; the dependency simply
    passes the authenticated user through.
    """
    return current_user