Desk-Back2 / app /services /auth_service.py
Fred808's picture
Update app/services/auth_service.py
1cc79a2 verified
from datetime import datetime, timedelta, timezone
from typing import Optional

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from passlib.context import CryptContext

from ..core.config import settings
from ..db.crud import get_user_by_email
from ..db.database import get_db, AsyncSession
from ..models.auth import UserProfile, LoginResponse
# Shared password-hashing context; bcrypt is the only configured scheme.
# NOTE(review): deprecated="auto" presumably only has an effect once additional
# schemes are listed - confirm against the passlib CryptContext docs.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# Bearer-token dependency consumed by get_current_user. tokenUrl is the path of
# the login endpoint (written without a leading slash here).
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="api/v1/auth/login")
async def authenticate_user(email: str, password: str, db: AsyncSession) -> Optional[UserProfile]:
    """Check an email/password pair against the stored user record.

    Args:
        email: Address used to look the user up via ``get_user_by_email``.
        password: Plain-text password supplied by the caller.
        db: Database session passed through to the lookup.

    Returns:
        A ``UserProfile`` built from the stored user when the lookup succeeds
        and the password matches the stored hash; ``None`` otherwise.
    """
    account = await get_user_by_email(db, email)

    # The hash is only checked when a user record was actually found.
    if account and verify_password(password, account.hashed_password):
        return UserProfile(
            id=account.id,
            email=account.email,
            full_name=account.full_name,
            role=account.role,
        )

    return None
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Return whether ``plain_password`` matches ``hashed_password``.

    The comparison is delegated to the module-level ``pwd_context``.
    """
    matches = pwd_context.verify(plain_password, hashed_password)
    return matches
def get_password_hash(password: str) -> str:
    """Hash ``password`` with the module-level ``pwd_context`` and return the result."""
    hashed = pwd_context.hash(password)
    return hashed
def create_access_token(data: dict) -> str:
    """Create a signed JWT access token.

    Args:
        data: Claims to embed in the token (callers in this module pass
            ``{"sub": <email>}``). The mapping is copied, so the caller's
            dict is never mutated.

    Returns:
        The encoded JWT, signed with ``settings.SECRET_KEY`` using
        ``settings.ALGORITHM``. Its ``exp`` claim lies
        ``settings.ACCESS_TOKEN_EXPIRE_MINUTES`` minutes in the future.
    """
    # Use a timezone-aware "now": datetime.utcnow() is deprecated since
    # Python 3.12 and returns a naive value that is easily mistaken for
    # local time.
    expires_at = datetime.now(timezone.utc) + timedelta(
        minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES
    )
    claims = {**data, "exp": expires_at}
    return jwt.encode(claims, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
def create_refresh_token(data: dict) -> str:
    """Create a signed JWT refresh token.

    Args:
        data: Claims to embed in the token. The mapping is copied, so the
            caller's dict is never mutated.

    Returns:
        The encoded JWT, signed with ``settings.SECRET_KEY`` using
        ``settings.ALGORITHM``. It carries ``"refresh": True`` so it can be
        told apart from an access token, and an ``exp`` claim
        ``settings.REFRESH_TOKEN_EXPIRE_DAYS`` days in the future.
    """
    # Use a timezone-aware "now": datetime.utcnow() is deprecated since
    # Python 3.12 and returns a naive value that is easily mistaken for
    # local time.
    expires_at = datetime.now(timezone.utc) + timedelta(
        days=settings.REFRESH_TOKEN_EXPIRE_DAYS
    )
    claims = {**data, "exp": expires_at, "refresh": True}
    return jwt.encode(claims, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
async def refresh_access_token(refresh_token: str, db: AsyncSession) -> LoginResponse:
    """Exchange a refresh token for a fresh access token.

    Args:
        refresh_token: Encoded JWT expected to carry a truthy ``refresh``
            claim and a ``sub`` claim holding the user's email.
        db: Database session used to look the user up.

    Returns:
        A ``LoginResponse`` with a newly issued bearer access token and the
        user's profile.

    Raises:
        HTTPException: 400 when the token has no truthy ``refresh`` claim;
            401 when the token cannot be decoded, has no ``sub`` claim, or
            no user exists for that email.
    """
    try:
        claims = jwt.decode(
            refresh_token,
            settings.SECRET_KEY,
            algorithms=[settings.ALGORITHM],
        )

        # Only tokens explicitly marked as refresh tokens are accepted here.
        is_refresh = claims.get("refresh")
        if not is_refresh:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Invalid refresh token",
            )

        subject: str = claims.get("sub")
        if subject is None:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid refresh token",
            )

        account = await get_user_by_email(db, subject)
        if not account:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="User not found",
            )

        new_token = create_access_token({"sub": account.email})
        profile = UserProfile(
            id=account.id,
            email=account.email,
            full_name=account.full_name,
            role=account.role,
        )
        return LoginResponse(
            access_token=new_token,
            token_type="bearer",
            user=profile,
        )
    except JWTError:
        # Any decode/validation failure is reported as an invalid token.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid refresh token",
        )
async def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: AsyncSession = Depends(get_db),
) -> UserProfile:
    """Resolve the authenticated user from a bearer access token.

    Args:
        token: Encoded JWT supplied through ``oauth2_scheme``.
        db: Database session used to look the user up.

    Returns:
        A ``UserProfile`` for the user whose email is in the token's ``sub``
        claim.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when
            the token cannot be decoded, is a refresh token, has no ``sub``
            claim, or no user exists for that email.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    # Keep the try body to the one call that can raise JWTError.
    try:
        payload = jwt.decode(
            token,
            settings.SECRET_KEY,
            algorithms=[settings.ALGORITHM],
        )
    except JWTError as exc:
        raise credentials_exception from exc

    # Refresh tokens are signed with the same key and algorithm as access
    # tokens, so they decode successfully here. They are marked with
    # "refresh": True and must only be usable at the refresh endpoint,
    # never as an access credential.
    if payload.get("refresh"):
        raise credentials_exception

    email = payload.get("sub")
    if email is None:
        raise credentials_exception

    user = await get_user_by_email(db, email)
    if user is None:
        raise credentials_exception

    return UserProfile(
        id=user.id,
        email=user.email,
        full_name=user.full_name,
        role=user.role,
    )