# CPS-API / core/security.py
# Ali2206's picture
# Update core/security.py
# 77b0666 verified
# raw
# history blame
# 2.65 kB
import logging
from datetime import datetime, timedelta, timezone
from typing import Optional

from fastapi import HTTPException, status, Depends, Request
from fastapi.security import OAuth2PasswordBearer
from jose import jwt, JWTError
from pydantic import BaseModel

from core.config import settings
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Bearer-token scheme used as the dependency of ``verify_token``; the token
# URL is the login route under the configured API prefix.
oauth2_scheme = OAuth2PasswordBearer(
    scheme_name="JWT",
    tokenUrl="{}/auth/login".format(settings.API_V1_STR),
)
class TokenPayload(BaseModel):
    """Claims of a decoded access token, as validated by ``verify_token``."""

    # Subject of the token; logged as the token's owner on creation and
    # verification.
    sub: str
    # Expiry claim as an integer timestamp; ``verify_token`` compares it
    # against the current time.
    exp: int
    # Issued-at claim as an integer timestamp.
    iat: int
    # Optional role claim; None when the payload carries no "role" key.
    role: Optional[str] = None
def create_access_token(*, data: dict, expires_delta: Optional[timedelta] = None) -> str:
    """Create a signed JWT access token from ``data``.

    The claims in ``data`` are copied (the caller's dict is not mutated) and
    the ``exp`` and ``iat`` claims are added, overwriting any values supplied
    under those keys.

    Args:
        data: Claims to embed in the token (e.g. ``sub``, ``role``).
        expires_delta: Token lifetime. When ``None``, the lifetime is
            ``settings.ACCESS_TOKEN_EXPIRE_MINUTES`` minutes. An explicit
            zero or negative delta is honoured rather than replaced by the
            default.

    Returns:
        The encoded JWT string.

    Raises:
        HTTPException: 500 if the token could not be built or encoded.
    """
    try:
        # ``is None`` rather than ``or``: timedelta(0) is falsy and would
        # otherwise silently fall back to the default lifetime.
        if expires_delta is None:
            expires_delta = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)

        # Single timezone-aware "now" so that exp - iat equals the requested
        # lifetime exactly and no naive datetimes are involved.
        issued_at = datetime.now(timezone.utc)

        to_encode = data.copy()
        to_encode.update({"exp": issued_at + expires_delta, "iat": issued_at})

        encoded_jwt = jwt.encode(
            to_encode,
            settings.SECRET_KEY,
            algorithm=settings.ALGORITHM,
        )
        logger.info("Token created for %s", data.get("sub"))
        return encoded_jwt
    except Exception as e:
        # Boundary handler: hide internal details from the client but keep
        # the original exception chained for diagnostics.
        logger.error("Token creation failed: %s", e)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Token creation failed",
        ) from e
async def verify_token(token: str = Depends(oauth2_scheme)) -> TokenPayload:
    """Decode and validate a bearer JWT and return its claims.

    Args:
        token: The raw token string supplied by ``oauth2_scheme``. A leading
            ``"Bearer "`` prefix, if present, is stripped before decoding.

    Returns:
        The validated ``TokenPayload``.

    Raises:
        HTTPException: 401 "Could not validate credentials" when the token is
            empty, cannot be decoded, or its claims do not match
            ``TokenPayload``; 401 "Token expired" when the ``exp`` claim lies
            in the past.
    """
    auth_headers = {"WWW-Authenticate": "Bearer"}
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers=auth_headers,
    )

    # Checked outside the try block so the deliberate 401 is not caught and
    # logged a second time by the generic handler below.
    if not token:
        logger.error("Empty token provided")
        raise credentials_exception

    bearer_prefix = "Bearer "
    if token.startswith(bearer_prefix):
        token = token[len(bearer_prefix):]
        logger.debug("Removed 'Bearer ' prefix from token")

    # Only decoding and claim validation sit inside the try, so intentional
    # HTTPExceptions raised afterwards reach the caller unchanged.
    try:
        payload = jwt.decode(
            token,
            settings.SECRET_KEY,
            algorithms=[settings.ALGORITHM],
        )
        token_data = TokenPayload(**payload)
    except JWTError as e:
        logger.error("JWT validation failed: %s", e)
        raise credentials_exception from e
    except Exception as e:
        logger.error("Unexpected token verification error: %s", e)
        raise credentials_exception from e

    # Compare POSIX timestamps directly: ``exp`` is seconds since the epoch
    # (UTC), so converting it through naive local time would skew the check
    # by the server's UTC offset.
    now_ts = datetime.now(timezone.utc).timestamp()
    if token_data.exp < now_ts:
        logger.error("Token expired")
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token expired",
            headers=auth_headers,
        )

    logger.info("Token verified for %s", token_data.sub)
    return token_data