from datetime import datetime, timedelta from fastapi import HTTPException, status, Depends, Request from fastapi.security import OAuth2PasswordBearer from jose import jwt, JWTError from pydantic import BaseModel from typing import Optional import logging from core.config import settings logger = logging.getLogger(__name__) oauth2_scheme = OAuth2PasswordBearer( tokenUrl=f"{settings.API_V1_STR}/auth/login", scheme_name="JWT" ) class TokenPayload(BaseModel): sub: str exp: int iat: int role: Optional[str] = None def create_access_token(*, data: dict, expires_delta: timedelta = None) -> str: try: to_encode = data.copy() expire = datetime.utcnow() + (expires_delta or timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)) to_encode.update({"exp": expire, "iat": datetime.utcnow()}) encoded_jwt = jwt.encode( to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM ) logger.info(f"Token created for {data.get('sub')}") return encoded_jwt except Exception as e: logger.error(f"Token creation failed: {str(e)}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Token creation failed" ) async def verify_token(token: str = Depends(oauth2_scheme)) -> TokenPayload: credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: if not token: logger.error("Empty token provided") raise credentials_exception if token.startswith("Bearer "): token = token[7:] logger.debug("Removed 'Bearer ' prefix from token") payload = jwt.decode( token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM] ) token_data = TokenPayload(**payload) if datetime.fromtimestamp(token_data.exp) < datetime.utcnow(): logger.error("Token expired") raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Token expired" ) logger.info(f"Token verified for {token_data.sub}") return token_data except JWTError as e: logger.error(f"JWT validation failed: {str(e)}") raise credentials_exception except Exception as e: logger.error(f"Unexpected token verification error: {str(e)}") raise credentials_exception