Spaces:
Sleeping
Sleeping
File size: 2,055 Bytes
b29925a 081d154 77b0666 081d154 397123d 848bdfb 081d154 8776504 081d154 9e499eb 081d154 740f610 081d154 740f610 081d154 8776504 081d154 848bdfb 081d154 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
from datetime import datetime, timedelta
from passlib.context import CryptContext
from jose import jwt, JWTError
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from core.config import SECRET_KEY, ALGORITHM, ACCESS_TOKEN_EXPIRE_MINUTES
from db.mongo import users_collection
import logging
logger = logging.getLogger(__name__)
# OAuth2 setup
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")
# Password hashing context
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# Hash a plain password
def hash_password(password: str) -> str:
return pwd_context.hash(password)
# Verify a plain password against the hash
def verify_password(plain: str, hashed: str) -> bool:
return pwd_context.verify(plain, hashed)
# Create a JWT access token
def create_access_token(data: dict, expires_delta: timedelta = None):
to_encode = data.copy()
expire = datetime.utcnow() + (expires_delta or timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES))
to_encode.update({"exp": expire})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
# Get the current user from the JWT token
async def get_current_user(token: str = Depends(oauth2_scheme)):
print("π Token received:", token)
if not token:
print("β No token received")
raise HTTPException(status_code=401, detail="No token provided")
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
print("π§ Token payload:", payload)
email = payload.get("sub")
if not email:
raise HTTPException(status_code=401, detail="Invalid token: missing subject")
except JWTError as e:
print("β JWT decode error:", str(e))
raise HTTPException(status_code=401, detail="Could not validate token")
user = await users_collection.find_one({"email": email})
if not user:
raise HTTPException(status_code=404, detail="User not found")
print("β
Authenticated user:", user["email"])
return user
|