Spaces:
Paused
Paused
| import json | |
| from collections.abc import Mapping | |
| from datetime import datetime | |
| from typing import Any, Optional | |
| from sqlalchemy import or_ | |
| from core.model_runtime.utils.encoders import jsonable_encoder | |
| from core.tools.entities.api_entities import UserToolProvider | |
| from core.tools.provider.workflow_tool_provider import WorkflowToolProviderController | |
| from core.tools.tool_label_manager import ToolLabelManager | |
| from core.tools.utils.workflow_configuration_sync import WorkflowToolConfigurationUtils | |
| from extensions.ext_database import db | |
| from models.model import App | |
| from models.tools import WorkflowToolProvider | |
| from models.workflow import Workflow | |
| from services.tools.tools_transform_service import ToolTransformService | |
| class WorkflowToolManageService: | |
| """ | |
| Service class for managing workflow tools. | |
| """ | |
| def create_workflow_tool( | |
| *, | |
| user_id: str, | |
| tenant_id: str, | |
| workflow_app_id: str, | |
| name: str, | |
| label: str, | |
| icon: dict, | |
| description: str, | |
| parameters: Mapping[str, Any], | |
| privacy_policy: str = "", | |
| labels: Optional[list[str]] = None, | |
| ) -> dict: | |
| WorkflowToolConfigurationUtils.check_parameter_configurations(parameters) | |
| # check if the name is unique | |
| existing_workflow_tool_provider = ( | |
| db.session.query(WorkflowToolProvider) | |
| .filter( | |
| WorkflowToolProvider.tenant_id == tenant_id, | |
| # name or app_id | |
| or_(WorkflowToolProvider.name == name, WorkflowToolProvider.app_id == workflow_app_id), | |
| ) | |
| .first() | |
| ) | |
| if existing_workflow_tool_provider is not None: | |
| raise ValueError(f"Tool with name {name} or app_id {workflow_app_id} already exists") | |
| app = db.session.query(App).filter(App.id == workflow_app_id, App.tenant_id == tenant_id).first() | |
| if app is None: | |
| raise ValueError(f"App {workflow_app_id} not found") | |
| workflow = app.workflow | |
| if workflow is None: | |
| raise ValueError(f"Workflow not found for app {workflow_app_id}") | |
| workflow_tool_provider = WorkflowToolProvider( | |
| tenant_id=tenant_id, | |
| user_id=user_id, | |
| app_id=workflow_app_id, | |
| name=name, | |
| label=label, | |
| icon=json.dumps(icon), | |
| description=description, | |
| parameter_configuration=json.dumps(parameters), | |
| privacy_policy=privacy_policy, | |
| version=workflow.version, | |
| ) | |
| try: | |
| WorkflowToolProviderController.from_db(workflow_tool_provider) | |
| except Exception as e: | |
| raise ValueError(str(e)) | |
| db.session.add(workflow_tool_provider) | |
| db.session.commit() | |
| return {"result": "success"} | |
| def update_workflow_tool( | |
| cls, | |
| user_id: str, | |
| tenant_id: str, | |
| workflow_tool_id: str, | |
| name: str, | |
| label: str, | |
| icon: dict, | |
| description: str, | |
| parameters: list[dict], | |
| privacy_policy: str = "", | |
| labels: Optional[list[str]] = None, | |
| ) -> dict: | |
| """ | |
| Update a workflow tool. | |
| :param user_id: the user id | |
| :param tenant_id: the tenant id | |
| :param workflow_tool_id: workflow tool id | |
| :param name: name | |
| :param label: label | |
| :param icon: icon | |
| :param description: description | |
| :param parameters: parameters | |
| :param privacy_policy: privacy policy | |
| :param labels: labels | |
| :return: the updated tool | |
| """ | |
| WorkflowToolConfigurationUtils.check_parameter_configurations(parameters) | |
| # check if the name is unique | |
| existing_workflow_tool_provider = ( | |
| db.session.query(WorkflowToolProvider) | |
| .filter( | |
| WorkflowToolProvider.tenant_id == tenant_id, | |
| WorkflowToolProvider.name == name, | |
| WorkflowToolProvider.id != workflow_tool_id, | |
| ) | |
| .first() | |
| ) | |
| if existing_workflow_tool_provider is not None: | |
| raise ValueError(f"Tool with name {name} already exists") | |
| workflow_tool_provider: WorkflowToolProvider = ( | |
| db.session.query(WorkflowToolProvider) | |
| .filter(WorkflowToolProvider.tenant_id == tenant_id, WorkflowToolProvider.id == workflow_tool_id) | |
| .first() | |
| ) | |
| if workflow_tool_provider is None: | |
| raise ValueError(f"Tool {workflow_tool_id} not found") | |
| app: App = ( | |
| db.session.query(App).filter(App.id == workflow_tool_provider.app_id, App.tenant_id == tenant_id).first() | |
| ) | |
| if app is None: | |
| raise ValueError(f"App {workflow_tool_provider.app_id} not found") | |
| workflow: Workflow = app.workflow | |
| if workflow is None: | |
| raise ValueError(f"Workflow not found for app {workflow_tool_provider.app_id}") | |
| workflow_tool_provider.name = name | |
| workflow_tool_provider.label = label | |
| workflow_tool_provider.icon = json.dumps(icon) | |
| workflow_tool_provider.description = description | |
| workflow_tool_provider.parameter_configuration = json.dumps(parameters) | |
| workflow_tool_provider.privacy_policy = privacy_policy | |
| workflow_tool_provider.version = workflow.version | |
| workflow_tool_provider.updated_at = datetime.now() | |
| try: | |
| WorkflowToolProviderController.from_db(workflow_tool_provider) | |
| except Exception as e: | |
| raise ValueError(str(e)) | |
| db.session.add(workflow_tool_provider) | |
| db.session.commit() | |
| if labels is not None: | |
| ToolLabelManager.update_tool_labels( | |
| ToolTransformService.workflow_provider_to_controller(workflow_tool_provider), labels | |
| ) | |
| return {"result": "success"} | |
| def list_tenant_workflow_tools(cls, user_id: str, tenant_id: str) -> list[UserToolProvider]: | |
| """ | |
| List workflow tools. | |
| :param user_id: the user id | |
| :param tenant_id: the tenant id | |
| :return: the list of tools | |
| """ | |
| db_tools = db.session.query(WorkflowToolProvider).filter(WorkflowToolProvider.tenant_id == tenant_id).all() | |
| tools = [] | |
| for provider in db_tools: | |
| try: | |
| tools.append(ToolTransformService.workflow_provider_to_controller(provider)) | |
| except: | |
| # skip deleted tools | |
| pass | |
| labels = ToolLabelManager.get_tools_labels(tools) | |
| result = [] | |
| for tool in tools: | |
| user_tool_provider = ToolTransformService.workflow_provider_to_user_provider( | |
| provider_controller=tool, labels=labels.get(tool.provider_id, []) | |
| ) | |
| ToolTransformService.repack_provider(user_tool_provider) | |
| user_tool_provider.tools = [ | |
| ToolTransformService.tool_to_user_tool( | |
| tool.get_tools(user_id, tenant_id)[0], labels=labels.get(tool.provider_id, []) | |
| ) | |
| ] | |
| result.append(user_tool_provider) | |
| return result | |
| def delete_workflow_tool(cls, user_id: str, tenant_id: str, workflow_tool_id: str) -> dict: | |
| """ | |
| Delete a workflow tool. | |
| :param user_id: the user id | |
| :param tenant_id: the tenant id | |
| :param workflow_app_id: the workflow app id | |
| """ | |
| db.session.query(WorkflowToolProvider).filter( | |
| WorkflowToolProvider.tenant_id == tenant_id, WorkflowToolProvider.id == workflow_tool_id | |
| ).delete() | |
| db.session.commit() | |
| return {"result": "success"} | |
| def get_workflow_tool_by_tool_id(cls, user_id: str, tenant_id: str, workflow_tool_id: str) -> dict: | |
| """ | |
| Get a workflow tool. | |
| :param user_id: the user id | |
| :param tenant_id: the tenant id | |
| :param workflow_app_id: the workflow app id | |
| :return: the tool | |
| """ | |
| db_tool: WorkflowToolProvider = ( | |
| db.session.query(WorkflowToolProvider) | |
| .filter(WorkflowToolProvider.tenant_id == tenant_id, WorkflowToolProvider.id == workflow_tool_id) | |
| .first() | |
| ) | |
| if db_tool is None: | |
| raise ValueError(f"Tool {workflow_tool_id} not found") | |
| workflow_app: App = db.session.query(App).filter(App.id == db_tool.app_id, App.tenant_id == tenant_id).first() | |
| if workflow_app is None: | |
| raise ValueError(f"App {db_tool.app_id} not found") | |
| tool = ToolTransformService.workflow_provider_to_controller(db_tool) | |
| return { | |
| "name": db_tool.name, | |
| "label": db_tool.label, | |
| "workflow_tool_id": db_tool.id, | |
| "workflow_app_id": db_tool.app_id, | |
| "icon": json.loads(db_tool.icon), | |
| "description": db_tool.description, | |
| "parameters": jsonable_encoder(db_tool.parameter_configurations), | |
| "tool": ToolTransformService.tool_to_user_tool( | |
| tool.get_tools(user_id, tenant_id)[0], labels=ToolLabelManager.get_tool_labels(tool) | |
| ), | |
| "synced": workflow_app.workflow.version == db_tool.version, | |
| "privacy_policy": db_tool.privacy_policy, | |
| } | |
| def get_workflow_tool_by_app_id(cls, user_id: str, tenant_id: str, workflow_app_id: str) -> dict: | |
| """ | |
| Get a workflow tool. | |
| :param user_id: the user id | |
| :param tenant_id: the tenant id | |
| :param workflow_app_id: the workflow app id | |
| :return: the tool | |
| """ | |
| db_tool: WorkflowToolProvider = ( | |
| db.session.query(WorkflowToolProvider) | |
| .filter(WorkflowToolProvider.tenant_id == tenant_id, WorkflowToolProvider.app_id == workflow_app_id) | |
| .first() | |
| ) | |
| if db_tool is None: | |
| raise ValueError(f"Tool {workflow_app_id} not found") | |
| workflow_app: App = db.session.query(App).filter(App.id == db_tool.app_id, App.tenant_id == tenant_id).first() | |
| if workflow_app is None: | |
| raise ValueError(f"App {db_tool.app_id} not found") | |
| tool = ToolTransformService.workflow_provider_to_controller(db_tool) | |
| return { | |
| "name": db_tool.name, | |
| "label": db_tool.label, | |
| "workflow_tool_id": db_tool.id, | |
| "workflow_app_id": db_tool.app_id, | |
| "icon": json.loads(db_tool.icon), | |
| "description": db_tool.description, | |
| "parameters": jsonable_encoder(db_tool.parameter_configurations), | |
| "tool": ToolTransformService.tool_to_user_tool( | |
| tool.get_tools(user_id, tenant_id)[0], labels=ToolLabelManager.get_tool_labels(tool) | |
| ), | |
| "synced": workflow_app.workflow.version == db_tool.version, | |
| "privacy_policy": db_tool.privacy_policy, | |
| } | |
| def list_single_workflow_tools(cls, user_id: str, tenant_id: str, workflow_tool_id: str) -> list[dict]: | |
| """ | |
| List workflow tool provider tools. | |
| :param user_id: the user id | |
| :param tenant_id: the tenant id | |
| :param workflow_app_id: the workflow app id | |
| :return: the list of tools | |
| """ | |
| db_tool: WorkflowToolProvider = ( | |
| db.session.query(WorkflowToolProvider) | |
| .filter(WorkflowToolProvider.tenant_id == tenant_id, WorkflowToolProvider.id == workflow_tool_id) | |
| .first() | |
| ) | |
| if db_tool is None: | |
| raise ValueError(f"Tool {workflow_tool_id} not found") | |
| tool = ToolTransformService.workflow_provider_to_controller(db_tool) | |
| return [ | |
| ToolTransformService.tool_to_user_tool( | |
| tool.get_tools(user_id, tenant_id)[0], labels=ToolLabelManager.get_tool_labels(tool) | |
| ) | |
| ] | |