Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import asyncio | |
| import copy | |
| import json | |
| import random | |
| import re | |
| import time | |
| from dataclasses import dataclass, field | |
| from typing import Any, Awaitable, Callable, Dict, List, Optional, Tuple | |
# Node "type" values for a component node and for the input/output boundary
# nodes that may appear in a graph.
COMPONENT_NODE_TYPE = "system/component"
COMPONENT_INPUT_TYPE = "system/component-input"
COMPONENT_OUTPUT_TYPE = "system/component-output"

# Value of the "format" key identifying a payload that carries its graph under "rootGraph".
FLOW_FORMAT = "nodes-ui-flow"

# Default value of WorkflowRunState.assistant_role (runtime text, kept verbatim).
DEFAULT_ASSISTANT_ROLE = (
    "Ты компьютерный ИИ-ассистент. Отвечай кратко, естественно, одним живым абзацем, "
    "без Markdown, списков, заголовков и нумерации."
)
class WorkflowRuntimeError(Exception):
    """Error raised by the workflow runtime when it cannot safely continue."""
# Type alias: either None or a callable that takes one dict and returns None.
EventCallback = Optional[Callable[[dict], None]]
@dataclass
class WorkflowRunState:
    """Mutable state carried through one workflow run.

    The ``@dataclass`` decorator is required here: the ``field(default_factory=...)``
    defaults below and the keyword-argument construction in
    ``create_interactive_run`` both rely on the generated ``__init__``.
    """

    # Graph being executed (create_interactive_run passes the prepared working graph).
    graph: dict
    # Non-feedback edges grouped by target node id.
    incoming: Dict[str, List[dict]]
    # Non-feedback edges grouped by source node id.
    outgoing: Dict[str, List[dict]]
    # Feedback (cycle-closing) edges grouped by source node id.
    feedback_outgoing: Dict[str, List[dict]]
    # Node ids in execution order.
    order: List[str]
    # NOTE(review): presumably the index into `order` of the next node to run — confirm.
    cursor: int = 0
    # NOTE(review): presumably the outputs each node produced, keyed by node id — confirm.
    outputs_by_node: Dict[str, dict] = field(default_factory=dict)
    # NOTE(review): presumably forced input values per node id — confirm against the run loop.
    input_overrides_by_node: Dict[str, dict] = field(default_factory=dict)
    # Key/value memory read by nodes (for example when rendering "{key}" templates).
    memory: Dict[str, Any] = field(default_factory=dict)
    # Role text combined with system prompts via apply_assistant_role.
    assistant_role: str = DEFAULT_ASSISTANT_ROLE
    # NOTE(review): presumably a retry counter per node id — confirm.
    retry_counts_by_node: Dict[str, int] = field(default_factory=dict)
    # NOTE(review): presumably nested run states of component nodes, by node id — confirm.
    component_runs_by_node: Dict[str, "WorkflowRunState"] = field(default_factory=dict)
    # NOTE(review): presumably the question currently awaiting a user answer — confirm.
    pending_question: Optional[dict] = None
    # NOTE(review): presumably set once the run has finished — confirm.
    completed: bool = False
def parse_workflow_graph(workflow: dict, graph_path: Optional[List[str]] = None) -> dict:
    """Extract a graph from a workflow payload and return a deep copy of it.

    Two payload shapes are accepted: a wrapper whose "format" equals FLOW_FORMAT
    and whose "rootGraph" is a dict, or a bare graph whose "nodes" and "edges"
    are lists.  Each id in ``graph_path`` selects a component node of the
    current graph and descends into its "subgraph" (an empty subgraph is used
    when the component has none).

    Raises:
        WorkflowRuntimeError: the payload is not a dict, has neither accepted
            shape, or a path id does not name a component node.
    """
    if not isinstance(workflow, dict):
        raise WorkflowRuntimeError("Invalid workflow payload.")

    if workflow.get("format") == FLOW_FORMAT and isinstance(workflow.get("rootGraph"), dict):
        current = workflow["rootGraph"]
    elif isinstance(workflow.get("nodes"), list) and isinstance(workflow.get("edges"), list):
        current = workflow
    else:
        raise WorkflowRuntimeError("Unsupported workflow format for Python runtime.")

    for component_id in graph_path or []:
        found = None
        for candidate in current.get("nodes", []):
            if candidate.get("id") == component_id and candidate.get("type") == COMPONENT_NODE_TYPE:
                found = candidate
                break
        if not found:
            raise WorkflowRuntimeError(f"Component graph was not found: {component_id}")
        current = found.get("data", {}).get("subgraph") or create_component_subgraph()

    return copy.deepcopy(current)
def create_component_subgraph() -> dict:
    """Return a fresh empty subgraph: no nodes, no edges, viewport None."""
    return dict(nodes=[], edges=[], viewport=None)
def is_component_node_type(node_type: str) -> bool:
    """Tell whether ``node_type`` equals COMPONENT_NODE_TYPE."""
    matches = node_type == COMPONENT_NODE_TYPE
    return matches
def is_component_boundary_type(node_type: str) -> bool:
    """Tell whether ``node_type`` is the component input or component output type."""
    boundary_types = frozenset((COMPONENT_INPUT_TYPE, COMPONENT_OUTPUT_TYPE))
    return node_type in boundary_types
def normalize_edge(edge: dict) -> dict:
    """Return a copy of ``edge`` with normalized core keys.

    "id", "source" and "target" become strings (empty string when missing),
    "sourceHandle" and "targetHandle" are always present (None when missing),
    and a "type" of "smoothstep" or a missing "type" becomes "default".
    All other keys are carried over unchanged.
    """
    normalized = dict(edge)
    for key in ("id", "source", "target"):
        normalized[key] = str(edge.get(key, ""))
    for key in ("sourceHandle", "targetHandle"):
        normalized[key] = edge.get(key)
    edge_type = edge.get("type", "default")
    normalized["type"] = "default" if edge_type == "smoothstep" else edge_type
    return normalized
def hydrate_graph(graph: dict) -> dict:
    """Return a shallow copy of ``graph`` with hydrated nodes and normalized edges.

    A falsy ``graph`` is treated as an empty dict.  "nodes" are passed through
    hydrate_node, "edges" through normalize_edge, and "viewport" is always
    present (None when missing).
    """
    source = graph or {}
    hydrated = dict(source)
    hydrated["nodes"] = [hydrate_node(node) for node in source.get("nodes", [])]
    hydrated["edges"] = [normalize_edge(edge) for edge in source.get("edges", [])]
    hydrated["viewport"] = source.get("viewport")
    return hydrated
def hydrate_node(node: dict) -> dict:
    """Return a deep copy of ``node`` with a string "id" and a "data" dict.

    For component nodes the "subgraph" stored in "data" is itself hydrated;
    a missing or falsy subgraph is replaced by an empty one.
    """
    hydrated = copy.deepcopy(node)
    hydrated["id"] = str(hydrated.get("id", ""))
    hydrated.setdefault("data", {})
    if hydrated.get("type") != COMPONENT_NODE_TYPE:
        return hydrated
    existing = hydrated["data"].get("subgraph")
    hydrated["data"]["subgraph"] = hydrate_graph(existing or create_component_subgraph())
    return hydrated
def create_runtime(node_type: str) -> dict:
    """Build the initial runtime record for a node of the given type.

    The record starts with status "idle", an empty error string and empty
    "inputs" / "outputs" dicts.
    """
    return dict(type=node_type, status="idle", error="", inputs={}, outputs={})
def prepare_graph_for_run(graph: dict) -> dict:
    """Hydrate ``graph`` and attach a fresh runtime record to every node.

    Each node is deep-copied and gets ``data["runtime"]`` from create_runtime;
    "basic/dialog" nodes additionally start with empty "messages" and
    "cleared" set to True.
    """
    hydrated = hydrate_graph(graph)
    prepared_nodes = []
    for original in hydrated.get("nodes", []):
        prepared = copy.deepcopy(original)
        runtime = create_runtime(prepared.get("type", ""))
        if prepared.get("type") == "basic/dialog":
            runtime.update({"messages": [], "cleared": True})
        prepared.setdefault("data", {})
        prepared["data"]["runtime"] = runtime
        prepared_nodes.append(prepared)
    return {**hydrated, "nodes": prepared_nodes}
def build_edge_maps(edges: List[dict]) -> Tuple[Dict[str, List[dict]], Dict[str, List[dict]]]:
    """Group edges by endpoint.

    Returns:
        A pair ``(incoming, outgoing)``: edges keyed by their "target" and
        edges keyed by their "source", each list in input order.
    """
    by_target: Dict[str, List[dict]] = {}
    by_source: Dict[str, List[dict]] = {}
    for edge in edges:
        target_id, source_id = edge.get("target"), edge.get("source")
        by_target.setdefault(target_id, []).append(edge)
        by_source.setdefault(source_id, []).append(edge)
    return by_target, by_source
def has_path(source_id: str, target_id: str, edges: List[dict], visited: Optional[set] = None) -> bool:
    """Tell whether ``target_id`` can be reached from ``source_id`` along ``edges``.

    Args:
        source_id: Node id to start from.
        target_id: Node id to look for; a node always reaches itself.
        edges: Edge dicts with "source" and "target" keys.
        visited: Optional set of node ids to treat as already explored; ids
            explored by this call are added to it.

    Returns:
        True when a directed path exists, otherwise False.

    The search is an explicit-stack depth-first walk, so long chains cannot
    exhaust the interpreter's recursion limit.
    """
    if source_id == target_id:
        return True
    # `is None` (not truthiness) so a caller-supplied empty set is honoured.
    if visited is None:
        visited = set()

    # Index the edges once instead of rescanning the whole list per node.
    targets_by_source: Dict[Any, List[Any]] = {}
    for edge in edges:
        targets_by_source.setdefault(edge.get("source"), []).append(edge.get("target"))

    stack = [source_id]
    while stack:
        current = stack.pop()
        if current == target_id:
            return True
        if current in visited:
            continue
        visited.add(current)
        stack.extend(targets_by_source.get(current, ()))
    return False
def split_feedback_edges(edges: List[dict]) -> Tuple[List[dict], List[dict]]:
    """Partition ``edges`` into an acyclic set and the feedback edges.

    Edges are examined in order; an edge is classified as feedback when its
    source is already reachable from its target through the acyclic edges
    accepted so far, i.e. adding it would close a cycle.

    Returns:
        A pair ``(acyclic, feedback)``.
    """
    forward: List[dict] = []
    back: List[dict] = []
    for edge in edges:
        closes_cycle = has_path(edge.get("target"), edge.get("source"), forward)
        bucket = back if closes_cycle else forward
        bucket.append(edge)
    return forward, back
def topological_sort(nodes: List[dict], incoming: Dict[str, List[dict]]) -> List[dict]:
    """Order ``nodes`` so every node follows the sources of its incoming edges.

    Incoming edges whose source id is not among ``nodes`` are ignored.

    Raises:
        WorkflowRuntimeError: the incoming edges contain a cycle.
    """
    nodes_by_id = {node.get("id"): node for node in nodes}
    finished = set()
    in_progress = set()
    ordered: List[dict] = []

    def place(node: dict):
        # Depth-first: emit all dependencies of `node` before `node` itself.
        current_id = node.get("id")
        if current_id in finished:
            return
        if current_id in in_progress:
            raise WorkflowRuntimeError("Workflow contains a circular dependency.")
        in_progress.add(current_id)
        for edge in incoming.get(current_id, []):
            dependency = nodes_by_id.get(edge.get("source"))
            if dependency:
                place(dependency)
        in_progress.remove(current_id)
        finished.add(current_id)
        ordered.append(node)

    for node in nodes:
        place(node)
    return ordered
def collect_reachable_node_ids(start_ids: List[str], edges: List[dict]) -> set:
    """Return the ids reachable from ``start_ids`` by following edges source -> target.

    The start ids themselves are always part of the result.
    """
    targets_by_source: Dict[str, List[str]] = {}
    for edge in edges:
        targets_by_source.setdefault(edge.get("source"), []).append(edge.get("target"))

    seen = set(start_ids)
    pending = list(start_ids)
    while pending:
        current = pending.pop()
        for target_id in targets_by_source.get(current, []):
            if target_id in seen:
                continue
            seen.add(target_id)
            pending.append(target_id)
    return seen
def include_input_dependency_node_ids(node_ids: set, edges: List[dict]) -> set:
    """Return ``node_ids`` extended with every node that feeds into them.

    Edges are followed backwards (target -> source) transitively.  The input
    set is not modified.
    """
    sources_by_target: Dict[str, List[str]] = {}
    for edge in edges:
        sources_by_target.setdefault(edge.get("target"), []).append(edge.get("source"))

    needed = set(node_ids)
    pending = list(node_ids)
    while pending:
        current = pending.pop()
        for source_id in sources_by_target.get(current, []):
            if source_id in needed:
                continue
            needed.add(source_id)
            pending.append(source_id)
    return needed
def runtime_order(nodes: List[dict], edges: List[dict], incoming: Dict[str, List[dict]]) -> List[str]:
    """Return node ids in execution order.

    Without any "basic/start" node, every node is included in topological
    order.  Otherwise only nodes reachable from a start node, plus the nodes
    that feed inputs into those, are kept (still in topological order).
    """
    sorted_ids = [node.get("id") for node in topological_sort(nodes, incoming)]
    start_ids = [node.get("id") for node in nodes if node.get("type") == "basic/start"]
    if not start_ids:
        return sorted_ids
    reachable = collect_reachable_node_ids(start_ids, edges)
    required = include_input_dependency_node_ids(reachable, edges)
    return [node_id for node_id in sorted_ids if node_id in required]
def create_interactive_run(graph: dict) -> WorkflowRunState:
    """Prepare ``graph`` for execution and build its initial WorkflowRunState.

    Feedback edges are split off first; the incoming/outgoing maps contain
    only the acyclic edges, while the execution order is computed from all
    edges of the prepared graph.
    """
    prepared = prepare_graph_for_run(graph)
    all_edges = prepared.get("edges", [])
    forward_edges, back_edges = split_feedback_edges(all_edges)
    incoming, outgoing = build_edge_maps(forward_edges)
    feedback_outgoing = build_edge_maps(back_edges)[1]
    order = runtime_order(prepared.get("nodes", []), all_edges, incoming)
    return WorkflowRunState(
        graph=prepared,
        incoming=incoming,
        outgoing=outgoing,
        feedback_outgoing=feedback_outgoing,
        order=order,
    )
def find_node(graph: dict, node_id: str) -> Optional[dict]:
    """Return the first node of ``graph`` whose "id" equals ``node_id``, or None."""
    for candidate in graph.get("nodes", []):
        if candidate.get("id") == node_id:
            return candidate
    return None
def update_node_runtime(graph: dict, node_id: str, runtime_patch: dict) -> dict:
    """Return a deep copy of ``graph`` with ``runtime_patch`` merged into one node.

    The patch is applied to ``data["runtime"]`` of the first node whose "id"
    equals ``node_id`` (missing "data"/"runtime" dicts are created).  When no
    node matches, the copy is returned unchanged.  The input graph is never
    mutated.
    """
    updated = copy.deepcopy(graph)
    target = next(
        (candidate for candidate in updated.get("nodes", []) if candidate.get("id") == node_id),
        None,
    )
    if target is not None:
        target.setdefault("data", {}).setdefault("runtime", {}).update(runtime_patch)
    return updated
def update_node_subgraph(graph: dict, node_id: str, subgraph: dict) -> dict:
    """Return a deep copy of ``graph`` with one node's "subgraph" replaced.

    ``subgraph`` is stored (not copied) under ``data["subgraph"]`` of the first
    node whose "id" equals ``node_id``; the input graph is never mutated.
    """
    updated = copy.deepcopy(graph)
    target = next(
        (candidate for candidate in updated.get("nodes", []) if candidate.get("id") == node_id),
        None,
    )
    if target is not None:
        target.setdefault("data", {})["subgraph"] = subgraph
    return updated
def collect_inputs_for_node(incoming_edges: List[dict], outputs_by_node: Dict[str, dict]) -> dict:
    """Resolve the input value of each target handle from upstream outputs.

    For every incoming edge the value is looked up in the source node's outputs
    under the edge's "sourceHandle"; a missing "targetHandle" maps to "input".
    When several edges feed one handle, the first value that is not None wins;
    the handle maps to None when every candidate is None.
    """
    candidates: Dict[str, List[Any]] = {}
    for edge in incoming_edges or []:
        produced = outputs_by_node.get(edge.get("source"), {})
        handle = edge.get("targetHandle") or "input"
        candidates.setdefault(handle, []).append(produced.get(edge.get("sourceHandle")))

    return {
        handle: next((value for value in values if value is not None), None)
        for handle, values in candidates.items()
    }
def has_blocked_incoming_value(incoming_edges: List[dict], input_values: dict) -> bool:
    """Tell whether any connected input handle currently has no value.

    A handle counts as blocked when ``input_values`` maps it to None or lacks
    it; edges without "targetHandle" refer to the "input" handle.
    """
    for edge in incoming_edges or []:
        handle = edge.get("targetHandle") or "input"
        if input_values.get(handle) is None:
            return True
    return False
def render_memory_template(text: Any, memory: dict) -> str:
    """Replace ``{key}`` placeholders in ``text`` with values from ``memory``.

    Placeholders whose key is missing, None or an empty string are left in
    place verbatim.  A falsy ``text`` renders as an empty string.
    """

    def substitute(found: re.Match) -> str:
        stored = memory.get(found.group(1))
        if stored is None or stored == "":
            return found.group(0)
        return str(stored)

    template = str(text or "")
    return re.sub(r"\{([a-zA-Z0-9_.-]+)\}", substitute, template)
def render_websocket_message_template(text: Any, input_text: Any, memory: dict) -> str:
    """Render a WebSocket message template.

    ``{text}`` and ``{input}`` are replaced by ``input_text`` (empty string
    when it is None).  Any other ``{key}`` is replaced by ``memory[key]``
    unless that value is missing or None, in which case the placeholder is
    kept verbatim.
    """

    def substitute(found: re.Match) -> str:
        name = found.group(1)
        if name in {"text", "input"}:
            return "" if input_text is None else str(input_text)
        stored = memory.get(name)
        if stored is None:
            return found.group(0)
        return str(stored)

    template = str(text or "")
    return re.sub(r"\{([a-zA-Z0-9_.-]+)\}", substitute, template)
def apply_assistant_role(system_prompt: Any, assistant_role: str) -> str:
    """Prefix ``system_prompt`` with the assistant role and the fixed rules text.

    With an empty (or whitespace-only) role the prompt is returned unchanged as
    a string.  Otherwise the result is role, rules paragraph and prompt joined
    by blank lines, with surrounding whitespace stripped.
    """
    prompt_text = system_prompt or ""
    role = str(assistant_role or "").strip()
    if role:
        combined = (
            f"{role}\n\n"
            "Общие правила остаются неизменными: отвечай кратко, естественно, одним живым абзацем, "
            "без Markdown, списков, заголовков и нумерации.\n\n"
            f"{prompt_text}"
        )
        return combined.strip()
    return str(prompt_text)
def get_last_dialog_turn(messages: Any) -> dict:
    """Return the most recent user answer and the question that preceded it.

    The latest message of type "user" with non-empty text is the answer; the
    closest earlier message of type "character" with non-empty text is the
    question (empty string when there is none).  Both are empty strings when
    ``messages`` is not a list or holds no such user message.
    """
    if not isinstance(messages, list):
        return {"question": "", "answer": ""}

    def spoken_by(item: Any, speaker: str) -> bool:
        return bool(isinstance(item, dict) and item.get("type") == speaker and item.get("text"))

    for position in reversed(range(len(messages))):
        candidate = messages[position]
        if not spoken_by(candidate, "user"):
            continue
        question = ""
        for earlier in reversed(messages[:position]):
            if spoken_by(earlier, "character"):
                question = earlier.get("text", "")
                break
        return {"question": question, "answer": candidate.get("text", "")}

    return {"question": "", "answer": ""}
def build_question_result(node: dict, input_values: dict, answer: str, memory: dict, question_override: str = "") -> dict:
    """Build the outputs/runtime payload of a question node once it is answered.

    The question text is ``question_override`` when given; otherwise it is the
    "question" input (or the node's own ``data["question"]``) rendered through
    render_memory_template.  The resulting dialog is the incoming "dialog-in"
    list (if it is a list), followed by the question as a "character" message
    when it is not blank, followed by the answer as a "user" message.
    """
    question = question_override
    if not question:
        template = input_values.get("question") or node.get("data", {}).get("question") or ""
        question = render_memory_template(template, memory)

    incoming_dialog = input_values.get("dialog-in")
    messages = list(incoming_dialog) if isinstance(incoming_dialog, list) else []
    if str(question or "").strip():
        messages.append({"type": "character", "text": question})
    messages.append({"type": "user", "text": answer})

    turn = {"answer": answer, "question": question, "dialog": messages}
    outputs = {"answer": answer, "question": question, "dialog": messages, "turn": turn}
    runtime = {"question": question, "answer": answer, "messages": messages}
    return {"outputs": outputs, "runtime": runtime}
def ensure_choices(choices: Any) -> List[dict]:
    """Normalize a choice list to dicts with "id" and "label".

    Non-dict items are dropped.  A missing id becomes ``choice-<index>`` (index
    in the original list); the label falls back to the item's "keyword", then
    to an empty string.  Anything that is not a non-empty list yields a single
    blank choice.
    """
    if not isinstance(choices, list) or not choices:
        return [{"id": "choice-0", "label": ""}]

    normalized = []
    for index, item in enumerate(choices):
        if not isinstance(item, dict):
            continue
        normalized.append(
            {
                "id": item.get("id") or f"choice-{index}",
                "label": item.get("label") or item.get("keyword") or "",
            }
        )
    return normalized
def ensure_conditions(conditions: Any) -> List[dict]:
    """Normalize a condition list to dicts with "id" and "keyword".

    Non-dict items are dropped; a missing id becomes ``condition-<index>`` and
    a missing keyword an empty string.  Anything that is not a non-empty list
    yields a single blank condition.
    """
    if not isinstance(conditions, list) or not conditions:
        return [{"id": "condition-0", "keyword": ""}]

    normalized = []
    for index, item in enumerate(conditions):
        if not isinstance(item, dict):
            continue
        normalized.append(
            {
                "id": item.get("id") or f"condition-{index}",
                "keyword": item.get("keyword") or "",
            }
        )
    return normalized
def ensure_script_entries(entries: Any) -> List[dict]:
    """Normalize script entries to dicts with "id" and "kind".

    Only dict entries whose "kind" is "user" or "character" are kept; a missing
    id becomes ``<kind>-<index>``.  A non-list input yields an empty list.
    """
    if not isinstance(entries, list):
        return []
    return [
        {"id": entry.get("id") or f"{entry.get('kind')}-{index}", "kind": entry.get("kind")}
        for index, entry in enumerate(entries)
        if isinstance(entry, dict) and entry.get("kind") in {"user", "character"}
    ]
def ensure_json_extracts(extracts: Any) -> List[dict]:
    """Normalize JSON extract definitions to dicts with id, label, path and fields.

    Non-dict items are dropped; missing values default to ``extract-<index>``,
    ``result <index>``, an empty path and empty fields.  Anything that is not a
    non-empty list yields the built-in default extract.
    """
    if not isinstance(extracts, list) or not extracts:
        return [{"id": "extract-0", "label": "items", "path": "data.filter_group.items", "fields": "key, name"}]

    normalized = []
    for index, item in enumerate(extracts):
        if not isinstance(item, dict):
            continue
        normalized.append(
            {
                "id": item.get("id") or f"extract-{index}",
                "label": item.get("label") or f"result {index}",
                "path": item.get("path") or "",
                "fields": item.get("fields") or "",
            }
        )
    return normalized
def normalize_choice_text(value: Any) -> str:
    """Lower-case and trim ``value``, then strip wrapping quote characters.

    Leading runs of ``"``, ``'`` or ``«`` and trailing runs of ``"``, ``'`` or
    ``»`` are removed.  A falsy value yields an empty string.
    """
    lowered = str(value or "").strip().lower()
    return re.sub(r"^[\"'«]+|[\"'»]+$", "", lowered)
def parse_json_path(path: str) -> List[Any]:
    """Split a dotted path such as ``a.items[0].name`` into tokens.

    Names become strings, ``[N]`` becomes the integer N, and ``[]`` or ``[*]``
    becomes the wildcard token ``"*"``.  Empty segments are skipped.
    """
    tokens: List[Any] = []
    for segment in str(path or "").strip().split("."):
        if not segment:
            continue
        # findall yields (name, index) pairs; the group that did not match is "".
        for name, index in re.findall(r"([^[\]]+)|\[(\d+|\*)?\]", segment):
            if name:
                tokens.append(name)
            elif index in ("", "*"):
                tokens.append("*")
            else:
                tokens.append(int(index))
    return tokens
def read_json_path(value: Any, path: str) -> Any:
    """Read the value at ``path`` inside a parsed JSON structure.

    The path is tokenized with parse_json_path.  A wildcard token maps the
    rest of the path over a list and drops None results; an integer token
    indexes a list; a name token reads a dict key.  Any type mismatch,
    out-of-range index or missing key yields None.
    """
    tokens = parse_json_path(path)

    def descend(current: Any, remaining: List[Any]) -> Any:
        if not remaining:
            return current
        if current is None:
            return None
        head, tail = remaining[0], remaining[1:]

        if head == "*":
            if not isinstance(current, list):
                return None
            collected = []
            for element in current:
                found = descend(element, tail)
                if found is not None:
                    collected.append(found)
            return collected

        if isinstance(head, int):
            if isinstance(current, list) and 0 <= head < len(current):
                return descend(current[head], tail)
            return None

        if isinstance(current, dict):
            return descend(current.get(head), tail)
        return None

    return descend(value, tokens)
def parse_field_list(fields: str) -> List[str]:
    """Split a comma-separated field string into trimmed, non-empty names."""
    names = []
    for raw_name in str(fields or "").split(","):
        name = raw_name.strip()
        if name:
            names.append(name)
    return names
def pick_json_fields(value: Any, fields: str) -> Any:
    """Project ``value`` onto the comma-separated ``fields``.

    Each field is a path read with read_json_path; the output key is the last
    dotted segment of the field.  Dicts are projected directly, lists are
    projected item by item (non-dict items produce all-None records), and any
    other value — or an empty field list — is returned unchanged.
    """
    selected = parse_field_list(fields)
    if not selected:
        return value

    def project(item: Any) -> dict:
        is_record = isinstance(item, dict)
        return {
            name.split(".")[-1]: (read_json_path(item, name) if is_record else None)
            for name in selected
        }

    if isinstance(value, list):
        return [project(item) for item in value]
    if isinstance(value, dict):
        return project(value)
    return value
def format_json_parser_output(value: Any) -> str:
    """Turn an extracted JSON value into display text.

    None becomes an empty string, strings pass through, numbers and booleans
    use ``str()``, and everything else is pretty-printed as JSON with a
    two-space indent and non-ASCII characters preserved.
    """
    if value is None:
        return ""
    if isinstance(value, str):
        return value
    is_scalar = isinstance(value, (int, float, bool))
    return str(value) if is_scalar else json.dumps(value, ensure_ascii=False, indent=2)
def append_character_message(dialog_in: List[dict], text: Any) -> List[dict]:
    """Return the dialog with ``text`` appended as a "character" message.

    The text is trimmed first; when nothing remains, the original list object
    is returned as is.  Otherwise a new list is built and the input is left
    untouched.
    """
    message = str(text or "").strip()
    if not message:
        return dialog_in
    return [*dialog_in, {"type": "character", "text": message}]
def validate_json_like_message(message: str) -> None:
    """Check that a message which looks like JSON actually parses.

    Only messages whose trimmed text starts with ``{`` or ``[`` are parsed;
    everything else is accepted without inspection.

    Raises:
        json.JSONDecodeError: the JSON-looking message is not valid JSON.
    """
    candidate = str(message or "").strip()
    looks_like_json = candidate.startswith("{") or candidate.startswith("[")
    if candidate and looks_like_json:
        json.loads(candidate)
async def send_websocket_message(url: str, message: str) -> dict:
    """Send one message over a WebSocket and wait for a single reply.

    Args:
        url: Target address; must start with ws:// or wss:// (case-insensitive).
        message: Text to send.  If it looks like JSON it must parse as JSON.

    Returns:
        ``{"reply": <text>}``; a non-string reply is reported as
        ``"[binary message]"``.

    Raises:
        WorkflowRuntimeError: empty or malformed URL, invalid JSON body,
            missing ``websockets`` package, connection failure, or no reply
            within 15 seconds.
    """
    if not url or not url.strip():
        raise WorkflowRuntimeError("WebSocket URL is empty.")
    target_url = url.strip()
    scheme_ok = re.match(r"^wss?://", target_url, flags=re.IGNORECASE)
    if not scheme_ok:
        raise WorkflowRuntimeError(f"WebSocket URL must start with ws:// or wss://. Received: {target_url}")

    try:
        validate_json_like_message(message)
    except json.JSONDecodeError as error:
        raise WorkflowRuntimeError(f"Message body is not valid JSON: {error}") from error

    # Imported lazily so the module loads even when the optional dependency is absent.
    try:
        import websockets
    except ImportError as error:
        raise WorkflowRuntimeError(
            "Python WebSocket client is not installed. Install backend dependency: websockets."
        ) from error

    try:
        async with websockets.connect(target_url, open_timeout=15) as socket:
            # Short pause between connecting and sending (reason not visible here).
            await asyncio.sleep(0.12)
            await socket.send(message)
            try:
                reply = await asyncio.wait_for(socket.recv(), timeout=15)
            except asyncio.TimeoutError:
                raise WorkflowRuntimeError("WebSocket timed out waiting for reply.")
            reply_text = reply if isinstance(reply, str) else "[binary message]"
            return {"reply": reply_text}
    except WorkflowRuntimeError:
        # Our own errors (e.g. the timeout above) pass through unwrapped.
        raise
    except Exception as error:
        raise WorkflowRuntimeError(f"WebSocket connection failed for {target_url}: {error}") from error
def collect_component_outputs(graph: dict) -> dict:
    """Gather the values of a component graph's output nodes.

    Each output node contributes ``data["runtime"]["value"]`` under its
    ``data["externalHandleId"]``; without one, the first output node uses
    "output" and later ones ``output-<index>``.  "lastOutput" is the single
    value when exactly one handle exists, otherwise the whole outputs dict.
    A graph without output nodes yields ``{"output": None}``.
    """
    outputs = {}
    position = 0
    for node in graph.get("nodes", []):
        if node.get("type") != COMPONENT_OUTPUT_TYPE:
            continue
        fallback_handle = "output" if position == 0 else f"output-{position}"
        handle_id = node.get("data", {}).get("externalHandleId") or fallback_handle
        outputs[handle_id] = node.get("data", {}).get("runtime", {}).get("value")
        position += 1

    if position == 0:
        return {"outputs": {"output": None}, "lastOutput": None}

    values = list(outputs.values())
    last_output = values[0] if len(values) == 1 else outputs
    return {"outputs": outputs, "lastOutput": last_output}
def get_feedback_jump(run_state: WorkflowRunState, source_node_id: str, outputs: dict) -> Optional[dict]:
    """Find the first feedback edge of a node that should be followed.

    A feedback edge qualifies when the node produced a non-None value on the
    edge's "sourceHandle" and the edge's target is part of the run order.

    Returns:
        ``{"edge", "targetIndex", "outputValue"}`` for the first qualifying
        edge, or None when there is none.
    """
    order = run_state.order
    for edge in run_state.feedback_outgoing.get(source_node_id, []):
        produced = outputs.get(edge.get("sourceHandle"))
        if produced is None:
            continue
        target_id = edge.get("target")
        if target_id not in order:
            continue
        return {"edge": edge, "targetIndex": order.index(target_id), "outputValue": produced}
    return None
def build_unclear_node_result(node: dict, input_values: dict, error: Exception) -> Optional[dict]:
    """Build an "unclear" result for a node that failed with ``error``.

    Only "basic/semantic-branch" and "basic/save-memory" nodes are handled;
    for any other node type None is returned.  The incoming dialog (from
    "dialog-in", else from the "turn" input) is routed to the "unclear"
    output and the error text is recorded in the runtime.
    """
    node_type = node.get("type")
    if node_type != "basic/semantic-branch" and node_type != "basic/save-memory":
        return None

    turn = input_values.get("turn")
    if not isinstance(turn, dict):
        turn = {}
    dialog_input = input_values.get("dialog-in")
    has_dialog_input = isinstance(dialog_input, list)

    if node_type == "basic/semantic-branch":
        # Without a dialog the branch still needs a non-None payload, hence True.
        payload = dialog_input if has_dialog_input else turn.get("dialog", True)
        outputs = {choice["id"]: None for choice in ensure_choices(node.get("data", {}).get("choices"))}
        outputs["unclear"] = payload
        return {
            "outputs": outputs,
            "runtime": {"result": "unclear", "matchId": "unclear", "error": str(error)},
        }

    dialog = dialog_input if has_dialog_input else turn.get("dialog", [])
    if not isinstance(dialog, list):
        dialog = []
    return {
        "outputs": {"dialog": None, "value": "", "unclear": dialog},
        "runtime": {"result": "unclear", "matchId": "unclear", "value": "", "error": str(error)},
    }
class WorkflowServices:
    """Default backend-service hooks used while executing nodes.

    Every hook except ``paraphrase_text`` raises WorkflowRuntimeError here;
    presumably a backend overrides them with real implementations — confirm
    against the caller.
    """

    async def request_llm(self, **_: Any) -> str:
        """Always raises: no LLM backend is available in this base class."""
        raise WorkflowRuntimeError("Request LLM node needs backend workflow services.")

    async def classify(self, **_: Any) -> str:
        """Always raises: no classifier backend is available in this base class."""
        raise WorkflowRuntimeError("Classifier node needs backend workflow services.")

    async def extract_memory_field(self, **_: Any) -> dict:
        """Always raises: no memory-extraction backend is available in this base class."""
        raise WorkflowRuntimeError("Save Memory node needs backend workflow services.")

    async def answer_from_context(self, **_: Any) -> str:
        """Always raises: no knowledge-answer backend is available in this base class."""
        raise WorkflowRuntimeError("Knowledge Answer node needs backend workflow services.")

    async def paraphrase_text(self, **kwargs: Any) -> str:
        """Return the ``text`` keyword argument unchanged ("" when absent)."""
        original_text = kwargs.get("text", "")
        return original_text
| async def execute_node( | |
| node: dict, | |
| inputs: dict, | |
| run_state: WorkflowRunState, | |
| services: WorkflowServices, | |
| context: dict, | |
| ) -> dict: | |
| node_type = node.get("type") | |
| data = node.get("data", {}) | |
| memory = run_state.memory or {} | |
| if node_type == "basic/text": | |
| input_text = inputs.get("text") | |
| output_text = input_text if input_text is not None else data.get("text", "") | |
| return {"outputs": {"text": output_text}, "runtime": {"inputText": input_text, "outputText": output_text}} | |
| if node_type == "basic/start": | |
| return {"outputs": {"dialog": []}, "runtime": {"started": True}} | |
| if node_type == "basic/update-role": | |
| role = str(data.get("role") or "").strip() | |
| dialog_in = inputs.get("dialog-in") if isinstance(inputs.get("dialog-in"), list) else [] | |
| return { | |
| "outputs": {"dialog": dialog_in}, | |
| "runtime": {"assistantRole": role}, | |
| "assistantRolePatch": role, | |
| } | |
| if node_type == "basic/request": | |
| raw_system = inputs.get("system") or "" | |
| user = inputs.get("user") or "" | |
| if not raw_system and not user: | |
| return {"outputs": {"response": ""}, "runtime": {"response": ""}} | |
| system = apply_assistant_role(raw_system, run_state.assistant_role) | |
| response = await services.request_llm(system=system, user=user, node_id=node.get("id"), context=context) | |
| return {"outputs": {"response": response}, "runtime": {"response": response}} | |
| if node_type == "basic/classifier": | |
| question = inputs.get("question") or "" | |
| answer = inputs.get("answer") or "" | |
| options = data.get("options") or "" | |
| if not answer or not options: | |
| return {"outputs": {"result": ""}, "runtime": {"result": ""}} | |
| result = await services.classify( | |
| question=question, | |
| answer=answer, | |
| options=options, | |
| node_id=node.get("id"), | |
| context=context, | |
| ) | |
| return {"outputs": {"result": result}, "runtime": {"result": result}} | |
| if node_type == "basic/semantic-branch": | |
| turn = inputs.get("turn") if isinstance(inputs.get("turn"), dict) else {} | |
| question = inputs.get("question") or turn.get("question") or "" | |
| answer = inputs.get("answer") or turn.get("answer") or "" | |
| branch_payload = inputs.get("dialog-in") if isinstance(inputs.get("dialog-in"), list) else turn.get("dialog", True) | |
| choices = [choice for choice in ensure_choices(data.get("choices")) if choice.get("label", "").strip()] | |
| outputs = {choice["id"]: None for choice in ensure_choices(data.get("choices"))} | |
| outputs["unclear"] = None | |
| if not answer or not choices: | |
| outputs["unclear"] = branch_payload | |
| return {"outputs": outputs, "runtime": {"result": "unclear", "matchId": "unclear"}} | |
| result = await services.classify( | |
| question=question, | |
| answer=answer, | |
| options=[choice["label"] for choice in choices], | |
| node_id=node.get("id"), | |
| context=context, | |
| ) | |
| normalized = normalize_choice_text(result) | |
| matched = next((choice for choice in choices if normalize_choice_text(choice.get("label")) == normalized), None) | |
| if matched: | |
| outputs[matched["id"]] = branch_payload | |
| else: | |
| outputs["unclear"] = branch_payload | |
| return {"outputs": outputs, "runtime": {"result": result, "matchId": matched["id"] if matched else "unclear"}} | |
| if node_type == "basic/script": | |
| script_in_connected = "script-in" in inputs | |
| script_in = inputs.get("script-in") | |
| if script_in_connected and not script_in: | |
| return { | |
| "outputs": {"dialog": [], **({"script-text": None} if data.get("hasScriptOutput") else {})}, | |
| "runtime": {"messages": [], "scriptOutput": None}, | |
| } | |
| messages: List[dict] = [] | |
| if isinstance(script_in, list): | |
| messages.extend(script_in) | |
| for entry in ensure_script_entries(data.get("entries")): | |
| value = inputs.get(entry["id"]) | |
| if not value: | |
| continue | |
| if isinstance(value, list): | |
| messages.extend(value) | |
| else: | |
| messages.append({"type": entry["kind"], "text": value}) | |
| outputs = {"dialog": messages} | |
| script_output = None | |
| if data.get("hasScriptOutput"): | |
| script_edges = [edge for edge in context.get("outgoing_edges", []) if edge.get("sourceHandle") == "script-text"] | |
| target_node = None | |
| if script_edges: | |
| target_node = context.get("node_lookup", {}).get(script_edges[0].get("target")) | |
| script_output = messages[-1]["text"] if target_node and target_node.get("type") == "basic/text" and messages else messages | |
| outputs["script-text"] = script_output | |
| return {"outputs": outputs, "runtime": {"messages": messages, "scriptOutput": script_output}} | |
| if node_type == "basic/dialog": | |
| messages = inputs.get("dialog") if isinstance(inputs.get("dialog"), list) else [] | |
| return {"outputs": {}, "runtime": {"messages": messages, "cleared": False}} | |
| if node_type == "basic/assistant-message": | |
| text = inputs.get("text") or data.get("text") or "" | |
| rendered = render_memory_template(text, memory) | |
| message = ( | |
| await services.paraphrase_text( | |
| text=rendered, | |
| message_type="message", | |
| node_id=node.get("id"), | |
| context={**context, "assistant_role": run_state.assistant_role}, | |
| ) | |
| if data.get("paraphrase") | |
| else rendered | |
| ) | |
| dialog_in = inputs.get("dialog-in") if isinstance(inputs.get("dialog-in"), list) else [] | |
| messages = append_character_message(dialog_in, message) | |
| return { | |
| "outputs": {"dialog": messages}, | |
| "runtime": {"message": message, "messages": messages, "assistantMessage": message}, | |
| } | |
| if node_type == "basic/save-memory": | |
| turn_value = inputs.get("turn") | |
| turn = turn_value if isinstance(turn_value, dict) else {} | |
| key = str(data.get("key") or "").strip() | |
| dialog_in = ( | |
| inputs.get("dialog-in") | |
| if isinstance(inputs.get("dialog-in"), list) | |
| else turn_value | |
| if isinstance(turn_value, list) | |
| else turn.get("dialog", []) | |
| ) | |
| dialog_in = dialog_in if isinstance(dialog_in, list) else [] | |
| latest_turn = get_last_dialog_turn(dialog_in) | |
| answer = inputs.get("answer") or latest_turn.get("answer") or turn.get("answer") or "" | |
| question = inputs.get("question") or latest_turn.get("question") or turn.get("question") or "" | |
| input_text = inputs.get("text") or "" | |
| instruction = str(data.get("instruction") or "").replace("{text}", str(input_text)) | |
| unclear_outputs = {"dialog": None, "value": "", "unclear": dialog_in} | |
| if not key or not answer: | |
| return {"outputs": unclear_outputs, "runtime": {"result": "unclear", "matchId": "unclear", "value": ""}} | |
| response = await services.extract_memory_field( | |
| answer=answer, | |
| key=key, | |
| instruction=instruction, | |
| question=question, | |
| node_id=node.get("id"), | |
| context=context, | |
| ) | |
| value = response.get("value") | |
| value = str(value).strip() if value is not None else "" | |
| if not value: | |
| return { | |
| "outputs": unclear_outputs, | |
| "runtime": { | |
| "key": key, | |
| "value": "", | |
| "result": "unclear", | |
| "matchId": "unclear", | |
| "instruction": instruction, | |
| "inputText": input_text, | |
| }, | |
| } | |
| memory_patch = {key: value} | |
| return { | |
| "outputs": {"dialog": dialog_in, "value": value, "unclear": None}, | |
| "runtime": { | |
| "key": key, | |
| "value": value, | |
| "result": "saved", | |
| "matchId": "value", | |
| "instruction": instruction, | |
| "inputText": input_text, | |
| "memoryPatch": memory_patch, | |
| }, | |
| "memoryPatch": memory_patch, | |
| } | |
| if node_type == "basic/knowledge-answer": | |
| turn = inputs.get("turn") if isinstance(inputs.get("turn"), dict) else {} | |
| question = render_memory_template(inputs.get("question") or turn.get("answer") or "", memory) | |
| dialog_in = inputs.get("dialog-in") if isinstance(inputs.get("dialog-in"), list) else turn.get("dialog", []) | |
| dialog_in = dialog_in if isinstance(dialog_in, list) else [] | |
| if not question: | |
| return {"outputs": {"dialog": dialog_in}, "runtime": {"answer": "", "assistantMessage": "", "messages": dialog_in}} | |
| answer = await services.answer_from_context( | |
| question=question, | |
| source=data.get("source") or "uploaded", | |
| context_path=data.get("contextPath") or "", | |
| node_id=node.get("id"), | |
| context={**context, "assistant_role": run_state.assistant_role}, | |
| ) | |
| messages = append_character_message(dialog_in, answer) | |
| return {"outputs": {"dialog": messages}, "runtime": {"answer": answer, "assistantMessage": answer, "messages": messages}} | |
| if node_type == "basic/counter": | |
| key = str(data.get("key") or "counter").strip() or "counter" | |
| try: | |
| limit = max(1, int(data.get("limit") or 3)) | |
| except (TypeError, ValueError): | |
| limit = 3 | |
| try: | |
| previous = int(memory.get(key, 0)) | |
| except (TypeError, ValueError): | |
| previous = 0 | |
| count = previous + 1 | |
| match_id = "done" if count >= limit else "continue" | |
| payload = inputs.get("dialog-in") if isinstance(inputs.get("dialog-in"), list) else True | |
| return { | |
| "outputs": { | |
| "continue": payload if match_id == "continue" else None, | |
| "done": payload if match_id == "done" else None, | |
| }, | |
| "runtime": { | |
| "key": key, | |
| "count": count, | |
| "limit": limit, | |
| "matchId": match_id, | |
| }, | |
| "memoryPatch": {key: count}, | |
| } | |
| if node_type == "basic/wait": | |
| try: | |
| wait_ms = max(0, int(data.get("ms") or 0)) | |
| except (TypeError, ValueError): | |
| wait_ms = 0 | |
| if wait_ms: | |
| await asyncio.sleep(wait_ms / 1000) | |
| dialog_in = inputs.get("dialog-in") if isinstance(inputs.get("dialog-in"), list) else [] | |
| return { | |
| "outputs": {"dialog": dialog_in}, | |
| "runtime": {"waitedMs": wait_ms}, | |
| } | |
| if node_type == "basic/restart": | |
| return {"outputs": {}, "runtime": {"restart": True}, "runtimeAction": {"type": "restart"}} | |
| if node_type == "basic/randomlist": | |
| source = inputs.get("text") | |
| if not source or not str(source).strip(): | |
| return {"outputs": {"text": ""}, "runtime": {"selected": ""}} | |
| items = re.findall(r"'([^']*)'", str(source)) | |
| if not items: | |
| return {"outputs": {"text": ""}, "runtime": {"selected": ""}} | |
| selected = random.choice(items) | |
| return {"outputs": {"text": selected}, "runtime": {"selected": selected}} | |
| if node_type == "basic/send-websocket": | |
| input_text = inputs.get("text") or "" | |
| dialog_in = inputs.get("dialog-in") if isinstance(inputs.get("dialog-in"), list) else [] | |
| url = data.get("url") or "" | |
| message = render_websocket_message_template(data.get("messageTemplate") or "", input_text, memory) | |
| result = await send_websocket_message(url, message) | |
| reply = result.get("reply") or "" | |
| return { | |
| "outputs": {"dialog": dialog_in, "text": reply}, | |
| "runtime": {"inputText": input_text, "dialog": dialog_in, "url": url, "message": message, "reply": reply}, | |
| } | |
| if node_type == "basic/json-parser": | |
| raw_json = inputs.get("json") or "" | |
| extracts = ensure_json_extracts(data.get("extracts")) | |
| outputs = {extract["id"]: "" for extract in extracts} | |
| if not raw_json or not str(raw_json).strip(): | |
| return {"outputs": outputs, "runtime": {"values": outputs}} | |
| try: | |
| parsed = json.loads(str(raw_json)) | |
| except json.JSONDecodeError as error: | |
| raise WorkflowRuntimeError(f"Invalid JSON: {error}") from error | |
| for extract in extracts: | |
| value = read_json_path(parsed, extract.get("path", "")) | |
| picked = pick_json_fields(value, extract.get("fields", "")) | |
| outputs[extract["id"]] = format_json_parser_output(picked) | |
| return {"outputs": outputs, "runtime": {"values": outputs}} | |
| if node_type == "basic/ifelse": | |
| input_value = inputs.get("input") | |
| conditions = ensure_conditions(data.get("conditions")) | |
| outputs = {condition["id"]: None for condition in conditions} | |
| match_id = None | |
| if not input_value: | |
| return {"outputs": outputs, "runtime": {"inputValue": "", "matchId": None}} | |
| normalized = str(input_value).strip().lower() | |
| for condition in conditions: | |
| keyword = condition.get("keyword") or "" | |
| if keyword and normalized == keyword.strip().lower(): | |
| outputs[condition["id"]] = True | |
| match_id = condition["id"] | |
| break | |
| return {"outputs": outputs, "runtime": {"inputValue": input_value, "matchId": match_id}} | |
| if node_type == COMPONENT_INPUT_TYPE: | |
| external_values = context.get("external_input_values") if isinstance(context.get("external_input_values"), dict) else None | |
| external_handle_id = data.get("externalHandleId") | |
| if external_values is not None and external_handle_id and external_handle_id in external_values: | |
| value = external_values[external_handle_id] | |
| else: | |
| value = context.get("external_input_value") | |
| return {"outputs": {"output": value}, "runtime": {"value": value}} | |
| if node_type == COMPONENT_OUTPUT_TYPE: | |
| value = inputs.get("input") | |
| return {"outputs": {}, "runtime": {"value": value}} | |
| return {"outputs": {}, "runtime": {}} | |
async def continue_interactive_run(
    run_state: WorkflowRunState,
    *,
    user_answer: Optional[str] = None,
    services: Optional[WorkflowServices] = None,
    external_input_value: Any = None,
    external_input_values: Optional[dict] = None,
    context: Optional[dict] = None,
    on_event: EventCallback = None,
) -> dict:
    """Advance an interactive run until it pauses, restarts or completes.

    A pending question (if any) is resolved first. Afterwards the nodes listed
    in ``run_state.order`` are processed one by one starting at
    ``run_state.cursor``. ``run_state`` is mutated in place: graph, cursor,
    per-node outputs, input overrides, memory, assistant role, pending
    question and the ``completed`` flag.

    Args:
        run_state: The run to advance.
        user_answer: Answer to the currently pending question, if there is one.
        services: Service facade used for paraphrasing and node execution; a
            default ``WorkflowServices()`` is created when omitted.
        external_input_value: Passed to regular nodes through the execution
            context under ``"external_input_value"``.
        external_input_values: Passed to regular nodes through the execution
            context under ``"external_input_values"``.
        context: Extra context merged into what is handed to services and
            nodes; defaults to an empty dict.
        on_event: Optional callback that receives progress events.

    Returns:
        A dict produced by ``_runtime_response`` whose ``"status"`` is
        ``"paused"`` (a question is waiting for the user), ``"restart"`` or
        ``"complete"``.

    Raises:
        WorkflowRuntimeError: Propagated from ``_resolve_pending_question``
            when the pending node or component run cannot be found.

    Exceptions raised while executing a non-question node are caught, reported
    as a ``node-error`` event and stored on the node; the loop then moves on to
    the next node. Code outside the ``try`` block (pending-question resolution,
    question paraphrasing) is not covered by that handler.
    """
    services = services or WorkflowServices()
    context = context or {}
    working_graph = run_state.graph
    if run_state.pending_question:
        # A non-None result means the run is still paused or requested a
        # restart; None means the question was consumed and the loop may go on.
        result = await _resolve_pending_question(
            run_state,
            user_answer=user_answer,
            services=services,
            external_input_value=external_input_value,
            external_input_values=external_input_values,
            context=context,
            on_event=on_event,
        )
        if result is not None:
            return result
        # The resolver replaces run_state.graph, so refresh the local copy.
        working_graph = run_state.graph
    while run_state.cursor < len(run_state.order):
        node_id = run_state.order[run_state.cursor]
        node = find_node(working_graph, node_id)
        if not node:
            # Ordered id without a matching node: step over it.
            run_state.cursor += 1
            continue
        incoming_edges = run_state.incoming.get(node_id, [])
        # Overrides are unpacked last, so they win over values collected from edges.
        input_values = {
            **collect_inputs_for_node(incoming_edges, run_state.outputs_by_node),
            **run_state.input_overrides_by_node.get(node_id, {}),
        }
        if incoming_edges and has_blocked_incoming_value(incoming_edges, input_values):
            # Node is reported as blocked by its inputs: record empty outputs,
            # mark it skipped and continue with the next node.
            run_state.outputs_by_node[node_id] = {}
            working_graph = update_node_runtime(
                working_graph,
                node_id,
                {"status": "skipped", "error": "", "inputs": input_values, "outputs": {}},
            )
            run_state.graph = working_graph
            _emit(on_event, {"type": "skip", "nodeId": node_id, "title": node.get("data", {}).get("title") or node.get("type")})
            run_state.cursor += 1
            continue
        working_graph = update_node_runtime(working_graph, node_id, {"status": "running", "error": "", "inputs": input_values})
        run_state.graph = working_graph
        _emit(on_event, {"type": "node-start", "nodeId": node_id, "title": node.get("data", {}).get("title") or node.get("type")})
        if node.get("type") == "basic/question":
            # Question nodes pause the run. The cursor is left on this node;
            # it is advanced by _resolve_pending_question once an answer arrives.
            rendered = render_memory_template(input_values.get("question") or node.get("data", {}).get("question") or "", run_state.memory)
            # Paraphrasing is only requested when the node enables it.
            question = (
                await services.paraphrase_text(
                    text=rendered,
                    message_type="question",
                    node_id=node_id,
                    context={**context, "assistant_role": run_state.assistant_role},
                )
                if node.get("data", {}).get("paraphrase")
                else rendered
            )
            pending = {"nodeId": node_id, "question": question, "inputs": input_values}
            working_graph = update_node_runtime(
                working_graph,
                node_id,
                {"status": "waiting", "error": "", "inputs": input_values, "outputs": {}, "question": question},
            )
            run_state.graph = working_graph
            run_state.pending_question = pending
            _emit(on_event, {"type": "assistant-question", "nodeId": node_id, "question": question})
            return _runtime_response("paused", run_state, pending)
        try:
            if is_component_node_type(node.get("type")):
                # Component nodes run their own nested interactive run.
                result = await _execute_component_node(
                    run_state,
                    node,
                    input_values,
                    services=services,
                    context=context,
                    on_event=on_event,
                )
                if result.get("status") in {"paused", "restart"}:
                    # Already a runtime response for this run; hand it back as is.
                    return result
                node_result = result["node_result"]
                # The component call rewrites run_state.graph (subgraph update).
                working_graph = run_state.graph
            else:
                node_lookup = {candidate.get("id"): candidate for candidate in working_graph.get("nodes", [])}
                outgoing_edges = run_state.outgoing.get(node_id, [])
                node_result = await execute_node(
                    node,
                    input_values,
                    run_state,
                    services,
                    {
                        **context,
                        "external_input_value": external_input_value,
                        "external_input_values": external_input_values,
                        "connected_output_handles": [
                            edge.get("sourceHandle")
                            for edge in outgoing_edges
                            if edge.get("sourceHandle")
                        ],
                        "outgoing_edges": outgoing_edges,
                        "node_lookup": node_lookup,
                    },
                )
            if node_result.get("runtimeAction", {}).get("type") == "restart":
                # Restart request: mark the node successful and return without
                # storing its outputs or advancing the cursor.
                working_graph = update_node_runtime(
                    working_graph,
                    node_id,
                    {
                        "status": "success",
                        "error": "",
                        "inputs": input_values,
                        "outputs": node_result.get("outputs", {}),
                        **node_result.get("runtime", {}),
                    },
                )
                run_state.graph = working_graph
                _emit(on_event, {"type": "restart", "nodeId": node_id, "title": node.get("data", {}).get("title") or node.get("type")})
                return _runtime_response("restart", run_state, None)
            # An "unclear" result may turn into a follow-up question for the user.
            retry_result = await _maybe_pause_for_unclear_retry(
                run_state,
                node,
                input_values,
                node_result,
                services=services,
                context=context,
                on_event=on_event,
            )
            if retry_result:
                return retry_result
            outputs = node_result.get("outputs", {})
            run_state.outputs_by_node[node_id] = outputs
            # Overrides are consumed by a successful execution.
            run_state.input_overrides_by_node.pop(node_id, None)
            memory_patch = node_result.get("memoryPatch")
            if isinstance(memory_patch, dict):
                run_state.memory.update(memory_patch)
            if isinstance(node_result.get("assistantRolePatch"), str):
                # An empty-string patch falls back to the default role.
                run_state.assistant_role = node_result.get("assistantRolePatch") or DEFAULT_ASSISTANT_ROLE
            working_graph = update_node_runtime(
                working_graph,
                node_id,
                {
                    "status": "success",
                    "error": "",
                    "inputs": input_values,
                    "outputs": outputs,
                    **node_result.get("runtime", {}),
                    # Listed after the runtime spread, so it overrides any
                    # "assistantRole" key the node runtime carried.
                    "assistantRole": run_state.assistant_role,
                },
            )
            run_state.graph = working_graph
            _emit(on_event, {"type": "node-success", "nodeId": node_id, "title": node.get("data", {}).get("title") or node.get("type")})
            assistant_message = node_result.get("runtime", {}).get("assistantMessage")
            if assistant_message:
                _emit(on_event, {"type": "assistant-message", "nodeId": node_id, "message": assistant_message})
            if isinstance(memory_patch, dict):
                _emit(on_event, {"type": "memory-update", "nodeId": node_id, "memoryPatch": memory_patch})
            if isinstance(node_result.get("assistantRolePatch"), str):
                _emit(on_event, {"type": "assistant-role-update", "nodeId": node_id, "assistantRole": run_state.assistant_role})
            feedback_jump = get_feedback_jump(run_state, node_id, outputs)
            if feedback_jump:
                # Feed the output into the jump target as an input override and
                # move the cursor there; `continue` bypasses the increment below.
                edge = feedback_jump["edge"]
                target_id = edge.get("target")
                current_overrides = run_state.input_overrides_by_node.get(target_id, {})
                run_state.input_overrides_by_node[target_id] = {
                    **current_overrides,
                    edge.get("targetHandle") or "input": feedback_jump["outputValue"],
                }
                run_state.cursor = feedback_jump["targetIndex"]
                continue
        except Exception as error:
            _emit(
                on_event,
                {
                    "type": "node-error",
                    "nodeId": node_id,
                    "title": node.get("data", {}).get("title") or node.get("type"),
                    "error": str(error),
                },
            )
            # Some failures can be converted into an "unclear" result, which in
            # turn may pause the run with a retry question.
            unclear_result = build_unclear_node_result(node, input_values, error)
            if unclear_result:
                retry_result = await _maybe_pause_for_unclear_retry(
                    run_state,
                    node,
                    input_values,
                    unclear_result,
                    services=services,
                    context=context,
                    on_event=on_event,
                )
                if retry_result:
                    return retry_result
            # Otherwise record the failure on the node and keep going.
            run_state.outputs_by_node[node_id] = {}
            working_graph = update_node_runtime(
                working_graph,
                node_id,
                {"status": "error", "error": str(error), "inputs": input_values, "outputs": {}},
            )
            run_state.graph = working_graph
        run_state.cursor += 1
    # Every ordered node has been visited.
    run_state.completed = True
    run_state.pending_question = None
    return _runtime_response("complete", run_state, None)
async def _resolve_pending_question(
    run_state: WorkflowRunState,
    *,
    user_answer: Optional[str],
    services: WorkflowServices,
    external_input_value: Any,
    external_input_values: Optional[dict],
    context: dict,
    on_event: EventCallback,
) -> Optional[dict]:
    """Consume the question stored in ``run_state.pending_question``.

    Three kinds of pending question are handled:

    * a question raised inside a component (``componentNodeId`` is set): the
      nested run is resumed with ``user_answer``;
    * a retry question (``retryForNodeId`` is set): the answer is stored as
      input overrides for that node and the cursor is left unchanged;
    * a plain question node: the node result is built from the answer and the
      cursor is advanced.

    ``external_input_value`` and ``external_input_values`` are accepted for
    signature parity with the caller but are not read here.

    Returns:
        ``None`` when the question was consumed and the caller may continue,
        otherwise a ``_runtime_response`` dict with status ``"paused"`` or
        ``"restart"``.

    Raises:
        WorkflowRuntimeError: When the pending node, or the component node /
            nested run it refers to, cannot be found.
    """
    question_info = run_state.pending_question or {}
    graph_before = run_state.graph

    component_id = question_info.get("componentNodeId")
    if component_id:
        owner_node = find_node(graph_before, component_id)
        child_run = run_state.component_runs_by_node.get(component_id)
        child_inputs = question_info.get("componentInputValues") or {}
        if not (owner_node and child_run):
            raise WorkflowRuntimeError("Pending component question was not found.")

        # The nested run works on the parent's memory and assistant role.
        child_run.memory = run_state.memory
        child_run.assistant_role = run_state.assistant_role
        child_outcome = await continue_interactive_run(
            child_run,
            user_answer=user_answer,
            services=services,
            external_input_value=child_inputs.get("input"),
            external_input_values=child_inputs,
            context=context,
            on_event=on_event,
        )

        # Copy the nested run's results back onto the parent.
        child_state = child_outcome["state"]
        run_state.memory = child_state.memory
        run_state.assistant_role = child_state.assistant_role
        run_state.component_runs_by_node[component_id] = child_state
        run_state.graph = update_node_subgraph(graph_before, component_id, child_state.graph)

        child_status = child_outcome["status"]
        if child_status == "paused":
            # Re-wrap the nested question so the next call is routed back here.
            wrapped_question = dict(child_outcome.get("pending_question") or {})
            wrapped_question["componentNodeId"] = component_id
            wrapped_question["componentInputValues"] = child_inputs
            run_state.pending_question = wrapped_question
            run_state.graph = update_node_runtime(
                run_state.graph,
                component_id,
                {"status": "waiting", "error": "", "inputs": child_inputs, "outputs": {}},
            )
            return _runtime_response("paused", run_state, wrapped_question)
        if child_status == "restart":
            run_state.pending_question = None
            return _runtime_response("restart", run_state, None)

        # Nested run finished: publish its outputs and step past the component.
        collected = collect_component_outputs(child_state.graph)
        run_state.outputs_by_node[component_id] = collected["outputs"]
        run_state.component_runs_by_node.pop(component_id, None)
        run_state.pending_question = None
        run_state.cursor += 1
        success_runtime = {
            "status": "success",
            "error": "",
            "inputs": child_inputs,
            "outputs": collected["outputs"],
            "lastOutput": collected["lastOutput"],
            "assistantRole": run_state.assistant_role,
        }
        run_state.graph = update_node_runtime(run_state.graph, component_id, success_runtime)
        return None

    # A missing or blank answer keeps the run paused on the same question.
    answer = "" if user_answer is None else str(user_answer).strip()
    if not answer:
        return _runtime_response("paused", run_state, question_info)

    asking_node = find_node(graph_before, question_info.get("nodeId"))
    if not asking_node:
        raise WorkflowRuntimeError("Pending question node was not found.")
    asking_id = asking_node.get("id")

    retry_target = question_info.get("retryForNodeId")
    if retry_target:
        # Extend the earlier dialog with the retry question (when non-blank)
        # and the user's reply, then replay the node with these overrides.
        earlier_dialog = question_info.get("inputs", {}).get("dialog-in")
        dialog = list(earlier_dialog) if isinstance(earlier_dialog, list) else []
        asked_text = question_info.get("question", "")
        if str(asked_text).strip():
            dialog.append({"type": "character", "text": asked_text})
        dialog.append({"type": "user", "text": answer})
        run_state.input_overrides_by_node[retry_target] = {
            "answer": answer,
            "question": question_info.get("question", ""),
            "dialog-in": dialog,
        }
        _emit(on_event, {"type": "user-answer", "nodeId": asking_id, "answer": answer})
        run_state.graph = update_node_runtime(run_state.graph, asking_id, {"status": "running", "error": ""})
        run_state.pending_question = None
        return None

    # Plain question node: turn the answer into the node's result.
    original_inputs = question_info.get("inputs", {})
    answered = build_question_result(
        asking_node,
        original_inputs,
        answer,
        run_state.memory,
        question_info.get("question", ""),
    )
    run_state.outputs_by_node[asking_id] = answered["outputs"]
    run_state.graph = update_node_runtime(
        run_state.graph,
        asking_id,
        {
            "status": "success",
            "error": "",
            "inputs": original_inputs,
            "outputs": answered["outputs"],
            **answered["runtime"],
        },
    )
    _emit(on_event, {"type": "user-answer", "nodeId": asking_id, "answer": answer})
    run_state.pending_question = None
    run_state.cursor += 1
    return None
async def _execute_component_node(
    run_state: WorkflowRunState,
    node: dict,
    input_values: dict,
    *,
    services: WorkflowServices,
    context: dict,
    on_event: EventCallback,
) -> dict:
    """Run a component node by executing its subgraph as a nested run.

    The nested run works on the parent's memory and assistant role; after it
    returns, memory, assistant role and the node's subgraph are copied back
    onto ``run_state``.

    Returns:
        * a ``_runtime_response`` dict with status ``"paused"`` when the nested
          run is waiting on a question (also stored as the parent's pending
          question, tagged with ``componentNodeId``/``componentInputValues``);
        * a ``_runtime_response`` dict with status ``"restart"`` when the
          nested run asked for a restart;
        * otherwise ``{"status": "node-complete", "node_result": {...}}`` with
          the outputs collected from the nested graph.
    """
    inner_graph = node.get("data", {}).get("subgraph") or create_component_subgraph()
    owner_id = node.get("id")

    nested_run = create_interactive_run(inner_graph)
    nested_run.memory = run_state.memory
    nested_run.assistant_role = run_state.assistant_role
    # Registered before running so a pause inside the component can be resumed.
    run_state.component_runs_by_node[owner_id] = nested_run

    outcome = await continue_interactive_run(
        nested_run,
        services=services,
        external_input_value=input_values.get("input"),
        external_input_values=input_values,
        context=context,
        on_event=on_event,
    )

    nested_state = outcome["state"]
    run_state.memory = nested_state.memory
    run_state.assistant_role = nested_state.assistant_role
    run_state.component_runs_by_node[owner_id] = nested_state
    run_state.graph = update_node_subgraph(run_state.graph, owner_id, nested_state.graph)

    nested_status = outcome["status"]
    if nested_status == "paused":
        # Tag the nested question with the component so the answer is routed back.
        wrapped_question = dict(outcome.get("pending_question") or {})
        wrapped_question["componentNodeId"] = owner_id
        wrapped_question["componentInputValues"] = input_values
        run_state.pending_question = wrapped_question
        waiting_runtime = {"status": "waiting", "error": "", "inputs": input_values, "outputs": {}}
        run_state.graph = update_node_runtime(run_state.graph, owner_id, waiting_runtime)
        return _runtime_response("paused", run_state, wrapped_question)
    if nested_status == "restart":
        return _runtime_response("restart", run_state, None)

    # Finished in one go: the nested run no longer needs to be tracked.
    collected = collect_component_outputs(nested_state.graph)
    run_state.component_runs_by_node.pop(owner_id, None)
    node_result = {
        "outputs": collected["outputs"],
        "runtime": {"lastOutput": collected["lastOutput"], "assistantRole": run_state.assistant_role},
    }
    return {"status": "node-complete", "node_result": node_result}
async def _maybe_pause_for_unclear_retry(
    run_state: WorkflowRunState,
    node: dict,
    input_values: dict,
    node_result: dict,
    *,
    services: WorkflowServices,
    context: dict,
    on_event: EventCallback,
) -> Optional[dict]:
    """Pause the run with a follow-up question when a node result is "unclear".

    A retry happens only for ``basic/semantic-branch`` and ``basic/save-memory``
    nodes whose runtime ``matchId`` is ``"unclear"`` and whose
    ``retryOnUnclear`` setting is not explicitly ``False``.

    Returns:
        ``None`` when no retry applies; otherwise a ``"paused"``
        ``_runtime_response`` carrying the retry question, which is also
        stored as ``run_state.pending_question``.
    """
    kind = node.get("type")
    runtime_info = node_result.get("runtime", {})
    settings = node.get("data", {})

    # Guard clauses, checked in the same order as the conditions they replace.
    if kind not in {"basic/semantic-branch", "basic/save-memory"}:
        return None
    if runtime_info.get("matchId") != "unclear":
        return None
    if settings.get("retryOnUnclear", True) is False:
        return None

    target_id = node.get("id")
    attempts_so_far = run_state.retry_counts_by_node.get(target_id, 0)

    # The built-in wording is used only when the node has no "retryQuestion" key at all.
    fallback_question = "Не смогла уверенно понять ответ. Пожалуйста, ответьте ближе к одному из вариантов."
    question_text = render_memory_template(
        settings.get("retryQuestion", fallback_question),
        run_state.memory,
    )
    if kind == "basic/save-memory":
        question_text = question_text.replace("{text}", str(input_values.get("text") or ""))

    attempt_number = attempts_so_far + 1
    run_state.retry_counts_by_node[target_id] = attempt_number

    if settings.get("retryParaphrase"):
        final_question = await services.paraphrase_text(
            text=question_text,
            message_type="question",
            node_id=target_id,
            context={**context, "assistant_role": run_state.assistant_role},
        )
    else:
        final_question = question_text

    # nodeId and retryForNodeId are both this node's id; retryForNodeId marks
    # the pending entry as a retry for the resolver.
    retry_request = {
        "nodeId": target_id,
        "retryForNodeId": target_id,
        "question": final_question,
        "inputs": input_values,
    }
    run_state.pending_question = retry_request
    waiting_runtime = {
        "status": "waiting",
        "error": "",
        "inputs": input_values,
        "outputs": {},
        "result": "unclear",
        "matchId": "unclear",
        "retryCount": attempt_number,
    }
    run_state.graph = update_node_runtime(run_state.graph, target_id, waiting_runtime)
    _emit(on_event, {"type": "assistant-question", "nodeId": target_id, "question": final_question})
    return _runtime_response("paused", run_state, retry_request)
def _runtime_response(status: str, run_state: WorkflowRunState, pending_question: Optional[dict]) -> dict:
    """Bundle a status, the run state and the pending question into one dict.

    The result has exactly the keys ``"status"``, ``"state"`` and
    ``"pending_question"``; the arguments are stored as given, without copying.
    """
    return dict(status=status, state=run_state, pending_question=pending_question)
def _emit(callback: EventCallback, event: dict) -> None:
    """Send ``event`` to ``callback`` with a ``"timestamp"`` added.

    Nothing happens when ``callback`` is falsy. The callback receives a copy
    of ``event`` with ``"timestamp"`` set to ``time.time()``; the dict passed
    in is not modified.
    """
    if not callback:
        return
    stamped = dict(event)
    stamped["timestamp"] = time.time()
    callback(stamped)
def serialize_run_response(run_id: str, result: dict, events: List[dict]) -> dict:
    """Flatten a runtime response into a plain dict keyed for transport.

    Args:
        run_id: Identifier reported back under ``"run_id"``.
        result: Runtime response holding ``"status"``, ``"state"`` and,
            optionally, ``"pending_question"``.
        events: Events to attach under ``"events"``.

    Returns:
        A dict with the run id, status and pending question, the state's
        ``completed``, ``cursor``, ``order``, ``memory``, ``assistant_role``
        and ``graph`` attributes, and the events. Values are passed through
        by reference, not copied.
    """
    run_state: WorkflowRunState = result["state"]
    payload = {
        "run_id": run_id,
        "status": result["status"],
        "pending_question": result.get("pending_question"),
    }
    # These state attributes are exported under their own names.
    for attribute in ("completed", "cursor", "order", "memory", "assistant_role", "graph"):
        payload[attribute] = getattr(run_state, attribute)
    payload["events"] = events
    return payload