Spaces:
Sleeping
Sleeping
File size: 10,280 Bytes
01d5a5d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 |
"""
Role related services
"""
import logging
from typing import List, Optional
from datetime import datetime
from lpm_kernel.api.domains.loads.load_service import LoadService
from sqlalchemy.exc import IntegrityError
from lpm_kernel.common.repository.database_session import DatabaseSession
from lpm_kernel.api.domains.kernel2.dto.role_dto import (
Role,
RoleDTO,
CreateRoleRequest,
UpdateRoleRequest,
ShareRoleRequest,
generate_role_uuid,
)
from lpm_kernel.api.domains.upload.client import RegistryClient
from lpm_kernel.api.domains.loads.dto import LoadDTO
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class RoleService:
    """Service layer for creating, querying, updating, deleting and sharing Roles.

    Every method is a ``@staticmethod``. Each database operation opens its own
    ``DatabaseSession.session()`` context. Exceptions derived from ``Exception``
    are logged and reported through the return value (``None``, ``False`` or
    ``[]``) instead of being propagated to the caller.
    """

    # UpdateRoleRequest attributes copied onto a Role when they are not None.
    # Shared by update_role and update_role_by_uuid so both apply the same set.
    _UPDATABLE_FIELDS = (
        "name",
        "description",
        "system_prompt",
        "icon",
        "is_active",
        "enable_l0_retrieval",
        "enable_l1_retrieval",
    )

    @staticmethod
    def create_role(request: CreateRoleRequest) -> Optional[RoleDTO]:
        """
        Create a new Role with a freshly generated UUID.

        Args:
            request: Create Role's request object

        Returns:
            Optional[RoleDTO]: the created Role as a RoleDTO, or None when the
            insert fails (an IntegrityError is logged as a name conflict, any
            other exception is logged with its traceback).
        """
        try:
            with DatabaseSession.session() as session:
                role = Role(
                    uuid=generate_role_uuid(),
                    name=request.name,
                    description=request.description,
                    system_prompt=request.system_prompt,
                    icon=request.icon,
                    enable_l0_retrieval=request.enable_l0_retrieval,
                    enable_l1_retrieval=request.enable_l1_retrieval,
                )
                session.add(role)
                session.commit()
                return RoleDTO.from_model(role)
        except IntegrityError:
            logger.error(f"Failed to create Role: name '{request.name}' already exists")
            return None
        except Exception as e:
            logger.error(f"Error creating Role: {e}", exc_info=True)
            return None

    # NOTE: lookup by numeric id is currently disabled; use get_role_by_uuid.
    # @staticmethod
    # def get_role(role_id: int) -> Optional[RoleDTO]:
    #     """
    #     Get specific Role
    #     Args:
    #         role_id: Role ID
    #     Returns:
    #         Optional[RoleDTO]: if found return RoleDTO, else return None
    #     """
    #     try:
    #         with DatabaseSession.session() as session:
    #             role = session.query(Role).filter(Role.id == role_id).first()
    #             return RoleDTO.from_model(role) if role else None
    #     except Exception as e:
    #         logger.error(f"Error getting Role: {str(e)}")
    #         return None

    @staticmethod
    def get_role_by_uuid(uuid: str) -> Optional[RoleDTO]:
        """
        Get a Role by UUID.

        Args:
            uuid: Role UUID

        Returns:
            Optional[RoleDTO]: the matching Role as a RoleDTO, or None when no
            row matches or the query raises.
        """
        try:
            with DatabaseSession.session() as session:
                role = session.query(Role).filter(Role.uuid == uuid).first()
                return RoleDTO.from_model(role) if role else None
        except Exception as e:
            logger.error(f"Error getting Role by UUID: {e}", exc_info=True)
            return None

    @staticmethod
    def get_all_roles() -> List[RoleDTO]:
        """
        Get all Roles ordered by id in descending order.

        Returns:
            List[RoleDTO]: Role list sorted by id desc; an empty list when the
            query raises.
        """
        try:
            with DatabaseSession.session() as session:
                roles = session.query(Role).order_by(Role.id.desc()).all()
                return [RoleDTO.from_model(role) for role in roles]
        except Exception as e:
            logger.error(f"Error getting all Roles: {e}", exc_info=True)
            return []

    @staticmethod
    def _apply_update(role, request: UpdateRoleRequest) -> None:
        """Copy each non-None updatable field from request onto role and stamp update_time."""
        for field_name in RoleService._UPDATABLE_FIELDS:
            value = getattr(request, field_name, None)
            if value is not None:
                setattr(role, field_name, value)
        # NOTE(review): naive local timestamp, kept as in the original code;
        # confirm it matches how the update_time column is stored.
        role.update_time = datetime.now()

    @staticmethod
    def _update_where(column, value, request: UpdateRoleRequest) -> Optional[RoleDTO]:
        """Update the first Role whose column equals value; shared by both update entry points."""
        try:
            with DatabaseSession.session() as session:
                role = session.query(Role).filter(column == value).first()
                if not role:
                    return None
                RoleService._apply_update(role, request)
                session.commit()
                return RoleDTO.from_model(role)
        except IntegrityError:
            logger.error(f"Failed to update Role: name '{request.name}' already exists")
            return None
        except Exception as e:
            logger.error(f"Error updating Role: {e}", exc_info=True)
            return None

    @staticmethod
    def _delete_where(column, value) -> bool:
        """Delete the first Role whose column equals value; shared by both delete entry points."""
        try:
            with DatabaseSession.session() as session:
                role = session.query(Role).filter(column == value).first()
                if not role:
                    return False
                session.delete(role)
                session.commit()
                return True
        except Exception as e:
            logger.error(f"Error deleting Role: {e}", exc_info=True)
            return False

    @staticmethod
    def update_role(role_id: int, request: UpdateRoleRequest) -> Optional[RoleDTO]:
        """
        Update a Role selected by its numeric id.

        Only request fields that are not None are written. The same field set
        as update_role_by_uuid is applied, including the retrieval flags.

        Args:
            role_id: Role ID
            request: Update Role's request object

        Returns:
            Optional[RoleDTO]: the updated Role as a RoleDTO, or None when the
            Role does not exist or the update fails.
        """
        return RoleService._update_where(Role.id, role_id, request)

    @staticmethod
    def update_role_by_uuid(uuid: str, request: UpdateRoleRequest) -> Optional[RoleDTO]:
        """
        Update a Role selected by its UUID.

        Only request fields that are not None are written.

        Args:
            uuid: Role UUID
            request: update Role's request object

        Returns:
            Optional[RoleDTO]: the updated Role as a RoleDTO, or None when the
            Role does not exist or the update fails.
        """
        return RoleService._update_where(Role.uuid, uuid, request)

    @staticmethod
    def delete_role(role_id: int) -> bool:
        """
        Delete a Role selected by its numeric id.

        Args:
            role_id: Role ID

        Returns:
            bool: True when a Role was found and deleted; False when no Role
            matches or the delete raises.
        """
        return RoleService._delete_where(Role.id, role_id)

    @staticmethod
    def delete_role_by_uuid(uuid: str) -> bool:
        """
        Delete a Role selected by its UUID.

        Args:
            uuid: Role UUID

        Returns:
            bool: True when a Role was found and deleted; False when no Role
            matches or the delete raises.
        """
        return RoleService._delete_where(Role.uuid, uuid)

    @staticmethod
    def share_role(request: ShareRoleRequest) -> Optional[RoleDTO]:
        """
        Share a local Role by registering it through RegistryClient.

        The Role is looked up by ``request.role_id`` (treated as the Role UUID)
        and sent together with the instance_id of the current load.

        Args:
            request: Share role request object

        Returns:
            Optional[RoleDTO]: the shared Role's RoleDTO when the registry call
            returns a truthy result; None when the current load or its
            instance_id is unavailable, the Role is not found, the registry
            call returns a falsy result, or any exception is raised.
        """
        try:
            current_load, error, _status_code = LoadService.get_current_load()
            if error:
                logger.error(f"Failed to get current load: {error}")
                return None

            # getattr also covers a missing load object (None without an error),
            # which would otherwise surface as an AttributeError.
            instance_id = getattr(current_load, "instance_id", None)
            if not instance_id:
                logger.error("Instance ID not found in current load")
                return None

            role_dto = RoleService.get_role_by_uuid(request.role_id)
            if not role_dto:
                logger.error(f"Role not found with uuid: {request.role_id}")
                return None

            registry_client = RegistryClient()
            logger.info(f"Sharing role {role_dto.name} (ID: {role_dto.uuid}) to registry center . instance_id: {instance_id}")
            result = registry_client.create_role(
                role_id=role_dto.uuid,
                name=role_dto.name,
                description=role_dto.description,
                system_prompt=role_dto.system_prompt,
                icon=role_dto.icon,
                instance_id=instance_id,
                is_active=role_dto.is_active,
                enable_l0_retrieval=role_dto.enable_l0_retrieval,
                enable_l1_retrieval=role_dto.enable_l1_retrieval,
            )
            if not result:
                logger.error(f"Failed to share role: {role_dto.name} (ID: {role_dto.uuid})")
                return None

            logger.info(f"Role {role_dto.name} (ID: {role_dto.uuid}) shared successfully")
            return role_dto
        except Exception as e:
            logger.error(f"Error sharing Role: {e}", exc_info=True)
            return None
# Module-level RoleService instance for callers that import `role_service`.
# The public methods on RoleService are static, so they can equally be called
# on the class itself.
role_service = RoleService()
|