Spaces:
Sleeping
Sleeping
File size: 4,943 Bytes
01d5a5d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 |
"""
Role related API route
"""
import logging
from flask import Blueprint, request, jsonify
from lpm_kernel.api.common.responses import APIResponse
from lpm_kernel.api.domains.kernel2.dto.role_dto import CreateRoleRequest, UpdateRoleRequest
from lpm_kernel.api.domains.kernel2.dto.role_dto import ShareRoleRequest
from lpm_kernel.api.domains.kernel2.services.role_service import role_service
logger = logging.getLogger(__name__)
role_bp = Blueprint("role", __name__, url_prefix="/api/kernel2/roles")
@role_bp.route("", methods=["POST"])
def create_role():
"""create new Role"""
try:
data = request.get_json()
create_request = CreateRoleRequest.from_dict(data)
role = role_service.create_role(create_request)
if role:
return jsonify(APIResponse.success(role.to_dict()))
else:
return jsonify(APIResponse.error("create role failed, maybe the name existed")), 400
except Exception as e:
logger.error(f"Error creating Role: {str(e)}")
return jsonify(APIResponse.error(f"Error occurred when creating role: {str(e)}")), 500
@role_bp.route("", methods=["GET"])
def get_all_roles():
"""Get all Role"""
try:
roles = role_service.get_all_roles()
return jsonify(APIResponse.success([role.to_dict() for role in roles]))
except Exception as e:
logger.error(f"Error getting Role list: {str(e)}")
return jsonify(APIResponse.error(f"Error occurred when getting Role list: {str(e)}")), 500
@role_bp.route("/<string:uuid>", methods=["GET"])
def get_role(uuid: str):
"""Get specified Role"""
try:
role = role_service.get_role_by_uuid(uuid)
if role:
return jsonify(APIResponse.success(role.to_dict()))
else:
return jsonify(APIResponse.error("Role not existed")), 404
except Exception as e:
logger.error(f"Error getting Role: {str(e)}")
return jsonify(APIResponse.error(f"Error occurred when getting Role: {str(e)}")), 500
@role_bp.route("/<string:uuid>", methods=["PUT"])
def update_role(uuid: str):
"""Update Role"""
try:
data = request.get_json()
update_request = UpdateRoleRequest.from_dict(data)
role = role_service.update_role_by_uuid(uuid, update_request)
if role:
return jsonify(APIResponse.success(role.to_dict()))
else:
return jsonify(
APIResponse.error(
"Error occurred when updating Role,Role does not exist or the role name is already in use")
), 400
except Exception as e:
logger.error(f"Error updating Role: {str(e)}")
return jsonify(APIResponse.error(f"Error occurred when updating Role: {str(e)}")), 500
@role_bp.route("/<string:uuid>", methods=["DELETE"])
def delete_role(uuid: str):
"""Delete Role"""
try:
success = role_service.delete_role_by_uuid(uuid)
if success:
return jsonify(APIResponse.success("Role deletion successed"))
else:
return jsonify(APIResponse.error("Role not existed")), 404
except Exception as e:
logger.error(f"Error deleting Role: {str(e)}")
return jsonify(APIResponse.error(f"Error occurred when deleting Role: {str(e)}")), 500
@role_bp.route("/share", methods=["POST"])
def share_role():
"""
Share role to registry center
This API shares local role information to the remote registry center, making it accessible to other instances.
Request body:
{
"role_id": "string" // Required, UUID of the role to be shared
}
Response:
Success:
{
"code": 0,
"message": "success",
"data": {
"id": 1,
"uuid": "role_abc123",
"name": "Role Name",
"description": "Role Description",
"system_prompt": "System Prompt",
"icon": "Icon URL",
"is_active": true,
"enable_l0_retrieval": true,
"enable_l1_retrieval": true,
"create_time": "2025-03-17T15:24:42",
"update_time": "2025-03-17T15:24:42"
}
}
Failure:
{
"code": -1,
"message": "share role failed",
"data": null
}
Possible errors:
- 400: Share role failed, possibly due to non-existent role or remote service error
- 500: Internal server error
"""
try:
data = request.get_json()
share_request = ShareRoleRequest.from_dict(data)
role = role_service.share_role(share_request)
if role:
return jsonify(APIResponse.success(role.to_dict()))
else:
return jsonify(APIResponse.error("share role failed")), 400
except Exception as e:
logger.error(f"Error sharing Role: {str(e)}")
return jsonify(APIResponse.error(f"Error occurred when sharing role: {str(e)}")), 500
|