from datetime import datetime, timedelta, timezone
import hashlib
import base64

from jose import JWTError, jwt
from passlib.context import CryptContext
from fastapi import HTTPException, Depends, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.orm import Session

from app.core.config import settings
from app.database.connection import get_session
from app.database.models import User


# CONFIG
SECRET_KEY = settings.SECRET_KEY
ALGORITHM = settings.ALGORITHM
ACCESS_TOKEN_EXPIRE_MINUTES = settings.ACCESS_TOKEN_EXPIRE_MINUTES

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# auto_error=False -> the dependency yields None instead of raising, which
# lets verify_token answer with 401 instead of FastAPI's default 403.
security = HTTPBearer(auto_error=False)


def _unauthorized(detail: str) -> HTTPException:
    """Build a 401 error that carries the Bearer authentication challenge."""
    return HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail=detail,
        # A 401 response is expected to advertise the accepted auth scheme.
        headers={"WWW-Authenticate": "Bearer"},
    )


# TOKEN LOGIC
def create_access_token(data: dict) -> str:
    """Create a signed JWT access token with a UTC-aware expiration.

    Args:
        data: Claims to embed in the token. It is copied, not mutated.
            ``verify_token`` requires an ``"id"`` claim, so callers should
            supply one.

    Returns:
        The encoded JWT, signed with ``SECRET_KEY`` using ``ALGORITHM``.
    """
    to_encode = data.copy()
    to_encode["exp"] = datetime.now(timezone.utc) + timedelta(
        minutes=ACCESS_TOKEN_EXPIRE_MINUTES
    )

    # Mirror the user id into the standard "sub" claim (JWT best practice).
    # Skipped when no id is given so the claim never becomes the string
    # "None" and a caller-supplied "sub" is left untouched.
    user_id = data.get("id")
    if user_id is not None:
        to_encode["sub"] = str(user_id)

    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


def verify_token(
    credentials: HTTPAuthorizationCredentials = Depends(security)
) -> dict:
    """Verify the bearer JWT and return its decoded payload.

    Args:
        credentials: Bearer credentials resolved by the ``security``
            dependency; ``None`` when the request carried none.

    Returns:
        The decoded token payload, guaranteed to contain a non-None ``"id"``.

    Raises:
        HTTPException: 401 when credentials are missing, when decoding
            raises ``JWTError``, or when the payload has no ``"id"`` claim.
    """
    if credentials is None:
        raise _unauthorized("Not authenticated")

    # Keep the try body limited to the one call that can raise JWTError.
    try:
        payload = jwt.decode(
            credentials.credentials, SECRET_KEY, algorithms=[ALGORITHM]
        )
    except JWTError as exc:
        raise _unauthorized("Invalid or expired token") from exc

    if payload.get("id") is None:
        raise _unauthorized("Invalid token payload")

    return payload


# CURRENT USER DEPENDENCY
async def get_current_user(
    token_data: dict = Depends(verify_token),
    db: Session = Depends(get_session)
) -> User:
    """Fetch the logged-in user from the DB using the JWT payload.

    Args:
        token_data: Decoded JWT payload produced by ``verify_token``.
        db: Database session provided by ``get_session``.

    Returns:
        The ``User`` row whose ``id`` equals the token's ``"id"`` claim.

    Raises:
        HTTPException: 401 when no such user exists.
    """
    # NOTE(review): this is a synchronous query inside an ``async def``, so it
    # blocks the event loop while it runs. Consider a plain ``def`` dependency
    # or an async session; verify against how callers use this function.
    user_id = token_data.get("id")
    user = db.query(User).filter(User.id == user_id).first()

    if not user:
        raise _unauthorized("User not found")

    return user


# PASSWORD LOGIC (bcrypt-safe, unlimited length)