Spaces:
Sleeping
Sleeping
File size: 2,122 Bytes
b29925a fe02952 848bdfb fe02952 af1a42f 848bdfb d1dd9f1 b29925a fe02952 b29925a fe02952 b29925a fe02952 b29925a fe02952 b29925a fe02952 69fa5a7 fe02952 69fa5a7 fe02952 69fa5a7 fe02952 848bdfb 69fa5a7 fe02952 69fa5a7 fe02952 69fa5a7 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
import logging
from datetime import datetime, timedelta, timezone
from typing import Optional

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import jwt, JWTError
from passlib.context import CryptContext

from core.config import SECRET_KEY, ALGORITHM, ACCESS_TOKEN_EXPIRE_MINUTES
from db.mongo import users_collection
# OAuth2 bearer-token scheme used as a FastAPI dependency.
# Adjust tokenUrl if your API is mounted under a prefix such as /api.
oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="/auth/login",
)

# Shared passlib context used for hashing and verifying passwords (bcrypt).
pwd_context = CryptContext(
    schemes=["bcrypt"],
    deprecated="auto",
)
def hash_password(password: str) -> str:
    """Return the hash of a plain-text password.

    Args:
        password: The plain-text password to hash.

    Returns:
        The hash string produced by the module-level ``pwd_context``.
    """
    hashed = pwd_context.hash(password)
    return hashed
def verify_password(plain: str, hashed: str) -> bool:
    """Check a plain-text password against a stored hash.

    Args:
        plain: The plain-text password supplied by the caller.
        hashed: The previously stored password hash.

    Returns:
        The result of ``pwd_context.verify`` for the given pair.
    """
    is_match = pwd_context.verify(plain, hashed)
    return is_match
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    """Create a signed JWT access token.

    Args:
        data: Claims to embed in the token. The dict is copied, so the
            caller's object is never mutated.
        expires_delta: Lifetime of the token. When ``None`` the lifetime
            falls back to ``ACCESS_TOKEN_EXPIRE_MINUTES`` minutes.

    Returns:
        The token encoded with ``SECRET_KEY`` and ``ALGORITHM``.
    """
    # Compare against None explicitly: a zero timedelta is falsy, so
    # `expires_delta or default` would silently ignore an explicit zero.
    if expires_delta is None:
        expires_delta = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)

    claims = data.copy()
    # Timezone-aware "now"; datetime.utcnow() is deprecated and returns a
    # naive value that is easy to misinterpret as local time.
    claims["exp"] = datetime.now(timezone.utc) + expires_delta
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)
# Module-level logger for the authentication dependency below.
logger = logging.getLogger(__name__)


async def get_current_user(token: str = Depends(oauth2_scheme)):
    """Resolve a bearer token to the matching user document.

    Decodes the JWT with ``SECRET_KEY`` and ``ALGORITHM``, reads the ``sub``
    claim as the user's email, and looks that email up in
    ``users_collection``.

    Args:
        token: Bearer token supplied through ``oauth2_scheme``.

    Returns:
        Whatever ``users_collection.find_one`` returns for the email
        (presumably the user's MongoDB document -- confirm against db.mongo).

    Raises:
        HTTPException: 401 when the token cannot be decoded or has no
            ``sub`` claim; 404 when no user matches the email.
    """
    # Deliberately log neither the token nor its decoded claims: both are
    # credentials / personal data and do not belong in application logs.
    logger.info("Authentication attempt")

    # Keep the try body minimal: only the decode call can raise JWTError.
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError as exc:
        logger.error("JWTError while decoding token: %s", exc)
        raise HTTPException(status_code=401, detail="Could not validate token") from exc

    email = payload.get("sub")
    if email is None:
        logger.error("Invalid token: subject missing")
        raise HTTPException(status_code=401, detail="Invalid token: subject missing")

    user = await users_collection.find_one({"email": email})
    if not user:
        logger.error("User not found for email: %s", email)
        raise HTTPException(status_code=404, detail="User not found")

    logger.info("Authenticated user: %s", user["email"])
    return user