todo-app / src /services /auth_service.py
SyedFarooqAlii
Update
ca76540
from sqlmodel import Session, select
from uuid import UUID
import logging
from datetime import timedelta
from ..models.user import User, UserCreate, UserRead
from ..core.security import verify_password, get_password_hash, create_access_token
# Set up logging
logger = logging.getLogger(__name__)
class AuthService:
"""
Service class for handling authentication-related business logic and database operations.
"""
def register_user(self, db: Session, user_create: UserCreate) -> UserRead:
"""
Register a new user with the system.
"""
logger.info(f"Registering new user with email: {user_create.email}")
# Check if user already exists
existing_user = self.get_user_by_email(db, user_create.email)
if existing_user:
logger.warning(f"Registration attempt for existing email: {user_create.email}")
raise ValueError("Email already registered")
try:
# Hash the password
hashed_password = get_password_hash(user_create.password)
except Exception as e:
logger.error(f"Error hashing password for user {user_create.email}: {str(e)}")
# Re-raise as a more specific error that can be handled by the controller
raise e
# Create the user
db_user = User(
email=user_create.email,
hashed_password=hashed_password
)
db.add(db_user)
db.commit()
db.refresh(db_user)
logger.info(f"Successfully registered user {db_user.id} with email: {user_create.email}")
return UserRead.from_orm(db_user) if hasattr(UserRead, 'from_orm') else UserRead(
id=db_user.id,
email=db_user.email,
created_at=db_user.created_at,
updated_at=db_user.updated_at
)
def get_user_by_email(self, db: Session, email: str) -> User:
"""
Get a user by their email address.
"""
statement = select(User).where(User.email == email)
result = db.exec(statement)
return result.first()
def authenticate_user(self, db: Session, email: str, password: str) -> dict:
"""
Authenticate user credentials and return JWT token data.
"""
logger.info(f"Authenticating user with email: {email}")
# Get user by email
user = self.get_user_by_email(db, email)
if not user or not verify_password(password, user.hashed_password):
logger.warning(f"Authentication failed for email: {email}")
return None
# Create access token
access_token_expires = timedelta(minutes=30) # This should come from settings
access_token = create_access_token(
data={"sub": str(user.id), "email": user.email, "type": "access"},
expires_delta=access_token_expires
)
logger.info(f"Successfully authenticated user {user.id} with email: {email}")
return {
"access_token": access_token,
"token_type": "bearer"
}
def get_user_by_id(self, db: Session, user_id: UUID) -> UserRead:
"""
Get a user by their ID.
"""
logger.debug(f"Retrieving user by ID: {user_id}")
statement = select(User).where(User.id == user_id)
result = db.exec(statement)
user = result.first()
if user:
logger.debug(f"Found user {user.id} by ID")
return UserRead.from_orm(user) if hasattr(UserRead, 'from_orm') else UserRead(
id=user.id,
email=user.email,
created_at=user.created_at,
updated_at=user.updated_at
)
else:
logger.debug(f"User with ID {user_id} not found")
return None
# Create a singleton instance
auth_service = AuthService()