nodes-ui-flow / backend /workflow_runtime.py
markitzeroo
Deploy updated nodes UI flow
1dd9186
from __future__ import annotations
import asyncio
import copy
import json
import random
import re
import time
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable, Dict, List, Optional, Tuple
COMPONENT_NODE_TYPE = "system/component"
COMPONENT_INPUT_TYPE = "system/component-input"
COMPONENT_OUTPUT_TYPE = "system/component-output"
FLOW_FORMAT = "nodes-ui-flow"
DEFAULT_ASSISTANT_ROLE = (
"Ты компьютерный ИИ-ассистент. Отвечай кратко, естественно, одним живым абзацем, "
"без Markdown, списков, заголовков и нумерации."
)
class WorkflowRuntimeError(Exception):
"""Raised when the Python workflow runtime cannot continue safely."""
EventCallback = Optional[Callable[[dict], None]]
@dataclass
class WorkflowRunState:
graph: dict
incoming: Dict[str, List[dict]]
outgoing: Dict[str, List[dict]]
feedback_outgoing: Dict[str, List[dict]]
order: List[str]
cursor: int = 0
outputs_by_node: Dict[str, dict] = field(default_factory=dict)
input_overrides_by_node: Dict[str, dict] = field(default_factory=dict)
memory: Dict[str, Any] = field(default_factory=dict)
assistant_role: str = DEFAULT_ASSISTANT_ROLE
retry_counts_by_node: Dict[str, int] = field(default_factory=dict)
component_runs_by_node: Dict[str, "WorkflowRunState"] = field(default_factory=dict)
pending_question: Optional[dict] = None
completed: bool = False
def parse_workflow_graph(workflow: dict, graph_path: Optional[List[str]] = None) -> dict:
if not isinstance(workflow, dict):
raise WorkflowRuntimeError("Invalid workflow payload.")
if workflow.get("format") == FLOW_FORMAT and isinstance(workflow.get("rootGraph"), dict):
graph = workflow["rootGraph"]
elif isinstance(workflow.get("nodes"), list) and isinstance(workflow.get("edges"), list):
graph = workflow
else:
raise WorkflowRuntimeError("Unsupported workflow format for Python runtime.")
for node_id in graph_path or []:
component = next(
(
node
for node in graph.get("nodes", [])
if node.get("id") == node_id and node.get("type") == COMPONENT_NODE_TYPE
),
None,
)
if not component:
raise WorkflowRuntimeError(f"Component graph was not found: {node_id}")
graph = component.get("data", {}).get("subgraph") or create_component_subgraph()
return copy.deepcopy(graph)
def create_component_subgraph() -> dict:
return {"nodes": [], "edges": [], "viewport": None}
def is_component_node_type(node_type: str) -> bool:
return node_type == COMPONENT_NODE_TYPE
def is_component_boundary_type(node_type: str) -> bool:
return node_type in {COMPONENT_INPUT_TYPE, COMPONENT_OUTPUT_TYPE}
def normalize_edge(edge: dict) -> dict:
return {
**edge,
"id": str(edge.get("id", "")),
"source": str(edge.get("source", "")),
"target": str(edge.get("target", "")),
"sourceHandle": edge.get("sourceHandle"),
"targetHandle": edge.get("targetHandle"),
"type": "default" if edge.get("type") == "smoothstep" else edge.get("type", "default"),
}
def hydrate_graph(graph: dict) -> dict:
return {
**(graph or {}),
"nodes": [hydrate_node(node) for node in (graph or {}).get("nodes", [])],
"edges": [normalize_edge(edge) for edge in (graph or {}).get("edges", [])],
"viewport": (graph or {}).get("viewport"),
}
def hydrate_node(node: dict) -> dict:
next_node = copy.deepcopy(node)
next_node["id"] = str(next_node.get("id", ""))
next_node.setdefault("data", {})
if next_node.get("type") == COMPONENT_NODE_TYPE:
subgraph = next_node["data"].get("subgraph")
next_node["data"]["subgraph"] = hydrate_graph(subgraph or create_component_subgraph())
return next_node
def create_runtime(node_type: str) -> dict:
return {
"type": node_type,
"status": "idle",
"error": "",
"inputs": {},
"outputs": {},
}
def prepare_graph_for_run(graph: dict) -> dict:
working_graph = hydrate_graph(graph)
nodes = []
for node in working_graph.get("nodes", []):
next_node = copy.deepcopy(node)
runtime = create_runtime(next_node.get("type", ""))
if next_node.get("type") == "basic/dialog":
runtime.update({"messages": [], "cleared": True})
next_node.setdefault("data", {})
next_node["data"]["runtime"] = runtime
nodes.append(next_node)
return {**working_graph, "nodes": nodes}
def build_edge_maps(edges: List[dict]) -> Tuple[Dict[str, List[dict]], Dict[str, List[dict]]]:
incoming: Dict[str, List[dict]] = {}
outgoing: Dict[str, List[dict]] = {}
for edge in edges:
incoming.setdefault(edge.get("target"), []).append(edge)
outgoing.setdefault(edge.get("source"), []).append(edge)
return incoming, outgoing
def has_path(source_id: str, target_id: str, edges: List[dict], visited: Optional[set] = None) -> bool:
if source_id == target_id:
return True
visited = visited or set()
if source_id in visited:
return False
visited.add(source_id)
return any(
has_path(edge.get("target"), target_id, edges, visited)
for edge in edges
if edge.get("source") == source_id
)
def split_feedback_edges(edges: List[dict]) -> Tuple[List[dict], List[dict]]:
acyclic: List[dict] = []
feedback: List[dict] = []
for edge in edges:
if has_path(edge.get("target"), edge.get("source"), acyclic):
feedback.append(edge)
else:
acyclic.append(edge)
return acyclic, feedback
def topological_sort(nodes: List[dict], incoming: Dict[str, List[dict]]) -> List[dict]:
node_lookup = {node.get("id"): node for node in nodes}
visited = set()
visiting = set()
sorted_nodes: List[dict] = []
def visit(node: dict):
node_id = node.get("id")
if node_id in visited:
return
if node_id in visiting:
raise WorkflowRuntimeError("Workflow contains a circular dependency.")
visiting.add(node_id)
for edge in incoming.get(node_id, []):
source = node_lookup.get(edge.get("source"))
if source:
visit(source)
visiting.remove(node_id)
visited.add(node_id)
sorted_nodes.append(node)
for node in nodes:
visit(node)
return sorted_nodes
def collect_reachable_node_ids(start_ids: List[str], edges: List[dict]) -> set:
reachable = set(start_ids)
outgoing_by_source: Dict[str, List[dict]] = {}
for edge in edges:
outgoing_by_source.setdefault(edge.get("source"), []).append(edge)
queue = list(start_ids)
while queue:
node_id = queue.pop(0)
for edge in outgoing_by_source.get(node_id, []):
target_id = edge.get("target")
if target_id not in reachable:
reachable.add(target_id)
queue.append(target_id)
return reachable
def include_input_dependency_node_ids(node_ids: set, edges: List[dict]) -> set:
required = set(node_ids)
incoming_by_target: Dict[str, List[dict]] = {}
for edge in edges:
incoming_by_target.setdefault(edge.get("target"), []).append(edge)
queue = list(node_ids)
while queue:
node_id = queue.pop(0)
for edge in incoming_by_target.get(node_id, []):
source_id = edge.get("source")
if source_id not in required:
required.add(source_id)
queue.append(source_id)
return required
def runtime_order(nodes: List[dict], edges: List[dict], incoming: Dict[str, List[dict]]) -> List[str]:
sorted_nodes = topological_sort(nodes, incoming)
start_ids = [node.get("id") for node in nodes if node.get("type") == "basic/start"]
if not start_ids:
return [node.get("id") for node in sorted_nodes]
reachable = collect_reachable_node_ids(start_ids, edges)
required = include_input_dependency_node_ids(reachable, edges)
return [node.get("id") for node in sorted_nodes if node.get("id") in required]
def create_interactive_run(graph: dict) -> WorkflowRunState:
working_graph = prepare_graph_for_run(graph)
acyclic_edges, feedback_edges = split_feedback_edges(working_graph.get("edges", []))
incoming, outgoing = build_edge_maps(acyclic_edges)
_, feedback_outgoing = build_edge_maps(feedback_edges)
order = runtime_order(working_graph.get("nodes", []), working_graph.get("edges", []), incoming)
return WorkflowRunState(
graph=working_graph,
incoming=incoming,
outgoing=outgoing,
feedback_outgoing=feedback_outgoing,
order=order,
)
def find_node(graph: dict, node_id: str) -> Optional[dict]:
return next((node for node in graph.get("nodes", []) if node.get("id") == node_id), None)
def update_node_runtime(graph: dict, node_id: str, runtime_patch: dict) -> dict:
next_graph = copy.deepcopy(graph)
for node in next_graph.get("nodes", []):
if node.get("id") != node_id:
continue
data = node.setdefault("data", {})
runtime = data.setdefault("runtime", {})
runtime.update(runtime_patch)
break
return next_graph
def update_node_subgraph(graph: dict, node_id: str, subgraph: dict) -> dict:
next_graph = copy.deepcopy(graph)
for node in next_graph.get("nodes", []):
if node.get("id") == node_id:
node.setdefault("data", {})["subgraph"] = subgraph
break
return next_graph
def collect_inputs_for_node(incoming_edges: List[dict], outputs_by_node: Dict[str, dict]) -> dict:
values_by_handle: Dict[str, List[Any]] = {}
for edge in incoming_edges or []:
source_outputs = outputs_by_node.get(edge.get("source"), {})
handle_id = edge.get("targetHandle") or "input"
value = source_outputs.get(edge.get("sourceHandle"))
values_by_handle.setdefault(handle_id, []).append(value)
result = {}
for handle_id, values in values_by_handle.items():
first_real = next((value for value in values if value is not None), None)
result[handle_id] = first_real if first_real is not None else (values[0] if values else None)
return result
def has_blocked_incoming_value(incoming_edges: List[dict], input_values: dict) -> bool:
handles = {edge.get("targetHandle") or "input" for edge in incoming_edges or []}
return any(input_values.get(handle_id) is None for handle_id in handles)
def render_memory_template(text: Any, memory: dict) -> str:
source = str(text or "")
def replace(match: re.Match) -> str:
key = match.group(1)
value = memory.get(key)
if value is None or value == "":
return match.group(0)
return str(value)
return re.sub(r"\{([a-zA-Z0-9_.-]+)\}", replace, source)
def render_websocket_message_template(text: Any, input_text: Any, memory: dict) -> str:
source = str(text or "")
def replace(match: re.Match) -> str:
key = match.group(1)
if key in {"text", "input"}:
return "" if input_text is None else str(input_text)
value = memory.get(key)
return match.group(0) if value is None else str(value)
return re.sub(r"\{([a-zA-Z0-9_.-]+)\}", replace, source)
def apply_assistant_role(system_prompt: Any, assistant_role: str) -> str:
role = str(assistant_role or "").strip()
if not role:
return str(system_prompt or "")
return (
f"{role}\n\n"
"Общие правила остаются неизменными: отвечай кратко, естественно, одним живым абзацем, "
"без Markdown, списков, заголовков и нумерации.\n\n"
f"{system_prompt or ''}"
).strip()
def get_last_dialog_turn(messages: Any) -> dict:
if not isinstance(messages, list):
return {"question": "", "answer": ""}
for index in range(len(messages) - 1, -1, -1):
message = messages[index]
if not isinstance(message, dict) or message.get("type") != "user" or not message.get("text"):
continue
previous = next(
(
item
for item in reversed(messages[:index])
if isinstance(item, dict) and item.get("type") == "character" and item.get("text")
),
None,
)
return {
"question": previous.get("text", "") if previous else "",
"answer": message.get("text", ""),
}
return {"question": "", "answer": ""}
def build_question_result(node: dict, input_values: dict, answer: str, memory: dict, question_override: str = "") -> dict:
question = question_override or render_memory_template(
input_values.get("question") or node.get("data", {}).get("question") or "",
memory,
)
dialog_in = input_values.get("dialog-in") if isinstance(input_values.get("dialog-in"), list) else []
messages = [
*dialog_in,
*([{"type": "character", "text": question}] if str(question or "").strip() else []),
{"type": "user", "text": answer},
]
return {
"outputs": {
"answer": answer,
"question": question,
"dialog": messages,
"turn": {
"answer": answer,
"question": question,
"dialog": messages,
},
},
"runtime": {
"question": question,
"answer": answer,
"messages": messages,
},
}
def ensure_choices(choices: Any) -> List[dict]:
if isinstance(choices, list) and choices:
return [
{
"id": item.get("id") or f"choice-{index}",
"label": item.get("label") or item.get("keyword") or "",
}
for index, item in enumerate(choices)
if isinstance(item, dict)
]
return [{"id": "choice-0", "label": ""}]
def ensure_conditions(conditions: Any) -> List[dict]:
if isinstance(conditions, list) and conditions:
return [
{
"id": item.get("id") or f"condition-{index}",
"keyword": item.get("keyword") or "",
}
for index, item in enumerate(conditions)
if isinstance(item, dict)
]
return [{"id": "condition-0", "keyword": ""}]
def ensure_script_entries(entries: Any) -> List[dict]:
if not isinstance(entries, list):
return []
result = []
for index, entry in enumerate(entries):
if not isinstance(entry, dict) or entry.get("kind") not in {"user", "character"}:
continue
result.append({"id": entry.get("id") or f"{entry.get('kind')}-{index}", "kind": entry.get("kind")})
return result
def ensure_json_extracts(extracts: Any) -> List[dict]:
if isinstance(extracts, list) and extracts:
return [
{
"id": item.get("id") or f"extract-{index}",
"label": item.get("label") or f"result {index}",
"path": item.get("path") or "",
"fields": item.get("fields") or "",
}
for index, item in enumerate(extracts)
if isinstance(item, dict)
]
return [{"id": "extract-0", "label": "items", "path": "data.filter_group.items", "fields": "key, name"}]
def normalize_choice_text(value: Any) -> str:
return re.sub(r"^[\"'«]+|[\"'»]+$", "", str(value or "").strip().lower())
def parse_json_path(path: str) -> List[Any]:
tokens: List[Any] = []
for part in str(path or "").strip().split("."):
if not part:
continue
for match in re.finditer(r"([^[\]]+)|\[(\d+|\*)?\]", part):
if match.group(1):
tokens.append(match.group(1))
elif match.group(2) in {None, "*"}:
tokens.append("*")
else:
tokens.append(int(match.group(2)))
return tokens
def read_json_path(value: Any, path: str) -> Any:
tokens = parse_json_path(path)
def read(current: Any, remaining: List[Any]) -> Any:
if not remaining:
return current
if current is None:
return None
token, rest = remaining[0], remaining[1:]
if token == "*":
if not isinstance(current, list):
return None
return [item for item in (read(item, rest) for item in current) if item is not None]
if isinstance(token, int):
if not isinstance(current, list) or token < 0 or token >= len(current):
return None
return read(current[token], rest)
if not isinstance(current, dict):
return None
return read(current.get(token), rest)
return read(value, tokens)
def parse_field_list(fields: str) -> List[str]:
return [field.strip() for field in str(fields or "").split(",") if field.strip()]
def pick_json_fields(value: Any, fields: str) -> Any:
field_list = parse_field_list(fields)
if not field_list:
return value
def pick_one(item: Any) -> dict:
if not isinstance(item, dict):
return {field.split(".")[-1]: None for field in field_list}
return {
field.split(".")[-1]: read_json_path(item, field)
for field in field_list
}
if isinstance(value, list):
return [pick_one(item) for item in value]
if isinstance(value, dict):
return pick_one(value)
return value
def format_json_parser_output(value: Any) -> str:
if value is None:
return ""
if isinstance(value, str):
return value
if isinstance(value, (int, float, bool)):
return str(value)
return json.dumps(value, ensure_ascii=False, indent=2)
def append_character_message(dialog_in: List[dict], text: Any) -> List[dict]:
message = str(text or "").strip()
return [*dialog_in, {"type": "character", "text": message}] if message else dialog_in
def validate_json_like_message(message: str) -> None:
trimmed = str(message or "").strip()
if not trimmed or (not trimmed.startswith("{") and not trimmed.startswith("[")):
return
json.loads(trimmed)
async def send_websocket_message(url: str, message: str) -> dict:
if not url or not url.strip():
raise WorkflowRuntimeError("WebSocket URL is empty.")
target_url = url.strip()
if not re.match(r"^wss?://", target_url, flags=re.IGNORECASE):
raise WorkflowRuntimeError(f"WebSocket URL must start with ws:// or wss://. Received: {target_url}")
try:
validate_json_like_message(message)
except json.JSONDecodeError as error:
raise WorkflowRuntimeError(f"Message body is not valid JSON: {error}") from error
try:
import websockets
except ImportError as error:
raise WorkflowRuntimeError(
"Python WebSocket client is not installed. Install backend dependency: websockets."
) from error
try:
async with websockets.connect(target_url, open_timeout=15) as socket:
await asyncio.sleep(0.12)
await socket.send(message)
try:
reply = await asyncio.wait_for(socket.recv(), timeout=15)
except asyncio.TimeoutError:
raise WorkflowRuntimeError("WebSocket timed out waiting for reply.")
return {"reply": reply if isinstance(reply, str) else "[binary message]"}
except WorkflowRuntimeError:
raise
except Exception as error:
raise WorkflowRuntimeError(f"WebSocket connection failed for {target_url}: {error}") from error
def collect_component_outputs(graph: dict) -> dict:
output_nodes = [node for node in graph.get("nodes", []) if node.get("type") == COMPONENT_OUTPUT_TYPE]
if not output_nodes:
return {"outputs": {"output": None}, "lastOutput": None}
outputs = {}
for index, node in enumerate(output_nodes):
handle_id = node.get("data", {}).get("externalHandleId") or ("output" if index == 0 else f"output-{index}")
outputs[handle_id] = node.get("data", {}).get("runtime", {}).get("value")
values = list(outputs.values())
return {
"outputs": outputs,
"lastOutput": values[0] if len(values) == 1 else outputs,
}
def get_feedback_jump(run_state: WorkflowRunState, source_node_id: str, outputs: dict) -> Optional[dict]:
for edge in run_state.feedback_outgoing.get(source_node_id, []):
value = outputs.get(edge.get("sourceHandle"))
if value is None:
continue
try:
target_index = run_state.order.index(edge.get("target"))
except ValueError:
continue
return {"edge": edge, "targetIndex": target_index, "outputValue": value}
return None
def build_unclear_node_result(node: dict, input_values: dict, error: Exception) -> Optional[dict]:
node_type = node.get("type")
if node_type == "basic/semantic-branch":
turn = input_values.get("turn") if isinstance(input_values.get("turn"), dict) else {}
branch_payload = input_values.get("dialog-in") if isinstance(input_values.get("dialog-in"), list) else turn.get("dialog", True)
outputs = {choice["id"]: None for choice in ensure_choices(node.get("data", {}).get("choices"))}
outputs["unclear"] = branch_payload
return {
"outputs": outputs,
"runtime": {"result": "unclear", "matchId": "unclear", "error": str(error)},
}
if node_type == "basic/save-memory":
turn = input_values.get("turn") if isinstance(input_values.get("turn"), dict) else {}
dialog_in = input_values.get("dialog-in") if isinstance(input_values.get("dialog-in"), list) else turn.get("dialog", [])
dialog_in = dialog_in if isinstance(dialog_in, list) else []
return {
"outputs": {"dialog": None, "value": "", "unclear": dialog_in},
"runtime": {"result": "unclear", "matchId": "unclear", "value": "", "error": str(error)},
}
return None
class WorkflowServices:
async def request_llm(self, **_: Any) -> str:
raise WorkflowRuntimeError("Request LLM node needs backend workflow services.")
async def classify(self, **_: Any) -> str:
raise WorkflowRuntimeError("Classifier node needs backend workflow services.")
async def extract_memory_field(self, **_: Any) -> dict:
raise WorkflowRuntimeError("Save Memory node needs backend workflow services.")
async def answer_from_context(self, **_: Any) -> str:
raise WorkflowRuntimeError("Knowledge Answer node needs backend workflow services.")
async def paraphrase_text(self, **kwargs: Any) -> str:
return kwargs.get("text", "")
async def execute_node(
node: dict,
inputs: dict,
run_state: WorkflowRunState,
services: WorkflowServices,
context: dict,
) -> dict:
node_type = node.get("type")
data = node.get("data", {})
memory = run_state.memory or {}
if node_type == "basic/text":
input_text = inputs.get("text")
output_text = input_text if input_text is not None else data.get("text", "")
return {"outputs": {"text": output_text}, "runtime": {"inputText": input_text, "outputText": output_text}}
if node_type == "basic/start":
return {"outputs": {"dialog": []}, "runtime": {"started": True}}
if node_type == "basic/update-role":
role = str(data.get("role") or "").strip()
dialog_in = inputs.get("dialog-in") if isinstance(inputs.get("dialog-in"), list) else []
return {
"outputs": {"dialog": dialog_in},
"runtime": {"assistantRole": role},
"assistantRolePatch": role,
}
if node_type == "basic/request":
raw_system = inputs.get("system") or ""
user = inputs.get("user") or ""
if not raw_system and not user:
return {"outputs": {"response": ""}, "runtime": {"response": ""}}
system = apply_assistant_role(raw_system, run_state.assistant_role)
response = await services.request_llm(system=system, user=user, node_id=node.get("id"), context=context)
return {"outputs": {"response": response}, "runtime": {"response": response}}
if node_type == "basic/classifier":
question = inputs.get("question") or ""
answer = inputs.get("answer") or ""
options = data.get("options") or ""
if not answer or not options:
return {"outputs": {"result": ""}, "runtime": {"result": ""}}
result = await services.classify(
question=question,
answer=answer,
options=options,
node_id=node.get("id"),
context=context,
)
return {"outputs": {"result": result}, "runtime": {"result": result}}
if node_type == "basic/semantic-branch":
turn = inputs.get("turn") if isinstance(inputs.get("turn"), dict) else {}
question = inputs.get("question") or turn.get("question") or ""
answer = inputs.get("answer") or turn.get("answer") or ""
branch_payload = inputs.get("dialog-in") if isinstance(inputs.get("dialog-in"), list) else turn.get("dialog", True)
choices = [choice for choice in ensure_choices(data.get("choices")) if choice.get("label", "").strip()]
outputs = {choice["id"]: None for choice in ensure_choices(data.get("choices"))}
outputs["unclear"] = None
if not answer or not choices:
outputs["unclear"] = branch_payload
return {"outputs": outputs, "runtime": {"result": "unclear", "matchId": "unclear"}}
result = await services.classify(
question=question,
answer=answer,
options=[choice["label"] for choice in choices],
node_id=node.get("id"),
context=context,
)
normalized = normalize_choice_text(result)
matched = next((choice for choice in choices if normalize_choice_text(choice.get("label")) == normalized), None)
if matched:
outputs[matched["id"]] = branch_payload
else:
outputs["unclear"] = branch_payload
return {"outputs": outputs, "runtime": {"result": result, "matchId": matched["id"] if matched else "unclear"}}
if node_type == "basic/script":
script_in_connected = "script-in" in inputs
script_in = inputs.get("script-in")
if script_in_connected and not script_in:
return {
"outputs": {"dialog": [], **({"script-text": None} if data.get("hasScriptOutput") else {})},
"runtime": {"messages": [], "scriptOutput": None},
}
messages: List[dict] = []
if isinstance(script_in, list):
messages.extend(script_in)
for entry in ensure_script_entries(data.get("entries")):
value = inputs.get(entry["id"])
if not value:
continue
if isinstance(value, list):
messages.extend(value)
else:
messages.append({"type": entry["kind"], "text": value})
outputs = {"dialog": messages}
script_output = None
if data.get("hasScriptOutput"):
script_edges = [edge for edge in context.get("outgoing_edges", []) if edge.get("sourceHandle") == "script-text"]
target_node = None
if script_edges:
target_node = context.get("node_lookup", {}).get(script_edges[0].get("target"))
script_output = messages[-1]["text"] if target_node and target_node.get("type") == "basic/text" and messages else messages
outputs["script-text"] = script_output
return {"outputs": outputs, "runtime": {"messages": messages, "scriptOutput": script_output}}
if node_type == "basic/dialog":
messages = inputs.get("dialog") if isinstance(inputs.get("dialog"), list) else []
return {"outputs": {}, "runtime": {"messages": messages, "cleared": False}}
if node_type == "basic/assistant-message":
text = inputs.get("text") or data.get("text") or ""
rendered = render_memory_template(text, memory)
message = (
await services.paraphrase_text(
text=rendered,
message_type="message",
node_id=node.get("id"),
context={**context, "assistant_role": run_state.assistant_role},
)
if data.get("paraphrase")
else rendered
)
dialog_in = inputs.get("dialog-in") if isinstance(inputs.get("dialog-in"), list) else []
messages = append_character_message(dialog_in, message)
return {
"outputs": {"dialog": messages},
"runtime": {"message": message, "messages": messages, "assistantMessage": message},
}
if node_type == "basic/save-memory":
turn_value = inputs.get("turn")
turn = turn_value if isinstance(turn_value, dict) else {}
key = str(data.get("key") or "").strip()
dialog_in = (
inputs.get("dialog-in")
if isinstance(inputs.get("dialog-in"), list)
else turn_value
if isinstance(turn_value, list)
else turn.get("dialog", [])
)
dialog_in = dialog_in if isinstance(dialog_in, list) else []
latest_turn = get_last_dialog_turn(dialog_in)
answer = inputs.get("answer") or latest_turn.get("answer") or turn.get("answer") or ""
question = inputs.get("question") or latest_turn.get("question") or turn.get("question") or ""
input_text = inputs.get("text") or ""
instruction = str(data.get("instruction") or "").replace("{text}", str(input_text))
unclear_outputs = {"dialog": None, "value": "", "unclear": dialog_in}
if not key or not answer:
return {"outputs": unclear_outputs, "runtime": {"result": "unclear", "matchId": "unclear", "value": ""}}
response = await services.extract_memory_field(
answer=answer,
key=key,
instruction=instruction,
question=question,
node_id=node.get("id"),
context=context,
)
value = response.get("value")
value = str(value).strip() if value is not None else ""
if not value:
return {
"outputs": unclear_outputs,
"runtime": {
"key": key,
"value": "",
"result": "unclear",
"matchId": "unclear",
"instruction": instruction,
"inputText": input_text,
},
}
memory_patch = {key: value}
return {
"outputs": {"dialog": dialog_in, "value": value, "unclear": None},
"runtime": {
"key": key,
"value": value,
"result": "saved",
"matchId": "value",
"instruction": instruction,
"inputText": input_text,
"memoryPatch": memory_patch,
},
"memoryPatch": memory_patch,
}
if node_type == "basic/knowledge-answer":
turn = inputs.get("turn") if isinstance(inputs.get("turn"), dict) else {}
question = render_memory_template(inputs.get("question") or turn.get("answer") or "", memory)
dialog_in = inputs.get("dialog-in") if isinstance(inputs.get("dialog-in"), list) else turn.get("dialog", [])
dialog_in = dialog_in if isinstance(dialog_in, list) else []
if not question:
return {"outputs": {"dialog": dialog_in}, "runtime": {"answer": "", "assistantMessage": "", "messages": dialog_in}}
answer = await services.answer_from_context(
question=question,
source=data.get("source") or "uploaded",
context_path=data.get("contextPath") or "",
node_id=node.get("id"),
context={**context, "assistant_role": run_state.assistant_role},
)
messages = append_character_message(dialog_in, answer)
return {"outputs": {"dialog": messages}, "runtime": {"answer": answer, "assistantMessage": answer, "messages": messages}}
if node_type == "basic/counter":
key = str(data.get("key") or "counter").strip() or "counter"
try:
limit = max(1, int(data.get("limit") or 3))
except (TypeError, ValueError):
limit = 3
try:
previous = int(memory.get(key, 0))
except (TypeError, ValueError):
previous = 0
count = previous + 1
match_id = "done" if count >= limit else "continue"
payload = inputs.get("dialog-in") if isinstance(inputs.get("dialog-in"), list) else True
return {
"outputs": {
"continue": payload if match_id == "continue" else None,
"done": payload if match_id == "done" else None,
},
"runtime": {
"key": key,
"count": count,
"limit": limit,
"matchId": match_id,
},
"memoryPatch": {key: count},
}
if node_type == "basic/wait":
try:
wait_ms = max(0, int(data.get("ms") or 0))
except (TypeError, ValueError):
wait_ms = 0
if wait_ms:
await asyncio.sleep(wait_ms / 1000)
dialog_in = inputs.get("dialog-in") if isinstance(inputs.get("dialog-in"), list) else []
return {
"outputs": {"dialog": dialog_in},
"runtime": {"waitedMs": wait_ms},
}
if node_type == "basic/restart":
return {"outputs": {}, "runtime": {"restart": True}, "runtimeAction": {"type": "restart"}}
if node_type == "basic/randomlist":
source = inputs.get("text")
if not source or not str(source).strip():
return {"outputs": {"text": ""}, "runtime": {"selected": ""}}
items = re.findall(r"'([^']*)'", str(source))
if not items:
return {"outputs": {"text": ""}, "runtime": {"selected": ""}}
selected = random.choice(items)
return {"outputs": {"text": selected}, "runtime": {"selected": selected}}
if node_type == "basic/send-websocket":
input_text = inputs.get("text") or ""
dialog_in = inputs.get("dialog-in") if isinstance(inputs.get("dialog-in"), list) else []
url = data.get("url") or ""
message = render_websocket_message_template(data.get("messageTemplate") or "", input_text, memory)
result = await send_websocket_message(url, message)
reply = result.get("reply") or ""
return {
"outputs": {"dialog": dialog_in, "text": reply},
"runtime": {"inputText": input_text, "dialog": dialog_in, "url": url, "message": message, "reply": reply},
}
if node_type == "basic/json-parser":
raw_json = inputs.get("json") or ""
extracts = ensure_json_extracts(data.get("extracts"))
outputs = {extract["id"]: "" for extract in extracts}
if not raw_json or not str(raw_json).strip():
return {"outputs": outputs, "runtime": {"values": outputs}}
try:
parsed = json.loads(str(raw_json))
except json.JSONDecodeError as error:
raise WorkflowRuntimeError(f"Invalid JSON: {error}") from error
for extract in extracts:
value = read_json_path(parsed, extract.get("path", ""))
picked = pick_json_fields(value, extract.get("fields", ""))
outputs[extract["id"]] = format_json_parser_output(picked)
return {"outputs": outputs, "runtime": {"values": outputs}}
if node_type == "basic/ifelse":
input_value = inputs.get("input")
conditions = ensure_conditions(data.get("conditions"))
outputs = {condition["id"]: None for condition in conditions}
match_id = None
if not input_value:
return {"outputs": outputs, "runtime": {"inputValue": "", "matchId": None}}
normalized = str(input_value).strip().lower()
for condition in conditions:
keyword = condition.get("keyword") or ""
if keyword and normalized == keyword.strip().lower():
outputs[condition["id"]] = True
match_id = condition["id"]
break
return {"outputs": outputs, "runtime": {"inputValue": input_value, "matchId": match_id}}
if node_type == COMPONENT_INPUT_TYPE:
external_values = context.get("external_input_values") if isinstance(context.get("external_input_values"), dict) else None
external_handle_id = data.get("externalHandleId")
if external_values is not None and external_handle_id and external_handle_id in external_values:
value = external_values[external_handle_id]
else:
value = context.get("external_input_value")
return {"outputs": {"output": value}, "runtime": {"value": value}}
if node_type == COMPONENT_OUTPUT_TYPE:
value = inputs.get("input")
return {"outputs": {}, "runtime": {"value": value}}
return {"outputs": {}, "runtime": {}}
async def continue_interactive_run(
run_state: WorkflowRunState,
*,
user_answer: Optional[str] = None,
services: Optional[WorkflowServices] = None,
external_input_value: Any = None,
external_input_values: Optional[dict] = None,
context: Optional[dict] = None,
on_event: EventCallback = None,
) -> dict:
services = services or WorkflowServices()
context = context or {}
working_graph = run_state.graph
if run_state.pending_question:
result = await _resolve_pending_question(
run_state,
user_answer=user_answer,
services=services,
external_input_value=external_input_value,
external_input_values=external_input_values,
context=context,
on_event=on_event,
)
if result is not None:
return result
working_graph = run_state.graph
while run_state.cursor < len(run_state.order):
node_id = run_state.order[run_state.cursor]
node = find_node(working_graph, node_id)
if not node:
run_state.cursor += 1
continue
incoming_edges = run_state.incoming.get(node_id, [])
input_values = {
**collect_inputs_for_node(incoming_edges, run_state.outputs_by_node),
**run_state.input_overrides_by_node.get(node_id, {}),
}
if incoming_edges and has_blocked_incoming_value(incoming_edges, input_values):
run_state.outputs_by_node[node_id] = {}
working_graph = update_node_runtime(
working_graph,
node_id,
{"status": "skipped", "error": "", "inputs": input_values, "outputs": {}},
)
run_state.graph = working_graph
_emit(on_event, {"type": "skip", "nodeId": node_id, "title": node.get("data", {}).get("title") or node.get("type")})
run_state.cursor += 1
continue
working_graph = update_node_runtime(working_graph, node_id, {"status": "running", "error": "", "inputs": input_values})
run_state.graph = working_graph
_emit(on_event, {"type": "node-start", "nodeId": node_id, "title": node.get("data", {}).get("title") or node.get("type")})
if node.get("type") == "basic/question":
rendered = render_memory_template(input_values.get("question") or node.get("data", {}).get("question") or "", run_state.memory)
question = (
await services.paraphrase_text(
text=rendered,
message_type="question",
node_id=node_id,
context={**context, "assistant_role": run_state.assistant_role},
)
if node.get("data", {}).get("paraphrase")
else rendered
)
pending = {"nodeId": node_id, "question": question, "inputs": input_values}
working_graph = update_node_runtime(
working_graph,
node_id,
{"status": "waiting", "error": "", "inputs": input_values, "outputs": {}, "question": question},
)
run_state.graph = working_graph
run_state.pending_question = pending
_emit(on_event, {"type": "assistant-question", "nodeId": node_id, "question": question})
return _runtime_response("paused", run_state, pending)
try:
if is_component_node_type(node.get("type")):
result = await _execute_component_node(
run_state,
node,
input_values,
services=services,
context=context,
on_event=on_event,
)
if result.get("status") in {"paused", "restart"}:
return result
node_result = result["node_result"]
working_graph = run_state.graph
else:
node_lookup = {candidate.get("id"): candidate for candidate in working_graph.get("nodes", [])}
outgoing_edges = run_state.outgoing.get(node_id, [])
node_result = await execute_node(
node,
input_values,
run_state,
services,
{
**context,
"external_input_value": external_input_value,
"external_input_values": external_input_values,
"connected_output_handles": [
edge.get("sourceHandle")
for edge in outgoing_edges
if edge.get("sourceHandle")
],
"outgoing_edges": outgoing_edges,
"node_lookup": node_lookup,
},
)
if node_result.get("runtimeAction", {}).get("type") == "restart":
working_graph = update_node_runtime(
working_graph,
node_id,
{
"status": "success",
"error": "",
"inputs": input_values,
"outputs": node_result.get("outputs", {}),
**node_result.get("runtime", {}),
},
)
run_state.graph = working_graph
_emit(on_event, {"type": "restart", "nodeId": node_id, "title": node.get("data", {}).get("title") or node.get("type")})
return _runtime_response("restart", run_state, None)
retry_result = await _maybe_pause_for_unclear_retry(
run_state,
node,
input_values,
node_result,
services=services,
context=context,
on_event=on_event,
)
if retry_result:
return retry_result
outputs = node_result.get("outputs", {})
run_state.outputs_by_node[node_id] = outputs
run_state.input_overrides_by_node.pop(node_id, None)
memory_patch = node_result.get("memoryPatch")
if isinstance(memory_patch, dict):
run_state.memory.update(memory_patch)
if isinstance(node_result.get("assistantRolePatch"), str):
run_state.assistant_role = node_result.get("assistantRolePatch") or DEFAULT_ASSISTANT_ROLE
working_graph = update_node_runtime(
working_graph,
node_id,
{
"status": "success",
"error": "",
"inputs": input_values,
"outputs": outputs,
**node_result.get("runtime", {}),
"assistantRole": run_state.assistant_role,
},
)
run_state.graph = working_graph
_emit(on_event, {"type": "node-success", "nodeId": node_id, "title": node.get("data", {}).get("title") or node.get("type")})
assistant_message = node_result.get("runtime", {}).get("assistantMessage")
if assistant_message:
_emit(on_event, {"type": "assistant-message", "nodeId": node_id, "message": assistant_message})
if isinstance(memory_patch, dict):
_emit(on_event, {"type": "memory-update", "nodeId": node_id, "memoryPatch": memory_patch})
if isinstance(node_result.get("assistantRolePatch"), str):
_emit(on_event, {"type": "assistant-role-update", "nodeId": node_id, "assistantRole": run_state.assistant_role})
feedback_jump = get_feedback_jump(run_state, node_id, outputs)
if feedback_jump:
edge = feedback_jump["edge"]
target_id = edge.get("target")
current_overrides = run_state.input_overrides_by_node.get(target_id, {})
run_state.input_overrides_by_node[target_id] = {
**current_overrides,
edge.get("targetHandle") or "input": feedback_jump["outputValue"],
}
run_state.cursor = feedback_jump["targetIndex"]
continue
except Exception as error:
_emit(
on_event,
{
"type": "node-error",
"nodeId": node_id,
"title": node.get("data", {}).get("title") or node.get("type"),
"error": str(error),
},
)
unclear_result = build_unclear_node_result(node, input_values, error)
if unclear_result:
retry_result = await _maybe_pause_for_unclear_retry(
run_state,
node,
input_values,
unclear_result,
services=services,
context=context,
on_event=on_event,
)
if retry_result:
return retry_result
run_state.outputs_by_node[node_id] = {}
working_graph = update_node_runtime(
working_graph,
node_id,
{"status": "error", "error": str(error), "inputs": input_values, "outputs": {}},
)
run_state.graph = working_graph
run_state.cursor += 1
run_state.completed = True
run_state.pending_question = None
return _runtime_response("complete", run_state, None)
async def _resolve_pending_question(
run_state: WorkflowRunState,
*,
user_answer: Optional[str],
services: WorkflowServices,
external_input_value: Any,
external_input_values: Optional[dict],
context: dict,
on_event: EventCallback,
) -> Optional[dict]:
pending = run_state.pending_question or {}
working_graph = run_state.graph
if pending.get("componentNodeId"):
component_node_id = pending["componentNodeId"]
component_node = find_node(working_graph, component_node_id)
component_run = run_state.component_runs_by_node.get(component_node_id)
component_input_values = pending.get("componentInputValues") or {}
if not component_node or not component_run:
raise WorkflowRuntimeError("Pending component question was not found.")
component_run.memory = run_state.memory
component_run.assistant_role = run_state.assistant_role
component_result = await continue_interactive_run(
component_run,
user_answer=user_answer,
services=services,
external_input_value=component_input_values.get("input"),
external_input_values=component_input_values,
context=context,
on_event=on_event,
)
run_state.memory = component_result["state"].memory
run_state.assistant_role = component_result["state"].assistant_role
run_state.component_runs_by_node[component_node_id] = component_result["state"]
working_graph = update_node_subgraph(working_graph, component_node_id, component_result["state"].graph)
run_state.graph = working_graph
if component_result["status"] == "paused":
next_pending = {
**(component_result.get("pending_question") or {}),
"componentNodeId": component_node_id,
"componentInputValues": component_input_values,
}
run_state.pending_question = next_pending
run_state.graph = update_node_runtime(
run_state.graph,
component_node_id,
{"status": "waiting", "error": "", "inputs": component_input_values, "outputs": {}},
)
return _runtime_response("paused", run_state, next_pending)
if component_result["status"] == "restart":
run_state.pending_question = None
return _runtime_response("restart", run_state, None)
component_outputs = collect_component_outputs(component_result["state"].graph)
run_state.outputs_by_node[component_node_id] = component_outputs["outputs"]
run_state.component_runs_by_node.pop(component_node_id, None)
run_state.pending_question = None
run_state.cursor += 1
run_state.graph = update_node_runtime(
run_state.graph,
component_node_id,
{
"status": "success",
"error": "",
"inputs": component_input_values,
"outputs": component_outputs["outputs"],
"lastOutput": component_outputs["lastOutput"],
"assistantRole": run_state.assistant_role,
},
)
return None
if user_answer is None or not str(user_answer).strip():
return _runtime_response("paused", run_state, pending)
pending_node = find_node(working_graph, pending.get("nodeId"))
if not pending_node:
raise WorkflowRuntimeError("Pending question node was not found.")
answer = str(user_answer).strip()
if pending.get("retryForNodeId"):
retry_dialog = [
*(pending.get("inputs", {}).get("dialog-in") if isinstance(pending.get("inputs", {}).get("dialog-in"), list) else []),
*([{"type": "character", "text": pending.get("question", "")}] if str(pending.get("question", "")).strip() else []),
{"type": "user", "text": answer},
]
run_state.input_overrides_by_node[pending["retryForNodeId"]] = {
"answer": answer,
"question": pending.get("question", ""),
"dialog-in": retry_dialog,
}
_emit(on_event, {"type": "user-answer", "nodeId": pending_node.get("id"), "answer": answer})
run_state.graph = update_node_runtime(run_state.graph, pending_node.get("id"), {"status": "running", "error": ""})
run_state.pending_question = None
return None
result = build_question_result(pending_node, pending.get("inputs", {}), answer, run_state.memory, pending.get("question", ""))
run_state.outputs_by_node[pending_node.get("id")] = result["outputs"]
run_state.graph = update_node_runtime(
run_state.graph,
pending_node.get("id"),
{
"status": "success",
"error": "",
"inputs": pending.get("inputs", {}),
"outputs": result["outputs"],
**result["runtime"],
},
)
_emit(on_event, {"type": "user-answer", "nodeId": pending_node.get("id"), "answer": answer})
run_state.pending_question = None
run_state.cursor += 1
return None
async def _execute_component_node(
run_state: WorkflowRunState,
node: dict,
input_values: dict,
*,
services: WorkflowServices,
context: dict,
on_event: EventCallback,
) -> dict:
subgraph = node.get("data", {}).get("subgraph") or create_component_subgraph()
component_run = create_interactive_run(subgraph)
component_run.memory = run_state.memory
component_run.assistant_role = run_state.assistant_role
run_state.component_runs_by_node[node.get("id")] = component_run
sub_result = await continue_interactive_run(
component_run,
services=services,
external_input_value=input_values.get("input"),
external_input_values=input_values,
context=context,
on_event=on_event,
)
run_state.memory = sub_result["state"].memory
run_state.assistant_role = sub_result["state"].assistant_role
run_state.component_runs_by_node[node.get("id")] = sub_result["state"]
run_state.graph = update_node_subgraph(run_state.graph, node.get("id"), sub_result["state"].graph)
if sub_result["status"] == "paused":
pending = {
**(sub_result.get("pending_question") or {}),
"componentNodeId": node.get("id"),
"componentInputValues": input_values,
}
run_state.pending_question = pending
run_state.graph = update_node_runtime(
run_state.graph,
node.get("id"),
{"status": "waiting", "error": "", "inputs": input_values, "outputs": {}},
)
return _runtime_response("paused", run_state, pending)
if sub_result["status"] == "restart":
return _runtime_response("restart", run_state, None)
component_outputs = collect_component_outputs(sub_result["state"].graph)
run_state.component_runs_by_node.pop(node.get("id"), None)
return {
"status": "node-complete",
"node_result": {
"outputs": component_outputs["outputs"],
"runtime": {"lastOutput": component_outputs["lastOutput"], "assistantRole": run_state.assistant_role},
},
}
async def _maybe_pause_for_unclear_retry(
run_state: WorkflowRunState,
node: dict,
input_values: dict,
node_result: dict,
*,
services: WorkflowServices,
context: dict,
on_event: EventCallback,
) -> Optional[dict]:
node_type = node.get("type")
runtime = node_result.get("runtime", {})
data = node.get("data", {})
can_retry = (
node_type in {"basic/semantic-branch", "basic/save-memory"}
and runtime.get("matchId") == "unclear"
and data.get("retryOnUnclear", True) is not False
)
if not can_retry:
return None
node_id = node.get("id")
retry_count = run_state.retry_counts_by_node.get(node_id, 0)
default_retry_question = "Не смогла уверенно понять ответ. Пожалуйста, ответьте ближе к одному из вариантов."
raw_retry_question = data.get("retryQuestion") if "retryQuestion" in data else default_retry_question
retry_template = render_memory_template(
raw_retry_question,
run_state.memory,
)
if node_type == "basic/save-memory":
retry_template = retry_template.replace("{text}", str(input_values.get("text") or ""))
run_state.retry_counts_by_node[node_id] = retry_count + 1
retry_question = (
await services.paraphrase_text(
text=retry_template,
message_type="question",
node_id=node_id,
context={**context, "assistant_role": run_state.assistant_role},
)
if data.get("retryParaphrase")
else retry_template
)
pending = {
"nodeId": node_id,
"retryForNodeId": node_id,
"question": retry_question,
"inputs": input_values,
}
run_state.pending_question = pending
run_state.graph = update_node_runtime(
run_state.graph,
node_id,
{
"status": "waiting",
"error": "",
"inputs": input_values,
"outputs": {},
"result": "unclear",
"matchId": "unclear",
"retryCount": retry_count + 1,
},
)
_emit(on_event, {"type": "assistant-question", "nodeId": node_id, "question": retry_question})
return _runtime_response("paused", run_state, pending)
def _runtime_response(status: str, run_state: WorkflowRunState, pending_question: Optional[dict]) -> dict:
return {
"status": status,
"state": run_state,
"pending_question": pending_question,
}
def _emit(callback: EventCallback, event: dict) -> None:
if callback:
callback({**event, "timestamp": time.time()})
def serialize_run_response(run_id: str, result: dict, events: List[dict]) -> dict:
state: WorkflowRunState = result["state"]
return {
"run_id": run_id,
"status": result["status"],
"pending_question": result.get("pending_question"),
"completed": state.completed,
"cursor": state.cursor,
"order": state.order,
"memory": state.memory,
"assistant_role": state.assistant_role,
"graph": state.graph,
"events": events,
}