"""
Authentication Service
Password hashing and JWT token generation
"""
from datetime import datetime, timedelta, timezone
from typing import Optional

from passlib.context import CryptContext
from jose import JWTError, jwt
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.orm import Session

from app.config import SECRET_KEY, ALGORITHM, ACCESS_TOKEN_EXPIRE_MINUTES
from app.database import get_db
from app.models import User
from app.schemas import TokenData

# Password hashing (using argon2 instead of bcrypt for Python 3.13 compatibility)
pwd_context = CryptContext(schemes=["argon2"], deprecated="auto")

# OAuth2 scheme
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/login")


def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Verify a password against a hash.

    Args:
        plain_password: The candidate password as typed by the user.
        hashed_password: The stored hash to check against.

    Returns:
        The result of ``pwd_context.verify`` for the given pair.
    """
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password: str) -> str:
    """Hash a password with the module's configured ``pwd_context``.

    The password is hashed in full. An earlier version cut the input to 72
    characters (a bcrypt workaround), but ``verify_password`` never applied
    the same cut, so any password longer than 72 characters could not be
    verified against its own hash. The context is configured for argon2
    only, so the bcrypt workaround is not needed here.

    Args:
        password: The plain-text password to hash.

    Returns:
        The encoded hash string produced by ``pwd_context.hash``.
    """
    return pwd_context.hash(password)


def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    """Create JWT access token.

    Args:
        data: Claims to embed in the token. The dict is copied, so the
            caller's object is not modified.
        expires_delta: Lifetime of the token. When omitted (or falsy, e.g. a
            zero timedelta) the lifetime defaults to
            ``ACCESS_TOKEN_EXPIRE_MINUTES`` minutes.

    Returns:
        The encoded JWT, signed with ``SECRET_KEY`` using ``ALGORITHM``.
    """
    to_encode = data.copy()
    lifetime = expires_delta or timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    # Timezone-aware "now": datetime.utcnow() is deprecated since Python 3.12
    # and returns a naive value that is easy to misinterpret as local time.
    expire = datetime.now(timezone.utc) + lifetime
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


def authenticate_user(db: Session, username: str, password: str):
    """Authenticate user with username and password.

    Args:
        db: Database session used to look up the user.
        username: Username to look up (exact match on ``User.username``).
        password: Plain-text password to check against the stored hash.

    Returns:
        The matching ``User`` on success, otherwise ``False`` (unknown
        username or wrong password).
    """
    user = db.query(User).filter(User.username == username).first()
    if not user:
        # Spend roughly the time of a real verification so that response
        # timing does not reveal whether the username exists.
        pwd_context.dummy_verify()
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


async def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: Session = Depends(get_db)
) -> User:
    """Get current authenticated user from JWT token.

    Args:
        token: Bearer token supplied by the ``oauth2_scheme`` dependency.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        The ``User`` whose username matches the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when the token cannot be decoded, carries no
            ``sub`` claim, or names a user that does not exist.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError as exc:
        # Keep the decode failure as the cause for server-side debugging;
        # the client only ever sees the generic 401.
        raise credentials_exception from exc

    username: Optional[str] = payload.get("sub")
    if username is None:
        raise credentials_exception
    token_data = TokenData(username=username)

    user = db.query(User).filter(User.username == token_data.username).first()
    if user is None:
        raise credentials_exception
    return user