# File: zenith-backend/app/modules/users/service.py
# Last commit: d29a5a0 (verified) by teoat
#   fix(backend): fix port and health check robustness
import json
import uuid
from typing import Optional
from fastapi import HTTPException, status
from sqlalchemy.orm import Session
from app.modules.users.repository import UserRepository
from app.modules.users.schemas import UserCreate, UserUpdate
from core.models import User
from core.security.hashing import hash_password
class UserService:
    """Service-layer operations on users, backed by a ``UserRepository``.

    Lookups, creation, updates and JSON-encoded preference handling are
    delegated to the repository / its session; failures that should reach
    the HTTP client are raised as ``HTTPException``.
    """

    def __init__(self, db: Session):
        """Create the service with a repository bound to *db*."""
        self.repo = UserRepository(db)

    @staticmethod
    def _parse_preferences(raw) -> dict:
        """Decode a stored preferences value into a plain dict.

        *raw* may be a JSON string or an already-decoded object. Empty,
        malformed, or non-dict values all yield ``{}`` so callers can merge
        or return the result without further type checks.
        """
        if not raw:
            return {}
        if isinstance(raw, str):
            try:
                raw = json.loads(raw)
            except json.JSONDecodeError:
                return {}
        # Valid JSON is not necessarily an object (e.g. "[1, 2]" or "3").
        return dict(raw) if isinstance(raw, dict) else {}

    def get_user(self, user_id: str) -> User:
        """Return the user with *user_id*.

        Raises:
            HTTPException: 404 when the repository finds no such user.
        """
        user = self.repo.get_by_id(user_id)
        if not user:
            raise HTTPException(status_code=404, detail="User not found")
        return user

    def get_user_by_email(self, email: str) -> Optional[User]:
        """Return the repository's match for *email* (may be ``None``)."""
        return self.repo.get_by_email(email)

    def get_users_paginated(
        self, offset: int = 0, limit: int = 100, filters: Optional[dict] = None
    ) -> tuple[list[User], int]:
        """Return the repository's ``(users, int)`` page result unchanged.

        The integer is presumably the total match count -- confirm against
        ``UserRepository.get_paginated``.
        """
        return self.repo.get_paginated(offset, limit, filters)

    def get_users(
        self, skip: int = 0, limit: int = 100, filters: Optional[dict] = None
    ) -> list[User]:
        """Legacy variant of ``get_users_paginated`` returning only the users."""
        users, _ = self.repo.get_paginated(skip, limit, filters)
        return users

    def create_user(self, user_in: UserCreate) -> User:
        """Create a user from *user_in*, hashing the password first.

        Raises:
            HTTPException: 400 when a user with the same email already exists.
        """
        if self.repo.get_by_email(user_in.email):
            # NOTE(review): the check is on email but the message says
            # "username"; text kept as-is because clients may match on it.
            raise HTTPException(
                status_code=400,
                detail="The user with this username already exists in the system.",
            )
        user_data = user_in.model_dump()
        # Only the hash is handed to the repository, never the raw password.
        user_data["password_hash"] = hash_password(user_data.pop("password"))
        user_data["id"] = str(uuid.uuid4())
        return self.repo.create(user_data)

    def update_user(self, user_id: str, user_in: UserUpdate) -> User:
        """Apply the explicitly-set fields of *user_in* to an existing user.

        Raises:
            HTTPException: 404 (from ``get_user``) when the user is missing.
        """
        user = self.get_user(user_id)
        changes = user_in.model_dump(exclude_unset=True)
        # TODO: move this into a UserRepository.update method; until then the
        # service mutates the tracked instance and commits directly.
        # NOTE(review): if UserUpdate exposes a ``password`` field it is set
        # verbatim here, unlike create_user which hashes it -- confirm.
        for field, value in changes.items():
            setattr(user, field, value)
        session = self.repo.session
        try:
            session.add(user)
            session.commit()
        except Exception:
            # Leave the session usable for the caller, then surface the error.
            session.rollback()
            raise
        session.refresh(user)
        return user

    def update_user_preferences(self, user_id: str, preferences: dict) -> bool:
        """Merge *preferences* into the user's stored JSON preferences.

        Keys in *preferences* override stored keys. Returns ``True`` once the
        change is committed.

        Raises:
            HTTPException: 400 when *preferences* is not a dict, 404 when the
                user does not exist, 500 for any other failure (the session
                is rolled back first).
        """
        if not isinstance(preferences, dict):
            # Client error: reject before touching the database.
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Preferences must be a dictionary",
            )
        try:
            user = self.get_user(user_id)
            stored = self._parse_preferences(getattr(user, "preferences", None))
            user.preferences = json.dumps({**stored, **preferences})
            self.repo.session.add(user)
            self.repo.session.commit()
            return True
        except HTTPException:
            # Deliberate HTTP errors (e.g. the 404 from get_user) must not be
            # re-wrapped as a 500.
            raise
        except Exception as e:
            self.repo.session.rollback()
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=f"Failed to update user preferences: {str(e)}",
            ) from e

    def get_user_preferences(self, user_id: str) -> dict:
        """Return the user's preferences as a dict (``{}`` if unset/invalid).

        Raises:
            HTTPException: 404 (from ``get_user``) when the user is missing.
        """
        user = self.get_user(user_id)
        return self._parse_preferences(getattr(user, "preferences", None))