from __future__ import annotations import asyncio import copy import json import random import re import time from dataclasses import dataclass, field from typing import Any, Awaitable, Callable, Dict, List, Optional, Tuple COMPONENT_NODE_TYPE = "system/component" COMPONENT_INPUT_TYPE = "system/component-input" COMPONENT_OUTPUT_TYPE = "system/component-output" FLOW_FORMAT = "nodes-ui-flow" DEFAULT_ASSISTANT_ROLE = ( "Ты компьютерный ИИ-ассистент. Отвечай кратко, естественно, одним живым абзацем, " "без Markdown, списков, заголовков и нумерации." ) class WorkflowRuntimeError(Exception): """Raised when the Python workflow runtime cannot continue safely.""" EventCallback = Optional[Callable[[dict], None]] @dataclass class WorkflowRunState: graph: dict incoming: Dict[str, List[dict]] outgoing: Dict[str, List[dict]] feedback_outgoing: Dict[str, List[dict]] order: List[str] cursor: int = 0 outputs_by_node: Dict[str, dict] = field(default_factory=dict) input_overrides_by_node: Dict[str, dict] = field(default_factory=dict) memory: Dict[str, Any] = field(default_factory=dict) assistant_role: str = DEFAULT_ASSISTANT_ROLE retry_counts_by_node: Dict[str, int] = field(default_factory=dict) component_runs_by_node: Dict[str, "WorkflowRunState"] = field(default_factory=dict) pending_question: Optional[dict] = None completed: bool = False def parse_workflow_graph(workflow: dict, graph_path: Optional[List[str]] = None) -> dict: if not isinstance(workflow, dict): raise WorkflowRuntimeError("Invalid workflow payload.") if workflow.get("format") == FLOW_FORMAT and isinstance(workflow.get("rootGraph"), dict): graph = workflow["rootGraph"] elif isinstance(workflow.get("nodes"), list) and isinstance(workflow.get("edges"), list): graph = workflow else: raise WorkflowRuntimeError("Unsupported workflow format for Python runtime.") for node_id in graph_path or []: component = next( ( node for node in graph.get("nodes", []) if node.get("id") == node_id and node.get("type") == COMPONENT_NODE_TYPE ), None, ) if not component: raise WorkflowRuntimeError(f"Component graph was not found: {node_id}") graph = component.get("data", {}).get("subgraph") or create_component_subgraph() return copy.deepcopy(graph) def create_component_subgraph() -> dict: return {"nodes": [], "edges": [], "viewport": None} def is_component_node_type(node_type: str) -> bool: return node_type == COMPONENT_NODE_TYPE def is_component_boundary_type(node_type: str) -> bool: return node_type in {COMPONENT_INPUT_TYPE, COMPONENT_OUTPUT_TYPE} def normalize_edge(edge: dict) -> dict: return { **edge, "id": str(edge.get("id", "")), "source": str(edge.get("source", "")), "target": str(edge.get("target", "")), "sourceHandle": edge.get("sourceHandle"), "targetHandle": edge.get("targetHandle"), "type": "default" if edge.get("type") == "smoothstep" else edge.get("type", "default"), } def hydrate_graph(graph: dict) -> dict: return { **(graph or {}), "nodes": [hydrate_node(node) for node in (graph or {}).get("nodes", [])], "edges": [normalize_edge(edge) for edge in (graph or {}).get("edges", [])], "viewport": (graph or {}).get("viewport"), } def hydrate_node(node: dict) -> dict: next_node = copy.deepcopy(node) next_node["id"] = str(next_node.get("id", "")) next_node.setdefault("data", {}) if next_node.get("type") == COMPONENT_NODE_TYPE: subgraph = next_node["data"].get("subgraph") next_node["data"]["subgraph"] = hydrate_graph(subgraph or create_component_subgraph()) return next_node def create_runtime(node_type: str) -> dict: return { "type": node_type, "status": "idle", "error": "", "inputs": {}, "outputs": {}, } def prepare_graph_for_run(graph: dict) -> dict: working_graph = hydrate_graph(graph) nodes = [] for node in working_graph.get("nodes", []): next_node = copy.deepcopy(node) runtime = create_runtime(next_node.get("type", "")) if next_node.get("type") == "basic/dialog": runtime.update({"messages": [], "cleared": True}) next_node.setdefault("data", {}) next_node["data"]["runtime"] = runtime nodes.append(next_node) return {**working_graph, "nodes": nodes} def build_edge_maps(edges: List[dict]) -> Tuple[Dict[str, List[dict]], Dict[str, List[dict]]]: incoming: Dict[str, List[dict]] = {} outgoing: Dict[str, List[dict]] = {} for edge in edges: incoming.setdefault(edge.get("target"), []).append(edge) outgoing.setdefault(edge.get("source"), []).append(edge) return incoming, outgoing def has_path(source_id: str, target_id: str, edges: List[dict], visited: Optional[set] = None) -> bool: if source_id == target_id: return True visited = visited or set() if source_id in visited: return False visited.add(source_id) return any( has_path(edge.get("target"), target_id, edges, visited) for edge in edges if edge.get("source") == source_id ) def split_feedback_edges(edges: List[dict]) -> Tuple[List[dict], List[dict]]: acyclic: List[dict] = [] feedback: List[dict] = [] for edge in edges: if has_path(edge.get("target"), edge.get("source"), acyclic): feedback.append(edge) else: acyclic.append(edge) return acyclic, feedback def topological_sort(nodes: List[dict], incoming: Dict[str, List[dict]]) -> List[dict]: node_lookup = {node.get("id"): node for node in nodes} visited = set() visiting = set() sorted_nodes: List[dict] = [] def visit(node: dict): node_id = node.get("id") if node_id in visited: return if node_id in visiting: raise WorkflowRuntimeError("Workflow contains a circular dependency.") visiting.add(node_id) for edge in incoming.get(node_id, []): source = node_lookup.get(edge.get("source")) if source: visit(source) visiting.remove(node_id) visited.add(node_id) sorted_nodes.append(node) for node in nodes: visit(node) return sorted_nodes def collect_reachable_node_ids(start_ids: List[str], edges: List[dict]) -> set: reachable = set(start_ids) outgoing_by_source: Dict[str, List[dict]] = {} for edge in edges: outgoing_by_source.setdefault(edge.get("source"), []).append(edge) queue = list(start_ids) while queue: node_id = queue.pop(0) for edge in outgoing_by_source.get(node_id, []): target_id = edge.get("target") if target_id not in reachable: reachable.add(target_id) queue.append(target_id) return reachable def include_input_dependency_node_ids(node_ids: set, edges: List[dict]) -> set: required = set(node_ids) incoming_by_target: Dict[str, List[dict]] = {} for edge in edges: incoming_by_target.setdefault(edge.get("target"), []).append(edge) queue = list(node_ids) while queue: node_id = queue.pop(0) for edge in incoming_by_target.get(node_id, []): source_id = edge.get("source") if source_id not in required: required.add(source_id) queue.append(source_id) return required def runtime_order(nodes: List[dict], edges: List[dict], incoming: Dict[str, List[dict]]) -> List[str]: sorted_nodes = topological_sort(nodes, incoming) start_ids = [node.get("id") for node in nodes if node.get("type") == "basic/start"] if not start_ids: return [node.get("id") for node in sorted_nodes] reachable = collect_reachable_node_ids(start_ids, edges) required = include_input_dependency_node_ids(reachable, edges) return [node.get("id") for node in sorted_nodes if node.get("id") in required] def create_interactive_run(graph: dict) -> WorkflowRunState: working_graph = prepare_graph_for_run(graph) acyclic_edges, feedback_edges = split_feedback_edges(working_graph.get("edges", [])) incoming, outgoing = build_edge_maps(acyclic_edges) _, feedback_outgoing = build_edge_maps(feedback_edges) order = runtime_order(working_graph.get("nodes", []), working_graph.get("edges", []), incoming) return WorkflowRunState( graph=working_graph, incoming=incoming, outgoing=outgoing, feedback_outgoing=feedback_outgoing, order=order, ) def find_node(graph: dict, node_id: str) -> Optional[dict]: return next((node for node in graph.get("nodes", []) if node.get("id") == node_id), None) def update_node_runtime(graph: dict, node_id: str, runtime_patch: dict) -> dict: next_graph = copy.deepcopy(graph) for node in next_graph.get("nodes", []): if node.get("id") != node_id: continue data = node.setdefault("data", {}) runtime = data.setdefault("runtime", {}) runtime.update(runtime_patch) break return next_graph def update_node_subgraph(graph: dict, node_id: str, subgraph: dict) -> dict: next_graph = copy.deepcopy(graph) for node in next_graph.get("nodes", []): if node.get("id") == node_id: node.setdefault("data", {})["subgraph"] = subgraph break return next_graph def collect_inputs_for_node(incoming_edges: List[dict], outputs_by_node: Dict[str, dict]) -> dict: values_by_handle: Dict[str, List[Any]] = {} for edge in incoming_edges or []: source_outputs = outputs_by_node.get(edge.get("source"), {}) handle_id = edge.get("targetHandle") or "input" value = source_outputs.get(edge.get("sourceHandle")) values_by_handle.setdefault(handle_id, []).append(value) result = {} for handle_id, values in values_by_handle.items(): first_real = next((value for value in values if value is not None), None) result[handle_id] = first_real if first_real is not None else (values[0] if values else None) return result def has_blocked_incoming_value(incoming_edges: List[dict], input_values: dict) -> bool: handles = {edge.get("targetHandle") or "input" for edge in incoming_edges or []} return any(input_values.get(handle_id) is None for handle_id in handles) def render_memory_template(text: Any, memory: dict) -> str: source = str(text or "") def replace(match: re.Match) -> str: key = match.group(1) value = memory.get(key) if value is None or value == "": return match.group(0) return str(value) return re.sub(r"\{([a-zA-Z0-9_.-]+)\}", replace, source) def render_websocket_message_template(text: Any, input_text: Any, memory: dict) -> str: source = str(text or "") def replace(match: re.Match) -> str: key = match.group(1) if key in {"text", "input"}: return "" if input_text is None else str(input_text) value = memory.get(key) return match.group(0) if value is None else str(value) return re.sub(r"\{([a-zA-Z0-9_.-]+)\}", replace, source) def apply_assistant_role(system_prompt: Any, assistant_role: str) -> str: role = str(assistant_role or "").strip() if not role: return str(system_prompt or "") return ( f"{role}\n\n" "Общие правила остаются неизменными: отвечай кратко, естественно, одним живым абзацем, " "без Markdown, списков, заголовков и нумерации.\n\n" f"{system_prompt or ''}" ).strip() def get_last_dialog_turn(messages: Any) -> dict: if not isinstance(messages, list): return {"question": "", "answer": ""} for index in range(len(messages) - 1, -1, -1): message = messages[index] if not isinstance(message, dict) or message.get("type") != "user" or not message.get("text"): continue previous = next( ( item for item in reversed(messages[:index]) if isinstance(item, dict) and item.get("type") == "character" and item.get("text") ), None, ) return { "question": previous.get("text", "") if previous else "", "answer": message.get("text", ""), } return {"question": "", "answer": ""} def build_question_result(node: dict, input_values: dict, answer: str, memory: dict, question_override: str = "") -> dict: question = question_override or render_memory_template( input_values.get("question") or node.get("data", {}).get("question") or "", memory, ) dialog_in = input_values.get("dialog-in") if isinstance(input_values.get("dialog-in"), list) else [] messages = [ *dialog_in, *([{"type": "character", "text": question}] if str(question or "").strip() else []), {"type": "user", "text": answer}, ] return { "outputs": { "answer": answer, "question": question, "dialog": messages, "turn": { "answer": answer, "question": question, "dialog": messages, }, }, "runtime": { "question": question, "answer": answer, "messages": messages, }, } def ensure_choices(choices: Any) -> List[dict]: if isinstance(choices, list) and choices: return [ { "id": item.get("id") or f"choice-{index}", "label": item.get("label") or item.get("keyword") or "", } for index, item in enumerate(choices) if isinstance(item, dict) ] return [{"id": "choice-0", "label": ""}] def ensure_conditions(conditions: Any) -> List[dict]: if isinstance(conditions, list) and conditions: return [ { "id": item.get("id") or f"condition-{index}", "keyword": item.get("keyword") or "", } for index, item in enumerate(conditions) if isinstance(item, dict) ] return [{"id": "condition-0", "keyword": ""}] def ensure_script_entries(entries: Any) -> List[dict]: if not isinstance(entries, list): return [] result = [] for index, entry in enumerate(entries): if not isinstance(entry, dict) or entry.get("kind") not in {"user", "character"}: continue result.append({"id": entry.get("id") or f"{entry.get('kind')}-{index}", "kind": entry.get("kind")}) return result def ensure_json_extracts(extracts: Any) -> List[dict]: if isinstance(extracts, list) and extracts: return [ { "id": item.get("id") or f"extract-{index}", "label": item.get("label") or f"result {index}", "path": item.get("path") or "", "fields": item.get("fields") or "", } for index, item in enumerate(extracts) if isinstance(item, dict) ] return [{"id": "extract-0", "label": "items", "path": "data.filter_group.items", "fields": "key, name"}] def normalize_choice_text(value: Any) -> str: return re.sub(r"^[\"'«]+|[\"'»]+$", "", str(value or "").strip().lower()) def parse_json_path(path: str) -> List[Any]: tokens: List[Any] = [] for part in str(path or "").strip().split("."): if not part: continue for match in re.finditer(r"([^[\]]+)|\[(\d+|\*)?\]", part): if match.group(1): tokens.append(match.group(1)) elif match.group(2) in {None, "*"}: tokens.append("*") else: tokens.append(int(match.group(2))) return tokens def read_json_path(value: Any, path: str) -> Any: tokens = parse_json_path(path) def read(current: Any, remaining: List[Any]) -> Any: if not remaining: return current if current is None: return None token, rest = remaining[0], remaining[1:] if token == "*": if not isinstance(current, list): return None return [item for item in (read(item, rest) for item in current) if item is not None] if isinstance(token, int): if not isinstance(current, list) or token < 0 or token >= len(current): return None return read(current[token], rest) if not isinstance(current, dict): return None return read(current.get(token), rest) return read(value, tokens) def parse_field_list(fields: str) -> List[str]: return [field.strip() for field in str(fields or "").split(",") if field.strip()] def pick_json_fields(value: Any, fields: str) -> Any: field_list = parse_field_list(fields) if not field_list: return value def pick_one(item: Any) -> dict: if not isinstance(item, dict): return {field.split(".")[-1]: None for field in field_list} return { field.split(".")[-1]: read_json_path(item, field) for field in field_list } if isinstance(value, list): return [pick_one(item) for item in value] if isinstance(value, dict): return pick_one(value) return value def format_json_parser_output(value: Any) -> str: if value is None: return "" if isinstance(value, str): return value if isinstance(value, (int, float, bool)): return str(value) return json.dumps(value, ensure_ascii=False, indent=2) def append_character_message(dialog_in: List[dict], text: Any) -> List[dict]: message = str(text or "").strip() return [*dialog_in, {"type": "character", "text": message}] if message else dialog_in def validate_json_like_message(message: str) -> None: trimmed = str(message or "").strip() if not trimmed or (not trimmed.startswith("{") and not trimmed.startswith("[")): return json.loads(trimmed) async def send_websocket_message(url: str, message: str) -> dict: if not url or not url.strip(): raise WorkflowRuntimeError("WebSocket URL is empty.") target_url = url.strip() if not re.match(r"^wss?://", target_url, flags=re.IGNORECASE): raise WorkflowRuntimeError(f"WebSocket URL must start with ws:// or wss://. Received: {target_url}") try: validate_json_like_message(message) except json.JSONDecodeError as error: raise WorkflowRuntimeError(f"Message body is not valid JSON: {error}") from error try: import websockets except ImportError as error: raise WorkflowRuntimeError( "Python WebSocket client is not installed. Install backend dependency: websockets." ) from error try: async with websockets.connect(target_url, open_timeout=15) as socket: await asyncio.sleep(0.12) await socket.send(message) try: reply = await asyncio.wait_for(socket.recv(), timeout=15) except asyncio.TimeoutError: raise WorkflowRuntimeError("WebSocket timed out waiting for reply.") return {"reply": reply if isinstance(reply, str) else "[binary message]"} except WorkflowRuntimeError: raise except Exception as error: raise WorkflowRuntimeError(f"WebSocket connection failed for {target_url}: {error}") from error def collect_component_outputs(graph: dict) -> dict: output_nodes = [node for node in graph.get("nodes", []) if node.get("type") == COMPONENT_OUTPUT_TYPE] if not output_nodes: return {"outputs": {"output": None}, "lastOutput": None} outputs = {} for index, node in enumerate(output_nodes): handle_id = node.get("data", {}).get("externalHandleId") or ("output" if index == 0 else f"output-{index}") outputs[handle_id] = node.get("data", {}).get("runtime", {}).get("value") values = list(outputs.values()) return { "outputs": outputs, "lastOutput": values[0] if len(values) == 1 else outputs, } def get_feedback_jump(run_state: WorkflowRunState, source_node_id: str, outputs: dict) -> Optional[dict]: for edge in run_state.feedback_outgoing.get(source_node_id, []): value = outputs.get(edge.get("sourceHandle")) if value is None: continue try: target_index = run_state.order.index(edge.get("target")) except ValueError: continue return {"edge": edge, "targetIndex": target_index, "outputValue": value} return None def build_unclear_node_result(node: dict, input_values: dict, error: Exception) -> Optional[dict]: node_type = node.get("type") if node_type == "basic/semantic-branch": turn = input_values.get("turn") if isinstance(input_values.get("turn"), dict) else {} branch_payload = input_values.get("dialog-in") if isinstance(input_values.get("dialog-in"), list) else turn.get("dialog", True) outputs = {choice["id"]: None for choice in ensure_choices(node.get("data", {}).get("choices"))} outputs["unclear"] = branch_payload return { "outputs": outputs, "runtime": {"result": "unclear", "matchId": "unclear", "error": str(error)}, } if node_type == "basic/save-memory": turn = input_values.get("turn") if isinstance(input_values.get("turn"), dict) else {} dialog_in = input_values.get("dialog-in") if isinstance(input_values.get("dialog-in"), list) else turn.get("dialog", []) dialog_in = dialog_in if isinstance(dialog_in, list) else [] return { "outputs": {"dialog": None, "value": "", "unclear": dialog_in}, "runtime": {"result": "unclear", "matchId": "unclear", "value": "", "error": str(error)}, } return None class WorkflowServices: async def request_llm(self, **_: Any) -> str: raise WorkflowRuntimeError("Request LLM node needs backend workflow services.") async def classify(self, **_: Any) -> str: raise WorkflowRuntimeError("Classifier node needs backend workflow services.") async def extract_memory_field(self, **_: Any) -> dict: raise WorkflowRuntimeError("Save Memory node needs backend workflow services.") async def answer_from_context(self, **_: Any) -> str: raise WorkflowRuntimeError("Knowledge Answer node needs backend workflow services.") async def paraphrase_text(self, **kwargs: Any) -> str: return kwargs.get("text", "") async def execute_node( node: dict, inputs: dict, run_state: WorkflowRunState, services: WorkflowServices, context: dict, ) -> dict: node_type = node.get("type") data = node.get("data", {}) memory = run_state.memory or {} if node_type == "basic/text": input_text = inputs.get("text") output_text = input_text if input_text is not None else data.get("text", "") return {"outputs": {"text": output_text}, "runtime": {"inputText": input_text, "outputText": output_text}} if node_type == "basic/start": return {"outputs": {"dialog": []}, "runtime": {"started": True}} if node_type == "basic/update-role": role = str(data.get("role") or "").strip() dialog_in = inputs.get("dialog-in") if isinstance(inputs.get("dialog-in"), list) else [] return { "outputs": {"dialog": dialog_in}, "runtime": {"assistantRole": role}, "assistantRolePatch": role, } if node_type == "basic/request": raw_system = inputs.get("system") or "" user = inputs.get("user") or "" if not raw_system and not user: return {"outputs": {"response": ""}, "runtime": {"response": ""}} system = apply_assistant_role(raw_system, run_state.assistant_role) response = await services.request_llm(system=system, user=user, node_id=node.get("id"), context=context) return {"outputs": {"response": response}, "runtime": {"response": response}} if node_type == "basic/classifier": question = inputs.get("question") or "" answer = inputs.get("answer") or "" options = data.get("options") or "" if not answer or not options: return {"outputs": {"result": ""}, "runtime": {"result": ""}} result = await services.classify( question=question, answer=answer, options=options, node_id=node.get("id"), context=context, ) return {"outputs": {"result": result}, "runtime": {"result": result}} if node_type == "basic/semantic-branch": turn = inputs.get("turn") if isinstance(inputs.get("turn"), dict) else {} question = inputs.get("question") or turn.get("question") or "" answer = inputs.get("answer") or turn.get("answer") or "" branch_payload = inputs.get("dialog-in") if isinstance(inputs.get("dialog-in"), list) else turn.get("dialog", True) choices = [choice for choice in ensure_choices(data.get("choices")) if choice.get("label", "").strip()] outputs = {choice["id"]: None for choice in ensure_choices(data.get("choices"))} outputs["unclear"] = None if not answer or not choices: outputs["unclear"] = branch_payload return {"outputs": outputs, "runtime": {"result": "unclear", "matchId": "unclear"}} result = await services.classify( question=question, answer=answer, options=[choice["label"] for choice in choices], node_id=node.get("id"), context=context, ) normalized = normalize_choice_text(result) matched = next((choice for choice in choices if normalize_choice_text(choice.get("label")) == normalized), None) if matched: outputs[matched["id"]] = branch_payload else: outputs["unclear"] = branch_payload return {"outputs": outputs, "runtime": {"result": result, "matchId": matched["id"] if matched else "unclear"}} if node_type == "basic/script": script_in_connected = "script-in" in inputs script_in = inputs.get("script-in") if script_in_connected and not script_in: return { "outputs": {"dialog": [], **({"script-text": None} if data.get("hasScriptOutput") else {})}, "runtime": {"messages": [], "scriptOutput": None}, } messages: List[dict] = [] if isinstance(script_in, list): messages.extend(script_in) for entry in ensure_script_entries(data.get("entries")): value = inputs.get(entry["id"]) if not value: continue if isinstance(value, list): messages.extend(value) else: messages.append({"type": entry["kind"], "text": value}) outputs = {"dialog": messages} script_output = None if data.get("hasScriptOutput"): script_edges = [edge for edge in context.get("outgoing_edges", []) if edge.get("sourceHandle") == "script-text"] target_node = None if script_edges: target_node = context.get("node_lookup", {}).get(script_edges[0].get("target")) script_output = messages[-1]["text"] if target_node and target_node.get("type") == "basic/text" and messages else messages outputs["script-text"] = script_output return {"outputs": outputs, "runtime": {"messages": messages, "scriptOutput": script_output}} if node_type == "basic/dialog": messages = inputs.get("dialog") if isinstance(inputs.get("dialog"), list) else [] return {"outputs": {}, "runtime": {"messages": messages, "cleared": False}} if node_type == "basic/assistant-message": text = inputs.get("text") or data.get("text") or "" rendered = render_memory_template(text, memory) message = ( await services.paraphrase_text( text=rendered, message_type="message", node_id=node.get("id"), context={**context, "assistant_role": run_state.assistant_role}, ) if data.get("paraphrase") else rendered ) dialog_in = inputs.get("dialog-in") if isinstance(inputs.get("dialog-in"), list) else [] messages = append_character_message(dialog_in, message) return { "outputs": {"dialog": messages}, "runtime": {"message": message, "messages": messages, "assistantMessage": message}, } if node_type == "basic/save-memory": turn_value = inputs.get("turn") turn = turn_value if isinstance(turn_value, dict) else {} key = str(data.get("key") or "").strip() dialog_in = ( inputs.get("dialog-in") if isinstance(inputs.get("dialog-in"), list) else turn_value if isinstance(turn_value, list) else turn.get("dialog", []) ) dialog_in = dialog_in if isinstance(dialog_in, list) else [] latest_turn = get_last_dialog_turn(dialog_in) answer = inputs.get("answer") or latest_turn.get("answer") or turn.get("answer") or "" question = inputs.get("question") or latest_turn.get("question") or turn.get("question") or "" input_text = inputs.get("text") or "" instruction = str(data.get("instruction") or "").replace("{text}", str(input_text)) unclear_outputs = {"dialog": None, "value": "", "unclear": dialog_in} if not key or not answer: return {"outputs": unclear_outputs, "runtime": {"result": "unclear", "matchId": "unclear", "value": ""}} response = await services.extract_memory_field( answer=answer, key=key, instruction=instruction, question=question, node_id=node.get("id"), context=context, ) value = response.get("value") value = str(value).strip() if value is not None else "" if not value: return { "outputs": unclear_outputs, "runtime": { "key": key, "value": "", "result": "unclear", "matchId": "unclear", "instruction": instruction, "inputText": input_text, }, } memory_patch = {key: value} return { "outputs": {"dialog": dialog_in, "value": value, "unclear": None}, "runtime": { "key": key, "value": value, "result": "saved", "matchId": "value", "instruction": instruction, "inputText": input_text, "memoryPatch": memory_patch, }, "memoryPatch": memory_patch, } if node_type == "basic/knowledge-answer": turn = inputs.get("turn") if isinstance(inputs.get("turn"), dict) else {} question = render_memory_template(inputs.get("question") or turn.get("answer") or "", memory) dialog_in = inputs.get("dialog-in") if isinstance(inputs.get("dialog-in"), list) else turn.get("dialog", []) dialog_in = dialog_in if isinstance(dialog_in, list) else [] if not question: return {"outputs": {"dialog": dialog_in}, "runtime": {"answer": "", "assistantMessage": "", "messages": dialog_in}} answer = await services.answer_from_context( question=question, source=data.get("source") or "uploaded", context_path=data.get("contextPath") or "", node_id=node.get("id"), context={**context, "assistant_role": run_state.assistant_role}, ) messages = append_character_message(dialog_in, answer) return {"outputs": {"dialog": messages}, "runtime": {"answer": answer, "assistantMessage": answer, "messages": messages}} if node_type == "basic/counter": key = str(data.get("key") or "counter").strip() or "counter" try: limit = max(1, int(data.get("limit") or 3)) except (TypeError, ValueError): limit = 3 try: previous = int(memory.get(key, 0)) except (TypeError, ValueError): previous = 0 count = previous + 1 match_id = "done" if count >= limit else "continue" payload = inputs.get("dialog-in") if isinstance(inputs.get("dialog-in"), list) else True return { "outputs": { "continue": payload if match_id == "continue" else None, "done": payload if match_id == "done" else None, }, "runtime": { "key": key, "count": count, "limit": limit, "matchId": match_id, }, "memoryPatch": {key: count}, } if node_type == "basic/wait": try: wait_ms = max(0, int(data.get("ms") or 0)) except (TypeError, ValueError): wait_ms = 0 if wait_ms: await asyncio.sleep(wait_ms / 1000) dialog_in = inputs.get("dialog-in") if isinstance(inputs.get("dialog-in"), list) else [] return { "outputs": {"dialog": dialog_in}, "runtime": {"waitedMs": wait_ms}, } if node_type == "basic/restart": return {"outputs": {}, "runtime": {"restart": True}, "runtimeAction": {"type": "restart"}} if node_type == "basic/randomlist": source = inputs.get("text") if not source or not str(source).strip(): return {"outputs": {"text": ""}, "runtime": {"selected": ""}} items = re.findall(r"'([^']*)'", str(source)) if not items: return {"outputs": {"text": ""}, "runtime": {"selected": ""}} selected = random.choice(items) return {"outputs": {"text": selected}, "runtime": {"selected": selected}} if node_type == "basic/send-websocket": input_text = inputs.get("text") or "" dialog_in = inputs.get("dialog-in") if isinstance(inputs.get("dialog-in"), list) else [] url = data.get("url") or "" message = render_websocket_message_template(data.get("messageTemplate") or "", input_text, memory) result = await send_websocket_message(url, message) reply = result.get("reply") or "" return { "outputs": {"dialog": dialog_in, "text": reply}, "runtime": {"inputText": input_text, "dialog": dialog_in, "url": url, "message": message, "reply": reply}, } if node_type == "basic/json-parser": raw_json = inputs.get("json") or "" extracts = ensure_json_extracts(data.get("extracts")) outputs = {extract["id"]: "" for extract in extracts} if not raw_json or not str(raw_json).strip(): return {"outputs": outputs, "runtime": {"values": outputs}} try: parsed = json.loads(str(raw_json)) except json.JSONDecodeError as error: raise WorkflowRuntimeError(f"Invalid JSON: {error}") from error for extract in extracts: value = read_json_path(parsed, extract.get("path", "")) picked = pick_json_fields(value, extract.get("fields", "")) outputs[extract["id"]] = format_json_parser_output(picked) return {"outputs": outputs, "runtime": {"values": outputs}} if node_type == "basic/ifelse": input_value = inputs.get("input") conditions = ensure_conditions(data.get("conditions")) outputs = {condition["id"]: None for condition in conditions} match_id = None if not input_value: return {"outputs": outputs, "runtime": {"inputValue": "", "matchId": None}} normalized = str(input_value).strip().lower() for condition in conditions: keyword = condition.get("keyword") or "" if keyword and normalized == keyword.strip().lower(): outputs[condition["id"]] = True match_id = condition["id"] break return {"outputs": outputs, "runtime": {"inputValue": input_value, "matchId": match_id}} if node_type == COMPONENT_INPUT_TYPE: external_values = context.get("external_input_values") if isinstance(context.get("external_input_values"), dict) else None external_handle_id = data.get("externalHandleId") if external_values is not None and external_handle_id and external_handle_id in external_values: value = external_values[external_handle_id] else: value = context.get("external_input_value") return {"outputs": {"output": value}, "runtime": {"value": value}} if node_type == COMPONENT_OUTPUT_TYPE: value = inputs.get("input") return {"outputs": {}, "runtime": {"value": value}} return {"outputs": {}, "runtime": {}} async def continue_interactive_run( run_state: WorkflowRunState, *, user_answer: Optional[str] = None, services: Optional[WorkflowServices] = None, external_input_value: Any = None, external_input_values: Optional[dict] = None, context: Optional[dict] = None, on_event: EventCallback = None, ) -> dict: services = services or WorkflowServices() context = context or {} working_graph = run_state.graph if run_state.pending_question: result = await _resolve_pending_question( run_state, user_answer=user_answer, services=services, external_input_value=external_input_value, external_input_values=external_input_values, context=context, on_event=on_event, ) if result is not None: return result working_graph = run_state.graph while run_state.cursor < len(run_state.order): node_id = run_state.order[run_state.cursor] node = find_node(working_graph, node_id) if not node: run_state.cursor += 1 continue incoming_edges = run_state.incoming.get(node_id, []) input_values = { **collect_inputs_for_node(incoming_edges, run_state.outputs_by_node), **run_state.input_overrides_by_node.get(node_id, {}), } if incoming_edges and has_blocked_incoming_value(incoming_edges, input_values): run_state.outputs_by_node[node_id] = {} working_graph = update_node_runtime( working_graph, node_id, {"status": "skipped", "error": "", "inputs": input_values, "outputs": {}}, ) run_state.graph = working_graph _emit(on_event, {"type": "skip", "nodeId": node_id, "title": node.get("data", {}).get("title") or node.get("type")}) run_state.cursor += 1 continue working_graph = update_node_runtime(working_graph, node_id, {"status": "running", "error": "", "inputs": input_values}) run_state.graph = working_graph _emit(on_event, {"type": "node-start", "nodeId": node_id, "title": node.get("data", {}).get("title") or node.get("type")}) if node.get("type") == "basic/question": rendered = render_memory_template(input_values.get("question") or node.get("data", {}).get("question") or "", run_state.memory) question = ( await services.paraphrase_text( text=rendered, message_type="question", node_id=node_id, context={**context, "assistant_role": run_state.assistant_role}, ) if node.get("data", {}).get("paraphrase") else rendered ) pending = {"nodeId": node_id, "question": question, "inputs": input_values} working_graph = update_node_runtime( working_graph, node_id, {"status": "waiting", "error": "", "inputs": input_values, "outputs": {}, "question": question}, ) run_state.graph = working_graph run_state.pending_question = pending _emit(on_event, {"type": "assistant-question", "nodeId": node_id, "question": question}) return _runtime_response("paused", run_state, pending) try: if is_component_node_type(node.get("type")): result = await _execute_component_node( run_state, node, input_values, services=services, context=context, on_event=on_event, ) if result.get("status") in {"paused", "restart"}: return result node_result = result["node_result"] working_graph = run_state.graph else: node_lookup = {candidate.get("id"): candidate for candidate in working_graph.get("nodes", [])} outgoing_edges = run_state.outgoing.get(node_id, []) node_result = await execute_node( node, input_values, run_state, services, { **context, "external_input_value": external_input_value, "external_input_values": external_input_values, "connected_output_handles": [ edge.get("sourceHandle") for edge in outgoing_edges if edge.get("sourceHandle") ], "outgoing_edges": outgoing_edges, "node_lookup": node_lookup, }, ) if node_result.get("runtimeAction", {}).get("type") == "restart": working_graph = update_node_runtime( working_graph, node_id, { "status": "success", "error": "", "inputs": input_values, "outputs": node_result.get("outputs", {}), **node_result.get("runtime", {}), }, ) run_state.graph = working_graph _emit(on_event, {"type": "restart", "nodeId": node_id, "title": node.get("data", {}).get("title") or node.get("type")}) return _runtime_response("restart", run_state, None) retry_result = await _maybe_pause_for_unclear_retry( run_state, node, input_values, node_result, services=services, context=context, on_event=on_event, ) if retry_result: return retry_result outputs = node_result.get("outputs", {}) run_state.outputs_by_node[node_id] = outputs run_state.input_overrides_by_node.pop(node_id, None) memory_patch = node_result.get("memoryPatch") if isinstance(memory_patch, dict): run_state.memory.update(memory_patch) if isinstance(node_result.get("assistantRolePatch"), str): run_state.assistant_role = node_result.get("assistantRolePatch") or DEFAULT_ASSISTANT_ROLE working_graph = update_node_runtime( working_graph, node_id, { "status": "success", "error": "", "inputs": input_values, "outputs": outputs, **node_result.get("runtime", {}), "assistantRole": run_state.assistant_role, }, ) run_state.graph = working_graph _emit(on_event, {"type": "node-success", "nodeId": node_id, "title": node.get("data", {}).get("title") or node.get("type")}) assistant_message = node_result.get("runtime", {}).get("assistantMessage") if assistant_message: _emit(on_event, {"type": "assistant-message", "nodeId": node_id, "message": assistant_message}) if isinstance(memory_patch, dict): _emit(on_event, {"type": "memory-update", "nodeId": node_id, "memoryPatch": memory_patch}) if isinstance(node_result.get("assistantRolePatch"), str): _emit(on_event, {"type": "assistant-role-update", "nodeId": node_id, "assistantRole": run_state.assistant_role}) feedback_jump = get_feedback_jump(run_state, node_id, outputs) if feedback_jump: edge = feedback_jump["edge"] target_id = edge.get("target") current_overrides = run_state.input_overrides_by_node.get(target_id, {}) run_state.input_overrides_by_node[target_id] = { **current_overrides, edge.get("targetHandle") or "input": feedback_jump["outputValue"], } run_state.cursor = feedback_jump["targetIndex"] continue except Exception as error: _emit( on_event, { "type": "node-error", "nodeId": node_id, "title": node.get("data", {}).get("title") or node.get("type"), "error": str(error), }, ) unclear_result = build_unclear_node_result(node, input_values, error) if unclear_result: retry_result = await _maybe_pause_for_unclear_retry( run_state, node, input_values, unclear_result, services=services, context=context, on_event=on_event, ) if retry_result: return retry_result run_state.outputs_by_node[node_id] = {} working_graph = update_node_runtime( working_graph, node_id, {"status": "error", "error": str(error), "inputs": input_values, "outputs": {}}, ) run_state.graph = working_graph run_state.cursor += 1 run_state.completed = True run_state.pending_question = None return _runtime_response("complete", run_state, None) async def _resolve_pending_question( run_state: WorkflowRunState, *, user_answer: Optional[str], services: WorkflowServices, external_input_value: Any, external_input_values: Optional[dict], context: dict, on_event: EventCallback, ) -> Optional[dict]: pending = run_state.pending_question or {} working_graph = run_state.graph if pending.get("componentNodeId"): component_node_id = pending["componentNodeId"] component_node = find_node(working_graph, component_node_id) component_run = run_state.component_runs_by_node.get(component_node_id) component_input_values = pending.get("componentInputValues") or {} if not component_node or not component_run: raise WorkflowRuntimeError("Pending component question was not found.") component_run.memory = run_state.memory component_run.assistant_role = run_state.assistant_role component_result = await continue_interactive_run( component_run, user_answer=user_answer, services=services, external_input_value=component_input_values.get("input"), external_input_values=component_input_values, context=context, on_event=on_event, ) run_state.memory = component_result["state"].memory run_state.assistant_role = component_result["state"].assistant_role run_state.component_runs_by_node[component_node_id] = component_result["state"] working_graph = update_node_subgraph(working_graph, component_node_id, component_result["state"].graph) run_state.graph = working_graph if component_result["status"] == "paused": next_pending = { **(component_result.get("pending_question") or {}), "componentNodeId": component_node_id, "componentInputValues": component_input_values, } run_state.pending_question = next_pending run_state.graph = update_node_runtime( run_state.graph, component_node_id, {"status": "waiting", "error": "", "inputs": component_input_values, "outputs": {}}, ) return _runtime_response("paused", run_state, next_pending) if component_result["status"] == "restart": run_state.pending_question = None return _runtime_response("restart", run_state, None) component_outputs = collect_component_outputs(component_result["state"].graph) run_state.outputs_by_node[component_node_id] = component_outputs["outputs"] run_state.component_runs_by_node.pop(component_node_id, None) run_state.pending_question = None run_state.cursor += 1 run_state.graph = update_node_runtime( run_state.graph, component_node_id, { "status": "success", "error": "", "inputs": component_input_values, "outputs": component_outputs["outputs"], "lastOutput": component_outputs["lastOutput"], "assistantRole": run_state.assistant_role, }, ) return None if user_answer is None or not str(user_answer).strip(): return _runtime_response("paused", run_state, pending) pending_node = find_node(working_graph, pending.get("nodeId")) if not pending_node: raise WorkflowRuntimeError("Pending question node was not found.") answer = str(user_answer).strip() if pending.get("retryForNodeId"): retry_dialog = [ *(pending.get("inputs", {}).get("dialog-in") if isinstance(pending.get("inputs", {}).get("dialog-in"), list) else []), *([{"type": "character", "text": pending.get("question", "")}] if str(pending.get("question", "")).strip() else []), {"type": "user", "text": answer}, ] run_state.input_overrides_by_node[pending["retryForNodeId"]] = { "answer": answer, "question": pending.get("question", ""), "dialog-in": retry_dialog, } _emit(on_event, {"type": "user-answer", "nodeId": pending_node.get("id"), "answer": answer}) run_state.graph = update_node_runtime(run_state.graph, pending_node.get("id"), {"status": "running", "error": ""}) run_state.pending_question = None return None result = build_question_result(pending_node, pending.get("inputs", {}), answer, run_state.memory, pending.get("question", "")) run_state.outputs_by_node[pending_node.get("id")] = result["outputs"] run_state.graph = update_node_runtime( run_state.graph, pending_node.get("id"), { "status": "success", "error": "", "inputs": pending.get("inputs", {}), "outputs": result["outputs"], **result["runtime"], }, ) _emit(on_event, {"type": "user-answer", "nodeId": pending_node.get("id"), "answer": answer}) run_state.pending_question = None run_state.cursor += 1 return None async def _execute_component_node( run_state: WorkflowRunState, node: dict, input_values: dict, *, services: WorkflowServices, context: dict, on_event: EventCallback, ) -> dict: subgraph = node.get("data", {}).get("subgraph") or create_component_subgraph() component_run = create_interactive_run(subgraph) component_run.memory = run_state.memory component_run.assistant_role = run_state.assistant_role run_state.component_runs_by_node[node.get("id")] = component_run sub_result = await continue_interactive_run( component_run, services=services, external_input_value=input_values.get("input"), external_input_values=input_values, context=context, on_event=on_event, ) run_state.memory = sub_result["state"].memory run_state.assistant_role = sub_result["state"].assistant_role run_state.component_runs_by_node[node.get("id")] = sub_result["state"] run_state.graph = update_node_subgraph(run_state.graph, node.get("id"), sub_result["state"].graph) if sub_result["status"] == "paused": pending = { **(sub_result.get("pending_question") or {}), "componentNodeId": node.get("id"), "componentInputValues": input_values, } run_state.pending_question = pending run_state.graph = update_node_runtime( run_state.graph, node.get("id"), {"status": "waiting", "error": "", "inputs": input_values, "outputs": {}}, ) return _runtime_response("paused", run_state, pending) if sub_result["status"] == "restart": return _runtime_response("restart", run_state, None) component_outputs = collect_component_outputs(sub_result["state"].graph) run_state.component_runs_by_node.pop(node.get("id"), None) return { "status": "node-complete", "node_result": { "outputs": component_outputs["outputs"], "runtime": {"lastOutput": component_outputs["lastOutput"], "assistantRole": run_state.assistant_role}, }, } async def _maybe_pause_for_unclear_retry( run_state: WorkflowRunState, node: dict, input_values: dict, node_result: dict, *, services: WorkflowServices, context: dict, on_event: EventCallback, ) -> Optional[dict]: node_type = node.get("type") runtime = node_result.get("runtime", {}) data = node.get("data", {}) can_retry = ( node_type in {"basic/semantic-branch", "basic/save-memory"} and runtime.get("matchId") == "unclear" and data.get("retryOnUnclear", True) is not False ) if not can_retry: return None node_id = node.get("id") retry_count = run_state.retry_counts_by_node.get(node_id, 0) default_retry_question = "Не смогла уверенно понять ответ. Пожалуйста, ответьте ближе к одному из вариантов." raw_retry_question = data.get("retryQuestion") if "retryQuestion" in data else default_retry_question retry_template = render_memory_template( raw_retry_question, run_state.memory, ) if node_type == "basic/save-memory": retry_template = retry_template.replace("{text}", str(input_values.get("text") or "")) run_state.retry_counts_by_node[node_id] = retry_count + 1 retry_question = ( await services.paraphrase_text( text=retry_template, message_type="question", node_id=node_id, context={**context, "assistant_role": run_state.assistant_role}, ) if data.get("retryParaphrase") else retry_template ) pending = { "nodeId": node_id, "retryForNodeId": node_id, "question": retry_question, "inputs": input_values, } run_state.pending_question = pending run_state.graph = update_node_runtime( run_state.graph, node_id, { "status": "waiting", "error": "", "inputs": input_values, "outputs": {}, "result": "unclear", "matchId": "unclear", "retryCount": retry_count + 1, }, ) _emit(on_event, {"type": "assistant-question", "nodeId": node_id, "question": retry_question}) return _runtime_response("paused", run_state, pending) def _runtime_response(status: str, run_state: WorkflowRunState, pending_question: Optional[dict]) -> dict: return { "status": status, "state": run_state, "pending_question": pending_question, } def _emit(callback: EventCallback, event: dict) -> None: if callback: callback({**event, "timestamp": time.time()}) def serialize_run_response(run_id: str, result: dict, events: List[dict]) -> dict: state: WorkflowRunState = result["state"] return { "run_id": run_id, "status": result["status"], "pending_question": result.get("pending_question"), "completed": state.completed, "cursor": state.cursor, "order": state.order, "memory": state.memory, "assistant_role": state.assistant_role, "graph": state.graph, "events": events, }