cps-api-tx / core /security.py
Ali2206's picture
Initial CPS-API deployment with TxAgent integration
682caaf
import logging
from datetime import datetime, timedelta, timezone

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import jwt, JWTError
from passlib.context import CryptContext

from core.config import SECRET_KEY, ALGORITHM, ACCESS_TOKEN_EXPIRE_MINUTES
from db.mongo import users_collection
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# OAuth2 bearer-token dependency; "/auth/login" is the URL advertised to
# clients as the place to obtain a token.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")
# Password hashing context shared by hash_password() and verify_password();
# bcrypt is the only configured scheme.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def hash_password(password: str) -> str:
    """Return the hash of a plain-text password.

    The hash is produced by the module-level ``pwd_context``.

    Args:
        password: The plain-text password to hash.

    Returns:
        The hashed password string, as returned by ``pwd_context.hash``.
    """
    hashed = pwd_context.hash(password)
    return hashed
def verify_password(plain: str, hashed: str) -> bool:
    """Check a plain-text password against a stored hash.

    Args:
        plain: The plain-text password supplied by the caller.
        hashed: The previously stored password hash.

    Returns:
        The result of ``pwd_context.verify`` for the given pair.
    """
    is_match = pwd_context.verify(plain, hashed)
    return is_match
def create_access_token(data: dict, expires_delta: timedelta = None) -> str:
    """Create a signed JWT access token.

    The claims in ``data`` are copied (the caller's dict is not mutated), an
    ``exp`` claim is added, and the result is signed with ``SECRET_KEY`` using
    ``ALGORITHM``.

    Args:
        data: Claims to embed in the token. ``get_current_user`` reads the
            ``"sub"`` claim from tokens it decodes.
        expires_delta: Token lifetime. When ``None``, the lifetime defaults to
            ``ACCESS_TOKEN_EXPIRE_MINUTES`` minutes.

    Returns:
        The encoded token produced by ``jwt.encode``.
    """
    # Compare against None explicitly: a zero timedelta is falsy, so the
    # previous ``expires_delta or default`` silently replaced an explicit
    # zero lifetime with the default one.
    if expires_delta is None:
        expires_delta = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)

    # Timezone-aware UTC "now"; datetime.utcnow() is deprecated and returns a
    # naive datetime that is easy to misinterpret as local time.
    expire = datetime.now(timezone.utc) + expires_delta

    to_encode = {**data, "exp": expire}
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
async def get_current_user(token: str = Depends(oauth2_scheme)):
    """Resolve the authenticated user from a JWT bearer token.

    Decodes ``token`` with ``SECRET_KEY``/``ALGORITHM``, reads the ``"sub"``
    claim as the user's email, and looks that email up in
    ``users_collection``.

    Args:
        token: Bearer token supplied by the ``oauth2_scheme`` dependency.

    Returns:
        The user document returned by ``users_collection.find_one``.

    Raises:
        HTTPException: 401 if the token is missing, cannot be decoded, or has
            no ``"sub"`` claim; 404 if no user matches the email.
    """
    # Sent with every 401 so clients know which auth scheme is expected.
    bearer_headers = {"WWW-Authenticate": "Bearer"}

    # SECURITY: never log the raw token or its decoded payload; anyone with
    # access to the logs could replay the token.
    if not token:
        logger.warning("Authentication failed: no token received")
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="No token provided",
            headers=bearer_headers,
        )

    # Keep the try body minimal: only the decode can raise JWTError.
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError as exc:
        logger.warning("JWT decode error: %s", exc)
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate token",
            headers=bearer_headers,
        ) from exc

    email = payload.get("sub")
    if not email:
        logger.warning("Authentication failed: token has no subject claim")
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token: missing subject",
            headers=bearer_headers,
        )

    user = await users_collection.find_one({"email": email})
    if not user:
        # NOTE(review): 404 is kept for backward compatibility with existing
        # clients; 401 may be more appropriate for a token whose user is gone.
        logger.warning("Authentication failed: no user found for token subject")
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found",
        )

    logger.debug("Authenticated user: %s", user["email"])
    return user