"""Authentication helpers: password hashing, JWT issuing and JWT validation."""

from datetime import datetime, timedelta, timezone
from typing import Optional

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from passlib.context import CryptContext

from ..models.auth import UserProfile, LoginResponse
from ..core.config import settings
from ..db.crud import get_user_by_email
from ..db.database import get_db, AsyncSession

# Hashing context used for every password hash/verify in this module (bcrypt).
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# Extracts the bearer token from incoming requests; tokenUrl points at the
# login endpoint.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="api/v1/auth/login")


def _to_profile(user) -> UserProfile:
    """Build the public ``UserProfile`` from a user record.

    Only ``id``, ``email``, ``full_name`` and ``role`` are copied, so the
    stored password hash never leaves this module.
    """
    return UserProfile(
        id=user.id,
        email=user.email,
        full_name=user.full_name,
        role=user.role,
    )


def _encode_token(data: dict, lifetime: timedelta, **extra_claims) -> str:
    """Sign a copy of ``data`` as a JWT that expires ``lifetime`` from now.

    ``data`` itself is not mutated. ``extra_claims`` are merged into the
    payload after the ``exp`` claim has been set.
    """
    claims = data.copy()
    # Timezone-aware UTC "now": datetime.utcnow() returns a naive value and is
    # deprecated since Python 3.12.
    claims["exp"] = datetime.now(timezone.utc) + lifetime
    claims.update(extra_claims)
    return jwt.encode(claims, settings.SECRET_KEY, algorithm=settings.ALGORITHM)


async def authenticate_user(
    email: str, password: str, db: AsyncSession
) -> Optional[UserProfile]:
    """Authenticate a user by email and password.

    Returns:
        The user's ``UserProfile`` when the email exists and the password
        matches the stored hash; ``None`` otherwise.
    """
    user = await get_user_by_email(db, email)
    if not user or not verify_password(password, user.hashed_password):
        return None
    return _to_profile(user)


def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Return whether ``plain_password`` matches ``hashed_password``."""
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password: str) -> str:
    """Return the hash of ``password`` produced by ``pwd_context``."""
    return pwd_context.hash(password)


def create_access_token(data: dict) -> str:
    """Create a signed JWT access token.

    The token carries a copy of ``data`` plus an ``exp`` claim set
    ``settings.ACCESS_TOKEN_EXPIRE_MINUTES`` minutes in the future.
    """
    return _encode_token(
        data, timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
    )


def create_refresh_token(data: dict) -> str:
    """Create a signed JWT refresh token.

    The token carries a copy of ``data``, an ``exp`` claim set
    ``settings.REFRESH_TOKEN_EXPIRE_DAYS`` days in the future, and a
    ``refresh: True`` marker claim that distinguishes it from access tokens.
    """
    return _encode_token(
        data, timedelta(days=settings.REFRESH_TOKEN_EXPIRE_DAYS), refresh=True
    )


async def refresh_access_token(refresh_token: str, db: AsyncSession) -> LoginResponse:
    """Exchange a refresh token for a new access token.

    Returns:
        A ``LoginResponse`` with a fresh bearer access token and the user's
        profile.

    Raises:
        HTTPException: 401 if the token cannot be decoded, has no ``sub``
            claim, or names a user that no longer exists; 400 if the token
            decodes but is not marked as a refresh token.
    """
    # Only the decode can raise JWTError, so keep the try body to that call.
    try:
        payload = jwt.decode(
            refresh_token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
        )
    except JWTError as exc:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid refresh token",
        ) from exc

    if not payload.get("refresh"):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid refresh token",
        )

    email: Optional[str] = payload.get("sub")
    if email is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid refresh token",
        )

    user = await get_user_by_email(db, email)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="User not found",
        )

    return LoginResponse(
        access_token=create_access_token({"sub": user.email}),
        token_type="bearer",
        user=_to_profile(user),
    )


async def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: AsyncSession = Depends(get_db),
) -> UserProfile:
    """Resolve the authenticated user from a bearer access token.

    Raises:
        HTTPException: 401 (with a ``WWW-Authenticate: Bearer`` header) if the
            token cannot be decoded, has no ``sub`` claim, is a refresh token,
            or names a user that does not exist.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    try:
        payload = jwt.decode(
            token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
        )
    except JWTError as exc:
        raise credentials_exception from exc

    email: Optional[str] = payload.get("sub")
    if email is None:
        raise credentials_exception

    # Refresh tokens are signed with the same key and would otherwise decode
    # successfully here; they may only be exchanged via refresh_access_token,
    # never used directly as bearer credentials.
    if payload.get("refresh"):
        raise credentials_exception

    user = await get_user_by_email(db, email)
    if user is None:
        raise credentials_exception

    return _to_profile(user)