| from sqlalchemy.ext.asyncio import AsyncSession |
| from sqlalchemy.future import select |
| from sqlalchemy import update |
| from passlib.context import CryptContext |
| from typing import Optional, List, Dict, Any |
| import logging |
|
|
| from src.models.user import User |
| from src.api.schemas import UserCreate, UserUpdate, UserInDB |
|
|
| |
| logger = logging.getLogger(__name__) |
|
|
| |
| pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") |
|
|
| def verify_password(plain_password: str, hashed_password: str) -> bool: |
| """ |
| Verify password against hash. |
| |
| Args: |
| plain_password: Plain password |
| hashed_password: Hashed password |
| |
| Returns: |
| bool: True if password is correct |
| """ |
| return pwd_context.verify(plain_password, hashed_password) |
|
|
| def get_password_hash(password: str) -> str: |
| """ |
| Hash password. |
| |
| Args: |
| password: Plain password |
| |
| Returns: |
| str: Hashed password |
| """ |
| return pwd_context.hash(password) |
|
|
| async def get_user_by_username(db: AsyncSession, username: str) -> Optional[UserInDB]: |
| """ |
| Get user by username. |
| |
| Args: |
| db: Database session |
| username: Username |
| |
| Returns: |
| Optional[UserInDB]: User if found, None otherwise |
| """ |
| try: |
| result = await db.execute(select(User).where(User.username == username)) |
| user = result.scalars().first() |
| |
| if not user: |
| return None |
| |
| |
| user_dict = {c.name: getattr(user, c.name) for c in user.__table__.columns} |
| return UserInDB(**user_dict) |
| except Exception as e: |
| logger.error(f"Error getting user by username: {e}") |
| return None |
|
|
| async def authenticate_user(db: AsyncSession, username: str, password: str) -> Optional[UserInDB]: |
| """ |
| Authenticate user. |
| |
| Args: |
| db: Database session |
| username: Username |
| password: Plain password |
| |
| Returns: |
| Optional[UserInDB]: User if authenticated, None otherwise |
| """ |
| user = await get_user_by_username(db, username) |
| |
| if not user: |
| return None |
| |
| if not verify_password(password, user.hashed_password): |
| return None |
| |
| return user |
|
|
| async def create_user(db: AsyncSession, user_data: UserCreate) -> Optional[UserInDB]: |
| """ |
| Create a new user. |
| |
| Args: |
| db: Database session |
| user_data: User data |
| |
| Returns: |
| Optional[UserInDB]: Created user |
| """ |
| try: |
| |
| existing_user = await get_user_by_username(db, user_data.username) |
| if existing_user: |
| return None |
| |
| |
| hashed_password = get_password_hash(user_data.password) |
| user = User( |
| username=user_data.username, |
| email=user_data.email, |
| full_name=user_data.full_name, |
| hashed_password=hashed_password, |
| is_active=user_data.is_active |
| ) |
| |
| db.add(user) |
| await db.commit() |
| await db.refresh(user) |
| |
| |
| user_dict = {c.name: getattr(user, c.name) for c in user.__table__.columns} |
| return UserInDB(**user_dict) |
| except Exception as e: |
| logger.error(f"Error creating user: {e}") |
| await db.rollback() |
| return None |
|
|
| async def update_user(db: AsyncSession, user_id: int, user_data: UserUpdate) -> Optional[UserInDB]: |
| """ |
| Update user. |
| |
| Args: |
| db: Database session |
| user_id: User ID |
| user_data: User data |
| |
| Returns: |
| Optional[UserInDB]: Updated user |
| """ |
| try: |
| |
| update_data = user_data.dict(exclude_unset=True) |
| |
| |
| if "password" in update_data: |
| update_data["hashed_password"] = get_password_hash(update_data.pop("password")) |
| |
| |
| stmt = update(User).where(User.id == user_id).values(**update_data) |
| await db.execute(stmt) |
| await db.commit() |
| |
| |
| result = await db.execute(select(User).where(User.id == user_id)) |
| user = result.scalars().first() |
| |
| if not user: |
| return None |
| |
| |
| user_dict = {c.name: getattr(user, c.name) for c in user.__table__.columns} |
| return UserInDB(**user_dict) |
| except Exception as e: |
| logger.error(f"Error updating user: {e}") |
| await db.rollback() |
| return None |