| | from collections.abc import Generator, Mapping |
| | from typing import Any, Union |
| |
|
| | from openai._exceptions import RateLimitError |
| |
|
| | from configs import dify_config |
| | from core.app.apps.advanced_chat.app_generator import AdvancedChatAppGenerator |
| | from core.app.apps.agent_chat.app_generator import AgentChatAppGenerator |
| | from core.app.apps.chat.app_generator import ChatAppGenerator |
| | from core.app.apps.completion.app_generator import CompletionAppGenerator |
| | from core.app.apps.workflow.app_generator import WorkflowAppGenerator |
| | from core.app.entities.app_invoke_entities import InvokeFrom |
| | from core.app.features.rate_limiting import RateLimit |
| | from models.model import Account, App, AppMode, EndUser |
| | from models.workflow import Workflow |
| | from services.errors.llm import InvokeRateLimitError |
| | from services.workflow_service import WorkflowService |
| |
|
| |
|
| | class AppGenerateService: |
| | @classmethod |
| | def generate( |
| | cls, |
| | app_model: App, |
| | user: Union[Account, EndUser], |
| | args: Mapping[str, Any], |
| | invoke_from: InvokeFrom, |
| | streaming: bool = True, |
| | ): |
| | """ |
| | App Content Generate |
| | :param app_model: app model |
| | :param user: user |
| | :param args: args |
| | :param invoke_from: invoke from |
| | :param streaming: streaming |
| | :return: |
| | """ |
| | max_active_request = AppGenerateService._get_max_active_requests(app_model) |
| | rate_limit = RateLimit(app_model.id, max_active_request) |
| | request_id = RateLimit.gen_request_key() |
| | try: |
| | request_id = rate_limit.enter(request_id) |
| | if app_model.mode == AppMode.COMPLETION.value: |
| | return rate_limit.generate( |
| | CompletionAppGenerator().generate( |
| | app_model=app_model, user=user, args=args, invoke_from=invoke_from, stream=streaming |
| | ), |
| | request_id, |
| | ) |
| | elif app_model.mode == AppMode.AGENT_CHAT.value or app_model.is_agent: |
| | return rate_limit.generate( |
| | AgentChatAppGenerator().generate( |
| | app_model=app_model, user=user, args=args, invoke_from=invoke_from, stream=streaming |
| | ), |
| | request_id, |
| | ) |
| | elif app_model.mode == AppMode.CHAT.value: |
| | return rate_limit.generate( |
| | ChatAppGenerator().generate( |
| | app_model=app_model, user=user, args=args, invoke_from=invoke_from, stream=streaming |
| | ), |
| | request_id, |
| | ) |
| | elif app_model.mode == AppMode.ADVANCED_CHAT.value: |
| | workflow = cls._get_workflow(app_model, invoke_from) |
| | return rate_limit.generate( |
| | AdvancedChatAppGenerator().generate( |
| | app_model=app_model, |
| | workflow=workflow, |
| | user=user, |
| | args=args, |
| | invoke_from=invoke_from, |
| | stream=streaming, |
| | ), |
| | request_id, |
| | ) |
| | elif app_model.mode == AppMode.WORKFLOW.value: |
| | workflow = cls._get_workflow(app_model, invoke_from) |
| | return rate_limit.generate( |
| | WorkflowAppGenerator().generate( |
| | app_model=app_model, |
| | workflow=workflow, |
| | user=user, |
| | args=args, |
| | invoke_from=invoke_from, |
| | stream=streaming, |
| | ), |
| | request_id, |
| | ) |
| | else: |
| | raise ValueError(f"Invalid app mode {app_model.mode}") |
| | except RateLimitError as e: |
| | raise InvokeRateLimitError(str(e)) |
| | finally: |
| | if not streaming: |
| | rate_limit.exit(request_id) |
| |
|
| | @staticmethod |
| | def _get_max_active_requests(app_model: App) -> int: |
| | max_active_requests = app_model.max_active_requests |
| | if app_model.max_active_requests is None: |
| | max_active_requests = int(dify_config.APP_MAX_ACTIVE_REQUESTS) |
| | return max_active_requests |
| |
|
| | @classmethod |
| | def generate_single_iteration(cls, app_model: App, user: Account, node_id: str, args: Any, streaming: bool = True): |
| | if app_model.mode == AppMode.ADVANCED_CHAT.value: |
| | workflow = cls._get_workflow(app_model, InvokeFrom.DEBUGGER) |
| | return AdvancedChatAppGenerator().single_iteration_generate( |
| | app_model=app_model, workflow=workflow, node_id=node_id, user=user, args=args, stream=streaming |
| | ) |
| | elif app_model.mode == AppMode.WORKFLOW.value: |
| | workflow = cls._get_workflow(app_model, InvokeFrom.DEBUGGER) |
| | return WorkflowAppGenerator().single_iteration_generate( |
| | app_model=app_model, workflow=workflow, node_id=node_id, user=user, args=args, stream=streaming |
| | ) |
| | else: |
| | raise ValueError(f"Invalid app mode {app_model.mode}") |
| |
|
| | @classmethod |
| | def generate_more_like_this( |
| | cls, |
| | app_model: App, |
| | user: Union[Account, EndUser], |
| | message_id: str, |
| | invoke_from: InvokeFrom, |
| | streaming: bool = True, |
| | ) -> Union[dict, Generator]: |
| | """ |
| | Generate more like this |
| | :param app_model: app model |
| | :param user: user |
| | :param message_id: message id |
| | :param invoke_from: invoke from |
| | :param streaming: streaming |
| | :return: |
| | """ |
| | return CompletionAppGenerator().generate_more_like_this( |
| | app_model=app_model, message_id=message_id, user=user, invoke_from=invoke_from, stream=streaming |
| | ) |
| |
|
| | @classmethod |
| | def _get_workflow(cls, app_model: App, invoke_from: InvokeFrom) -> Workflow: |
| | """ |
| | Get workflow |
| | :param app_model: app model |
| | :param invoke_from: invoke from |
| | :return: |
| | """ |
| | workflow_service = WorkflowService() |
| | if invoke_from == InvokeFrom.DEBUGGER: |
| | |
| | workflow = workflow_service.get_draft_workflow(app_model=app_model) |
| |
|
| | if not workflow: |
| | raise ValueError("Workflow not initialized") |
| | else: |
| | |
| | workflow = workflow_service.get_published_workflow(app_model=app_model) |
| |
|
| | if not workflow: |
| | raise ValueError("Workflow not published") |
| |
|
| | return workflow |
| |
|