CPS-API / core /security.py
Ali2206's picture
Update core/security.py
3a0a439 verified
raw
history blame
3.3 kB
# core/security.py
import logging
from datetime import datetime, timedelta, timezone
from typing import Optional

from fastapi import Depends, HTTPException, status
from fastapi import Request
from fastapi.security import OAuth2PasswordBearer
from jose import jwt, JWTError
from passlib.context import CryptContext

from core.config import SECRET_KEY, ALGORITHM, ACCESS_TOKEN_EXPIRE_MINUTES
from db.mongo import users_collection
logger = logging.getLogger(__name__)
# Bearer-token dependency used by the route guards below; tokenUrl points at
# the /auth/login route and the scheme is registered under the name "JWT".
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login", scheme_name="JWT")

# Shared passlib context for hashing and verifying passwords; bcrypt is the
# only configured scheme.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def hash_password(password: str) -> str:
    """Return the hash of *password* produced by the module-level ``pwd_context``."""
    digest = pwd_context.hash(password)
    return digest
def verify_password(plain: str, hashed: str) -> bool:
    """Check *plain* against the stored *hashed* value via ``pwd_context``.

    Returns whatever ``pwd_context.verify`` reports for the pair.
    """
    is_match = pwd_context.verify(plain, hashed)
    return is_match
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Encode *data* as a signed JWT with ``exp`` and ``iat`` claims added.

    Args:
        data: Claims to embed in the token. The dict is copied, not mutated.
        expires_delta: Lifetime of the token. When ``None`` the lifetime is
            ``ACCESS_TOKEN_EXPIRE_MINUTES`` minutes. An explicit zero
            ``timedelta`` is honoured rather than replaced by the default.

    Returns:
        The value returned by ``jwt.encode`` for the resulting claims.
    """
    # `is None` instead of `or`: timedelta(0) is falsy and would otherwise be
    # silently swapped for the default lifetime.
    if expires_delta is None:
        expires_delta = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)

    # Read the clock once so `iat` and `exp` are derived from the same instant,
    # and use an aware UTC value rather than the deprecated naive utcnow().
    issued_at = datetime.now(timezone.utc)
    expire = issued_at + expires_delta

    to_encode = data.copy()
    to_encode.update({"exp": expire, "iat": issued_at})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    logger.debug("Created JWT for %s, expires at %s", data.get("sub"), expire)
    return encoded_jwt
def _unauthorized(detail: str) -> HTTPException:
    """Build the 401 response shared by every token-rejection path."""
    return HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail=detail,
        headers={"WWW-Authenticate": "Bearer"},
    )


async def get_current_user(request: Request, token: str = Depends(oauth2_scheme)):
    """Resolve the bearer token on *request* to a user document.

    Args:
        request: Incoming request; only its headers are inspected, for logging.
        token: Bearer token supplied by the ``oauth2_scheme`` dependency.

    Returns:
        The document returned by ``users_collection.find_one`` for the email
        stored in the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when the token is missing, cannot be decoded, has
            no ``sub`` claim, or is past its ``exp`` claim; 404 when no user
            matches the email.
    """
    # Log only whether the header exists: its value is a live credential and
    # must not end up in log files.
    logger.debug(
        "Authorization header present: %s", "authorization" in request.headers
    )

    if not token:
        # Header names only, never values (they may carry cookies/credentials).
        logger.error(
            "No token provided. Header names: %s", list(request.headers.keys())
        )
        raise _unauthorized("No token provided")

    # Sliced only after the guard above, so a missing token cannot raise here.
    logger.debug("Processed token: %s...", token[:10])

    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError as e:
        logger.error("JWT decode error: %s. Token: %s...", e, token[:10])
        raise _unauthorized(f"Could not validate token: {str(e)}") from e

    logger.debug("Token payload: %s", payload)

    email = payload.get("sub")
    if not email:
        logger.error("Invalid token: missing subject")
        raise _unauthorized("Invalid token: missing subject")

    # NOTE(review): jwt.decode presumably rejects expired tokens on its own;
    # this explicit check is kept as a second line of defence. The comparison
    # uses an aware UTC clock: naive utcnow().timestamp() is interpreted as
    # local time and yields a skewed epoch on hosts not running in UTC.
    exp = payload.get("exp")
    if exp is not None and datetime.now(timezone.utc).timestamp() > exp:
        logger.error("Token expired for %s", email)
        raise _unauthorized("Token has expired")

    user = await users_collection.find_one({"email": email})
    if not user:
        logger.error("User not found for email: %s", email)
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found",
        )

    logger.info("Authenticated user: %s, role: %s", user["email"], user.get("role"))
    return user