Spaces:
Paused
Paused
| import json | |
| import uuid | |
| from typing import Optional | |
| from fastapi import HTTPException, status | |
| from sqlalchemy.orm import Session | |
| from app.modules.users.repository import UserRepository | |
| from app.modules.users.schemas import UserCreate, UserUpdate | |
| from core.models import User | |
| from core.security.hashing import hash_password | |
| class UserService: | |
| def __init__(self, db: Session): | |
| self.repo = UserRepository(db) | |
| def get_user(self, user_id: str) -> User: | |
| user = self.repo.get_by_id(user_id) | |
| if not user: | |
| raise HTTPException(status_code=404, detail="User not found") | |
| return user | |
| def get_user_by_email(self, email: str) -> Optional[User]: | |
| return self.repo.get_by_email(email) | |
| def get_users_paginated( | |
| self, offset: int = 0, limit: int = 100, filters: dict = None | |
| ) -> tuple[list[User], int]: | |
| return self.repo.get_paginated(offset, limit, filters) | |
| def get_users( | |
| self, skip: int = 0, limit: int = 100, filters: dict = None | |
| ) -> list[User]: | |
| # Legacy method compatibility | |
| users, _ = self.repo.get_paginated(skip, limit, filters) | |
| return users | |
| def create_user(self, user_in: UserCreate) -> User: | |
| if self.repo.get_by_email(user_in.email): | |
| raise HTTPException( | |
| status_code=400, | |
| detail="The user with this username already exists in the system.", | |
| ) | |
| user_data = user_in.model_dump() | |
| password = user_data.pop("password") | |
| user_data["password_hash"] = hash_password(password) | |
| user_data["id"] = str(uuid.uuid4()) | |
| return self.repo.create(user_data) | |
| def update_user(self, user_id: str, user_in: UserUpdate) -> User: | |
| user = self.get_user(user_id) | |
| user_data = user_in.model_dump(exclude_unset=True) | |
| # This part requires repo.update method which was missing in the basic copy. | |
| # I need to update the repo to support updates. | |
| # For now, I'll do it manually here relying on session tracking, but really repo should handle it. | |
| for field, value in user_data.items(): | |
| setattr(user, field, value) | |
| self.repo.session.add(user) | |
| self.repo.session.commit() | |
| self.repo.session.refresh(user) | |
| return user | |
| def update_user_preferences(self, user_id: str, preferences: dict) -> bool: | |
| """ | |
| Update user preferences. | |
| """ | |
| try: | |
| user = self.get_user(user_id) | |
| if not isinstance(preferences, dict): | |
| raise ValueError("Preferences must be a dictionary") | |
| # Get existing preferences | |
| existing_prefs_raw = getattr(user, "preferences", "{}") | |
| if not existing_prefs_raw: | |
| existing_prefs_raw = "{}" | |
| try: | |
| existing_prefs = ( | |
| json.loads(existing_prefs_raw) | |
| if isinstance(existing_prefs_raw, str) | |
| else existing_prefs_raw | |
| ) | |
| except (json.JSONDecodeError, TypeError): | |
| existing_prefs = {} | |
| # Merge and serialize | |
| updated_prefs = {**existing_prefs, **preferences} | |
| user.preferences = json.dumps(updated_prefs) | |
| self.repo.session.add(user) | |
| self.repo.session.commit() | |
| return True | |
| except Exception as e: | |
| self.repo.session.rollback() | |
| raise HTTPException( | |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
| detail=f"Failed to update user preferences: {str(e)}", | |
| ) | |
| def get_user_preferences(self, user_id: str) -> dict: | |
| """ | |
| Get user preferences. | |
| """ | |
| user = self.get_user(user_id) | |
| prefs_raw = getattr(user, "preferences", "{}") | |
| if not prefs_raw: | |
| return {} | |
| try: | |
| return json.loads(prefs_raw) if isinstance(prefs_raw, str) else prefs_raw | |
| except (json.JSONDecodeError, TypeError): | |
| return {} | |