File size: 2,645 Bytes
b29925a
77b0666
740f610
77b0666
 
 
397123d
77b0666
397123d
 
848bdfb
740f610
77b0666
740f610
 
8776504
77b0666
 
 
 
 
9e499eb
77b0666
 
 
 
 
 
 
 
 
 
 
 
 
 
8776504
77b0666
 
8776504
740f610
77b0666
 
 
 
 
 
 
740f610
77b0666
 
 
740f610
77b0666
 
 
8776504
77b0666
 
 
 
 
 
 
 
 
8776504
 
77b0666
8776504
77b0666
 
 
 
848bdfb
77b0666
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
from datetime import datetime, timedelta, timezone
import logging
from typing import Optional

from fastapi import HTTPException, status, Depends, Request
from fastapi.security import OAuth2PasswordBearer
from jose import jwt, JWTError
from pydantic import BaseModel

from core.config import settings

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Dependency used by verify_token (via Depends) to obtain the raw token string.
# tokenUrl is the login route under the configured API v1 prefix.
# NOTE(review): scheme_name="JWT" is presumably the label shown in the
# generated OpenAPI docs — confirm against the FastAPI security reference.
oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl=f"{settings.API_V1_STR}/auth/login",
    scheme_name="JWT"
)

class TokenPayload(BaseModel):
    """Claims extracted from a decoded JWT.

    verify_token builds an instance from the decoded payload, so a token
    missing ``sub``, ``exp`` or ``iat`` fails model validation.
    """

    # Subject of the token; logged as the identity the token was issued for.
    sub: str
    # Expiry time; verify_token treats it as a timestamp when checking expiry.
    exp: int
    # Issued-at time; create_access_token sets it to the creation time.
    iat: int
    # Optional role claim. NOTE(review): semantics are not visible in this
    # module — presumably used for authorization elsewhere; confirm.
    role: Optional[str] = None

def create_access_token(*, data: dict, expires_delta: Optional[timedelta] = None) -> str:
    """Create a signed JWT carrying ``data`` plus ``exp`` and ``iat`` claims.

    Args:
        data: Claims to embed in the token. The dict is copied, so the
            caller's object is never mutated. Its ``sub`` entry (if any) is
            logged as the identity the token was issued for.
        expires_delta: Lifetime of the token. When ``None`` the lifetime is
            ``settings.ACCESS_TOKEN_EXPIRE_MINUTES`` minutes. An explicit
            zero-length delta is honoured rather than replaced by the default.

    Returns:
        The encoded JWT string, signed with ``settings.SECRET_KEY`` using
        ``settings.ALGORITHM``.

    Raises:
        HTTPException: 500 if building or encoding the token fails for any
            reason; the original error is logged and chained as the cause.
    """
    try:
        # `is None` (not truthiness): timedelta(0) is falsy but is still an
        # explicit request from the caller.
        if expires_delta is None:
            expires_delta = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)

        # Take a single timezone-aware "now" so iat and exp derive from the
        # same instant (datetime.utcnow() is deprecated and naive).
        issued_at = datetime.now(timezone.utc)

        to_encode = data.copy()
        to_encode.update({"exp": issued_at + expires_delta, "iat": issued_at})

        encoded_jwt = jwt.encode(
            to_encode,
            settings.SECRET_KEY,
            algorithm=settings.ALGORITHM,
        )
        logger.info("Token created for %s", data.get("sub"))
        return encoded_jwt
    except Exception as e:
        # Boundary handler: hide internals from the client, keep them in logs.
        logger.error("Token creation failed: %s", e)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Token creation failed",
        ) from e

async def verify_token(token: str = Depends(oauth2_scheme)) -> TokenPayload:
    """Decode and validate a JWT and return its claims.

    Args:
        token: Raw token string supplied by the ``oauth2_scheme`` dependency.
            A leading ``"Bearer "`` prefix is tolerated and stripped.

    Returns:
        The decoded claims as a ``TokenPayload``.

    Raises:
        HTTPException: 401 "Could not validate credentials" when the token is
            empty, cannot be decoded/verified, or its payload does not fit
            ``TokenPayload``; 401 "Token expired" when the ``exp`` claim lies
            in the past.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    # Guard clauses live outside the try block so the HTTPException raised
    # here is not re-caught and logged a second time as an "unexpected" error.
    if not token:
        logger.error("Empty token provided")
        raise credentials_exception

    if token.startswith("Bearer "):
        token = token[7:]
        logger.debug("Removed 'Bearer ' prefix from token")

    try:
        payload = jwt.decode(
            token,
            settings.SECRET_KEY,
            algorithms=[settings.ALGORITHM],
        )
        token_data = TokenPayload(**payload)
    except JWTError as e:
        logger.error("JWT validation failed: %s", e)
        raise credentials_exception from e
    except Exception as e:
        # e.g. payload missing required claims; treated as bad credentials.
        logger.error("Unexpected token verification error: %s", e)
        raise credentials_exception from e

    # Compare epoch seconds against an aware UTC clock. The previous
    # fromtimestamp()/utcnow() comparison mixed naive local time with naive
    # UTC and mis-judged expiry on hosts whose timezone is not UTC.
    if token_data.exp < datetime.now(timezone.utc).timestamp():
        logger.error("Token expired")
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token expired",
            headers={"WWW-Authenticate": "Bearer"},
        )

    logger.info("Token verified for %s", token_data.sub)
    return token_data