""" Role related services """ import logging from typing import List, Optional from datetime import datetime from lpm_kernel.api.domains.loads.load_service import LoadService from sqlalchemy.exc import IntegrityError from lpm_kernel.common.repository.database_session import DatabaseSession from lpm_kernel.api.domains.kernel2.dto.role_dto import ( Role, RoleDTO, CreateRoleRequest, UpdateRoleRequest, ShareRoleRequest, generate_role_uuid, ) from lpm_kernel.api.domains.upload.client import RegistryClient from lpm_kernel.api.domains.loads.dto import LoadDTO logger = logging.getLogger(__name__) class RoleService: """Role Service""" @staticmethod def create_role(request: CreateRoleRequest) -> Optional[RoleDTO]: """ Create new Role Args: request: Create Role's request object Returns: Optional[RoleDTO]: if success return RoleDTO, else return None """ try: with DatabaseSession.session() as session: role = Role( uuid=generate_role_uuid(), # gen uuid name=request.name, description=request.description, system_prompt=request.system_prompt, icon=request.icon, enable_l0_retrieval=request.enable_l0_retrieval, enable_l1_retrieval=request.enable_l1_retrieval, ) session.add(role) session.commit() return RoleDTO.from_model(role) except IntegrityError: logger.error(f"Failed to create Role: name '{request.name}' already exists") return None except Exception as e: logger.error(f"Error creating Role: {str(e)}") return None # @staticmethod # def get_role(role_id: int) -> Optional[RoleDTO]: # """ # Get specific Role # Args: # role_id: Role ID # Returns: # Optional[RoleDTO]: if found return RoleDTO, else return None # """ # try: # with DatabaseSession.session() as session: # role = session.query(Role).filter(Role.id == role_id).first() # return RoleDTO.from_model(role) if role else None # except Exception as e: # logger.error(f"Error getting Role: {str(e)}") # return None @staticmethod def get_role_by_uuid(uuid: str) -> Optional[RoleDTO]: """ get role by UUID Args: uuid: Role UUID Returns: Optional[RoleDTO]: if found, return RoleDTO, else return None """ try: with DatabaseSession.session() as session: role = session.query(Role).filter(Role.uuid == uuid).first() return RoleDTO.from_model(role) if role else None except Exception as e: logger.error(f"Error getting Role by UUID: {str(e)}") return None @staticmethod def get_all_roles() -> List[RoleDTO]: """ Get all Roles ordered by creation time in descending order Returns: List[RoleDTO]: Role List sorted by id desc """ try: with DatabaseSession.session() as session: query = session.query(Role).order_by(Role.id.desc()) roles = query.all() return [RoleDTO.from_model(role) for role in roles] except Exception as e: logger.error(f"Error getting all Roles: {str(e)}") return [] @staticmethod def update_role(role_id: int, request: UpdateRoleRequest) -> Optional[RoleDTO]: """ Update Role Args: role_id: Role ID request: Update Role's request object Returns: Optional[RoleDTO]: if update success return RoleDTO, else return None """ try: with DatabaseSession.session() as session: role = session.query(Role).filter(Role.id == role_id).first() if not role: return None # only update not None fields if request.name is not None: role.name = request.name if request.description is not None: role.description = request.description if request.system_prompt is not None: role.system_prompt = request.system_prompt if request.icon is not None: role.icon = request.icon if request.is_active is not None: role.is_active = request.is_active role.update_time = datetime.now() session.commit() return RoleDTO.from_model(role) except IntegrityError: logger.error(f"Failed to update Role: name '{request.name}' already exists") return None except Exception as e: logger.error(f"Error updating Role: {str(e)}") return None @staticmethod def update_role_by_uuid(uuid: str, request: UpdateRoleRequest) -> Optional[RoleDTO]: """ update role by UUID Args: uuid: Role UUID request: update Role's request object Returns: Optional[RoleDTO]: if success return RoleDTO, else return None """ try: with DatabaseSession.session() as session: role = session.query(Role).filter(Role.uuid == uuid).first() if not role: return None # only update not-None fields if request.name is not None: role.name = request.name if request.description is not None: role.description = request.description if request.system_prompt is not None: role.system_prompt = request.system_prompt if request.icon is not None: role.icon = request.icon if request.is_active is not None: role.is_active = request.is_active if request.enable_l0_retrieval is not None: role.enable_l0_retrieval = request.enable_l0_retrieval if request.enable_l1_retrieval is not None: role.enable_l1_retrieval = request.enable_l1_retrieval role.update_time = datetime.now() session.commit() return RoleDTO.from_model(role) except IntegrityError: logger.error(f"Failed to update Role: name '{request.name}' already exists") return None except Exception as e: logger.error(f"Error updating Role: {str(e)}") return None @staticmethod def delete_role(role_id: int) -> bool: """ Delete Role Args: role_id: Role ID Returns: bool: if success """ try: with DatabaseSession.session() as session: role = session.query(Role).filter(Role.id == role_id).first() if not role: return False session.delete(role) session.commit() return True except Exception as e: logger.error(f"Error deleting Role: {str(e)}") return False @staticmethod def delete_role_by_uuid(uuid: str) -> bool: """ Delete Role by UUID Args: uuid: Role UUID Returns: bool: if delete successfully """ try: with DatabaseSession.session() as session: role = session.query(Role).filter(Role.uuid == uuid).first() if not role: return False session.delete(role) session.commit() return True except Exception as e: logger.error(f"Error deleting Role: {str(e)}") return False @staticmethod def share_role(request: ShareRoleRequest) -> Optional[RoleDTO]: """ Share role Args: request: Share role request object Returns: Optional[RoleDTO]: if success return RoleDTO, else return None """ try: current_load, error, status_code = LoadService.get_current_load() if error: logger.error(f"Failed to get current load: {error}") return None instance_id = current_load.instance_id if not instance_id: logger.error("Instance ID not found in current load") return None role_dto = RoleService.get_role_by_uuid(request.role_id) if not role_dto: logger.error(f"Role not found with uuid: {request.role_id}") return None registry_client = RegistryClient() logger.info(f"Sharing role {role_dto.name} (ID: {role_dto.uuid}) to registry center . instance_id: {instance_id}") result = registry_client.create_role( role_id=role_dto.uuid, name=role_dto.name, description=role_dto.description, system_prompt=role_dto.system_prompt, icon=role_dto.icon, instance_id=instance_id, is_active=role_dto.is_active, enable_l0_retrieval=role_dto.enable_l0_retrieval, enable_l1_retrieval=role_dto.enable_l1_retrieval ) if result: logger.info(f"Role {role_dto.name} (ID: {role_dto.uuid}) shared successfully") return role_dto else: logger.error(f"Failed to share role: {role_dto.name} (ID: {role_dto.uuid})") return None except Exception as e: logger.error(f"Error sharing Role: {str(e)}", exc_info=True) return None # create global RoleService instance role_service = RoleService()