Spaces:
Sleeping
Sleeping
File size: 2,721 Bytes
42cffde d3530f3 42cffde d3530f3 42cffde d3530f3 42cffde d3530f3 42cffde d3530f3 42cffde d3530f3 42cffde d3530f3 42cffde d3530f3 42cffde d3530f3 42cffde d3530f3 42cffde d3530f3 42cffde d3530f3 42cffde d3530f3 42cffde d3530f3 42cffde d3530f3 42cffde d3530f3 42cffde d3530f3 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
# from src.interfaces.auth import IPagination
# from src.interfaces.user import ICreateUser, IFilterUser, IUpdateUser
# from src.repositories.user import UserRepository
# from src.dtos.user import UserDto
# from src.models.user import User
# from src.utils.password import PasswordUtils
# from src.utils.exceptions import BadRequestException
# from typing import List, Optional
# from sqlalchemy.ext.asyncio import AsyncSession
# class UserService:
# def __init__(self, repo: UserRepository):
# self.repo = repo
# async def store(self, db: AsyncSession, data: ICreateUser) -> UserDto:
# await self.validate_unique_username(db, data.username)
# hashed_password = PasswordUtils.hash_password(data.password)
# user_data = data.model_copy(update={"password": hashed_password})
# new_user = await self.repo.store(db, data=user_data)
# return UserDto.model_validate(new_user)
# async def update(self, db: AsyncSession, user_id: int, data: IUpdateUser) -> UserDto:
# user = await self.find_or_fail(db, user_id)
# if data.password:
# hashed_password = PasswordUtils.hash_password(data.password)
# data.update(password= hashed_password)
# updated_user = await self.repo.update(db, user, data)
# return updated_user
# async def delete(self, db: AsyncSession, user_id: int):
# user = await self.find_or_fail(db, user_id)
# await self.repo.delete(db, user)
# async def get_one(self, db: AsyncSession, filter_data: IFilterUser) -> Optional[UserDto]:
# user = await self.repo.get_one(db, filter_data)
# if user is None:
# return None
# else:
# return user
# async def get_many(self, db: AsyncSession, paging: IPagination, filter_data: IFilterUser) -> List[UserDto]:
# list_user = await self.repo.get_many(db, paging, filter_data.user_id)
# return list(map(UserDto.model_validate, list_user))
# async def validate_unique_username(self, db: AsyncSession, username: str):
# old_user = await self.repo.get_one(db, IFilterUser(username=username))
# if old_user:
# raise BadRequestException('username_already_exists')
# async def find_by_pk(self, db: AsyncSession, user_id: int):
# return await self.repo.find_by_pk(db, user_id)
# async def find_or_fail(self, db: AsyncSession, user_id: int) -> User:
# user = await self.find_by_pk(db, user_id)
# if not user:
# raise BadRequestException('user_not_found')
# return user
# def get_user_service() -> UserService:
# user_repo = UserRepository()
# return UserService(user_repo)
|