Spaces:
Sleeping
Sleeping
File size: 2,645 Bytes
import logging
from datetime import datetime, timedelta, timezone
from typing import Optional

from fastapi import Depends, HTTPException, Request, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from pydantic import BaseModel

from core.config import settings
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Bearer-token dependency used by verify_token below. The token URL is the
# login route under the API prefix taken from settings.
oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl=f"{settings.API_V1_STR}/auth/login",
    scheme_name="JWT"
)
class TokenPayload(BaseModel):
    """Validated view of the claims carried by a decoded access token.

    ``verify_token`` builds this model from the decoded JWT payload, so a
    payload that lacks ``sub``, ``exp`` or ``iat`` fails validation.
    """

    # Subject claim; create_access_token logs it as the token's owner.
    sub: str
    # Expiry as a numeric timestamp (verify_token feeds it to fromtimestamp).
    exp: int
    # Issued-at claim as a numeric timestamp.
    iat: int
    # Optional role claim; absent from the payload means None.
    role: Optional[str] = None
def create_access_token(*, data: dict, expires_delta: Optional[timedelta] = None) -> str:
    """Create a signed JWT carrying ``data`` plus ``exp`` and ``iat`` claims.

    Args:
        data: Claims to embed in the token. The dict is shallow-copied, so
            the caller's object is not modified.
        expires_delta: Lifetime of the token. When ``None``, the lifetime is
            ``settings.ACCESS_TOKEN_EXPIRE_MINUTES`` minutes. An explicit
            zero-length ``timedelta`` is honoured instead of silently
            falling back to the default.

    Returns:
        The encoded token produced by ``jwt.encode``.

    Raises:
        HTTPException: 500 "Token creation failed" if anything goes wrong
            while building or signing the token.
    """
    try:
        to_encode = data.copy()

        # ``is None`` rather than truthiness: timedelta(0) is falsy and must
        # not be mistaken for "no lifetime supplied".
        if expires_delta is None:
            expires_delta = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)

        # Read the clock once so ``iat`` and ``exp`` share the same instant,
        # and keep it timezone-aware so the claims are unambiguous UTC.
        issued_at = datetime.now(timezone.utc)
        to_encode.update({"exp": issued_at + expires_delta, "iat": issued_at})

        encoded_jwt = jwt.encode(
            to_encode,
            settings.SECRET_KEY,
            algorithm=settings.ALGORITHM,
        )
        logger.info("Token created for %s", data.get("sub"))
        return encoded_jwt
    except Exception as e:
        # Boundary handler: log with traceback, then hide internals from the
        # client behind a generic 500.
        logger.exception("Token creation failed: %s", e)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Token creation failed",
        ) from e
async def verify_token(token: str = Depends(oauth2_scheme)) -> TokenPayload:
    """Decode and validate a bearer token, returning its claims.

    Args:
        token: The raw token supplied by the ``oauth2_scheme`` dependency.
            A leading ``"Bearer "`` prefix is tolerated and stripped.

    Returns:
        The token's claims as a ``TokenPayload``.

    Raises:
        HTTPException: 401 "Could not validate credentials" when the token
            is empty, cannot be decoded, or its payload does not fit
            ``TokenPayload``; 401 "Token expired" when the ``exp`` claim is
            in the past.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        if not token:
            logger.error("Empty token provided")
            raise credentials_exception

        if token.startswith("Bearer "):
            token = token[len("Bearer "):]
            logger.debug("Removed 'Bearer ' prefix from token")

        payload = jwt.decode(
            token,
            settings.SECRET_KEY,
            algorithms=[settings.ALGORITHM],
        )
        token_data = TokenPayload(**payload)

        # Compare in timezone-aware UTC on both sides. A naive
        # fromtimestamp() yields local time, which is wrong against a UTC
        # "now" on any host whose timezone is not UTC.
        # NOTE(review): the JWT library may already reject expired tokens
        # during decode; this check is kept as an explicit safeguard.
        expires_at = datetime.fromtimestamp(token_data.exp, tz=timezone.utc)
        if expires_at < datetime.now(timezone.utc):
            logger.error("Token expired")
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Token expired",
                headers={"WWW-Authenticate": "Bearer"},
            )

        logger.info("Token verified for %s", token_data.sub)
        return token_data
    except HTTPException:
        # Deliberate HTTP errors raised above must reach the client as-is,
        # not be relabelled as "unexpected" by the generic handler below.
        raise
    except JWTError as e:
        logger.error("JWT validation failed: %s", e)
        raise credentials_exception from e
    except Exception as e:
        logger.error("Unexpected token verification error: %s", e)
        raise credentials_exception from e