import json
import uuid
from typing import Optional

from fastapi import HTTPException, status
from sqlalchemy.orm import Session

from app.modules.users.repository import UserRepository
from app.modules.users.schemas import UserCreate, UserUpdate
from core.models import User
from core.security.hashing import hash_password


class UserService:
    """Service layer for user lookup, creation, updates, and preferences.

    All persistence goes through a ``UserRepository`` built from the
    SQLAlchemy session passed to the constructor. Failures are reported to
    callers as ``fastapi.HTTPException``.
    """

    def __init__(self, db: Session):
        """Bind the service to a repository that wraps *db*."""
        self.repo = UserRepository(db)

    def get_user(self, user_id: str) -> User:
        """Return the user with the given id.

        Raises:
            HTTPException: 404 when the repository returns no user.
        """
        user = self.repo.get_by_id(user_id)
        if not user:
            raise HTTPException(status_code=404, detail="User not found")
        return user

    def get_user_by_email(self, email: str) -> Optional[User]:
        """Return whatever the repository finds for *email* (may be None)."""
        return self.repo.get_by_email(email)

    def get_users_paginated(
        self, offset: int = 0, limit: int = 100, filters: Optional[dict] = None
    ) -> tuple[list[User], int]:
        """Return the repository's paginated result unchanged.

        Per the return annotation this is a ``(users, count)`` pair; the
        exact meaning of the count is defined by ``repo.get_paginated``.
        """
        return self.repo.get_paginated(offset, limit, filters)

    def get_users(
        self, skip: int = 0, limit: int = 100, filters: Optional[dict] = None
    ) -> list[User]:
        """Return only the user list from the paginated query.

        Kept for legacy callers that do not need the accompanying count.
        """
        users, _ = self.repo.get_paginated(skip, limit, filters)
        return users

    def create_user(self, user_in: UserCreate) -> User:
        """Create a user from *user_in*, hashing the supplied password.

        The plaintext ``password`` field is replaced by ``password_hash`` and
        a fresh UUID4 string is assigned as the ``id`` before the record is
        handed to the repository.

        Raises:
            HTTPException: 400 when a user with the same email already exists.
        """
        # NOTE: uniqueness is checked by email even though the message below
        # says "username"; the text is kept as-is for existing clients.
        if self.repo.get_by_email(user_in.email):
            raise HTTPException(
                status_code=400,
                detail="The user with this username already exists in the system.",
            )

        user_data = user_in.model_dump()
        password = user_data.pop("password")
        user_data["password_hash"] = hash_password(password)
        user_data["id"] = str(uuid.uuid4())

        return self.repo.create(user_data)

    def update_user(self, user_id: str, user_in: UserUpdate) -> User:
        """Apply the explicitly-set fields of *user_in* to an existing user.

        Only fields the caller actually provided are written
        (``exclude_unset=True``). A non-empty ``password`` field, if present,
        is hashed into ``password_hash`` exactly as ``create_user`` does,
        instead of being written onto the model in plaintext.

        Raises:
            HTTPException: 404 when the user does not exist.
        """
        user = self.get_user(user_id)
        user_data = user_in.model_dump(exclude_unset=True)

        # Never copy a plaintext password onto the model; mirror create_user.
        if "password" in user_data:
            password = user_data.pop("password")
            if password:
                user_data["password_hash"] = hash_password(password)

        # TODO: move this into a UserRepository.update method; until one
        # exists the update relies on the session tracking the instance.
        for field, value in user_data.items():
            setattr(user, field, value)

        self.repo.session.add(user)
        try:
            self.repo.session.commit()
        except Exception:
            # Leave the session usable for the caller, then surface the error.
            self.repo.session.rollback()
            raise
        self.repo.session.refresh(user)
        return user

    @staticmethod
    def _load_preferences(raw) -> dict:
        """Decode a stored preferences value into a dict.

        Accepts a JSON string or an already-decoded mapping. Empty values,
        undecodable JSON, and anything that is not a dict yield ``{}``.
        """
        if not raw:
            return {}
        if isinstance(raw, str):
            try:
                raw = json.loads(raw)
            except (json.JSONDecodeError, TypeError):
                return {}
        return dict(raw) if isinstance(raw, dict) else {}

    def update_user_preferences(self, user_id: str, preferences: dict) -> bool:
        """Merge *preferences* into the user's stored preferences.

        Existing keys are kept unless overridden by *preferences*; the merged
        mapping is stored on ``user.preferences`` as a JSON string.

        Returns:
            True once the change has been committed.

        Raises:
            HTTPException: 404 when the user does not exist, 400 when
                *preferences* is not a dict, 500 for any other failure
                (the session is rolled back first).
        """
        try:
            user = self.get_user(user_id)

            if not isinstance(preferences, dict):
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail="Preferences must be a dictionary",
                )

            existing_prefs = self._load_preferences(
                getattr(user, "preferences", None)
            )

            # Incoming values win over stored ones.
            updated_prefs = {**existing_prefs, **preferences}
            user.preferences = json.dumps(updated_prefs)

            self.repo.session.add(user)
            self.repo.session.commit()
            return True
        except HTTPException:
            # Deliberate HTTP errors (404/400) must not be rewritten as 500.
            raise
        except Exception as e:
            self.repo.session.rollback()
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=f"Failed to update user preferences: {str(e)}",
            ) from e

    def get_user_preferences(self, user_id: str) -> dict:
        """Return the user's preferences as a dict.

        Missing, empty, undecodable, or non-dict stored values yield ``{}``.

        Raises:
            HTTPException: 404 when the user does not exist.
        """
        user = self.get_user(user_id)
        return self._load_preferences(getattr(user, "preferences", None))