CPS-API / core /security.py
Ali2206's picture
Update core/security.py
5c9fe8e verified
raw
history blame
3.4 kB
import logging
from datetime import datetime, timedelta, timezone
from typing import Optional

from fastapi import HTTPException, status, Request, Depends
from fastapi.security import OAuth2PasswordBearer
from jose import jwt, JWTError

from core.config import SECRET_KEY, ALGORITHM, ACCESS_TOKEN_EXPIRE_MINUTES
from db.mongo import users_collection
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# OAuth2 bearer scheme used as a dependency by get_current_user below;
# its tokenUrl is set to the "/auth/login" route.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")
def create_access_token(data: dict) -> str:
    """Create a signed JWT carrying ``data`` plus an ``exp`` claim.

    The input mapping is copied first, so the caller's dict is not mutated.
    The expiry is the current UTC time plus ``ACCESS_TOKEN_EXPIRE_MINUTES``.

    Args:
        data: Claims to embed in the token. Its ``sub`` entry (if any) is
            written to the log.

    Returns:
        The encoded token returned by ``jwt.encode``.

    Raises:
        HTTPException: 500 "Token creation failed" if any step fails.
    """
    try:
        claims = data.copy()
        # Timezone-aware "now": datetime.utcnow() is deprecated and yields a
        # naive value that is easy to misinterpret as local time.
        expires_at = datetime.now(timezone.utc) + timedelta(
            minutes=ACCESS_TOKEN_EXPIRE_MINUTES
        )
        claims["exp"] = expires_at
        encoded_jwt = jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)
        logger.info(f"Token created for {data.get('sub')}")
        return encoded_jwt
    except Exception as e:
        # logger.exception keeps the traceback, which str(e) alone discards.
        logger.exception(f"Token creation failed: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Token creation failed",
        ) from e
async def validate_token(token: str) -> dict:
    """Decode and verify a JWT, returning its payload.

    A leading ``"Bearer "`` prefix is stripped before decoding. The payload
    must contain a non-empty ``sub`` claim.

    Args:
        token: The raw token string, with or without a ``"Bearer "`` prefix.

    Returns:
        The decoded payload returned by ``jwt.decode``.

    Raises:
        HTTPException: 401 when the token is empty, expired, fails
            validation, or lacks a ``sub`` claim. Every 401 carries a
            ``WWW-Authenticate: Bearer`` challenge header.
    """
    # A 401 response is expected to carry an authentication challenge.
    challenge = {"WWW-Authenticate": "Bearer"}

    if not token:
        logger.error("Empty token provided")
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="No token provided",
            headers=challenge,
        )

    # Handle cases where 'Bearer ' prefix might be present
    bearer_prefix = "Bearer "
    if token.startswith(bearer_prefix):
        token = token[len(bearer_prefix):]
        logger.debug("Removed 'Bearer ' prefix from token")

    # Only the decode call can raise JWT errors, so only it sits in the try.
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except jwt.ExpiredSignatureError as e:
        # Caught before JWTError so expiry gets its own message.
        logger.error("Token expired")
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token expired",
            headers=challenge,
        ) from e
    except JWTError as e:
        logger.error(f"Token validation failed: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
            headers=challenge,
        ) from e

    email = payload.get("sub")
    if not email:
        logger.error("Token missing 'sub' claim")
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token format",
            headers=challenge,
        )

    logger.info(f"Token validated for {email}")
    return payload
# Header names whose values must never be written to logs.
_SENSITIVE_HEADERS = frozenset(
    {"authorization", "proxy-authorization", "cookie", "set-cookie", "x-api-key"}
)


async def get_current_user(request: Request, token: str = Depends(oauth2_scheme)) -> dict:
    """Resolve the authenticated user for the current request.

    Validates the bearer token, then looks the user up in
    ``users_collection`` by the e-mail stored in the token's ``sub`` claim.

    Args:
        request: The incoming request; its headers are logged with
            credential-bearing values redacted.
        token: Token supplied by the ``oauth2_scheme`` dependency.

    Returns:
        The user document returned by ``users_collection.find_one``.

    Raises:
        HTTPException: whatever ``validate_token`` raises for a bad token;
            404 "User not found" when no document matches the e-mail;
            500 "Internal server error" on any other failure.
    """
    # Redact credentials before logging: the Authorization header holds the
    # full bearer token, and cookies may hold session secrets.
    safe_headers = {
        name: "[REDACTED]" if name.lower() in _SENSITIVE_HEADERS else value
        for name, value in dict(request.headers).items()
    }
    logger.info(f"Incoming request headers: {safe_headers}")
    logger.info(f"Raw token received: {token[:15]}...")  # Log first 15 chars for security

    try:
        payload = await validate_token(token)
        email = payload.get("sub")
        user = await users_collection.find_one({"email": email})
        if not user:
            logger.error(f"User not found in DB: {email}")
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="User not found",
            )
        logger.info(f"User authenticated: {email}")
        return user
    except HTTPException:
        # Already a well-formed HTTP error; pass it through untouched.
        raise
    except Exception as e:
        # logger.exception keeps the traceback, which str(e) alone discards.
        logger.exception(f"Unexpected error in get_current_user: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Internal server error",
        ) from e