fffiloni's picture
Create app.py
3c75989 verified
"""
Mini Daggr UI Lab
=================
A self-contained experimental UI to explore and debug Daggr-like execution semantics.
This app is NOT a reimplementation of Daggr.
It is a **debug and exploration lab** focused on clarity and edge cases.
What this app models:
- A small DAG of function nodes
- Versioned inputs (controls) with history
- Nodes produce snapshots (history of executions)
- Each snapshot records:
- inputs values
- upstream snapshot indices
- control versions
- execution status (ok / error)
What the UI shows:
- Wire colors that explain *why* a node is fresh, stale, mismatched, failed, or blocked
- Snapshot history per node
- Restore strategies (node-only, upstream, downstream auto-match)
- Explicit visualization of error propagation and blocking
Key design goals:
- Make execution semantics observable
- Make restore behavior explainable
- Make failure propagation debuggable
- Keep everything explicit and readable
This file is intentionally large but structured in clear sections:
1. Helpers & utilities
2. Core execution engine (Graph + FrontendState)
3. Pipeline DSL (Pipeline, ports, wiring)
4. Pipeline definition (the part a developer edits)
5. UI rendering & event wiring (Gradio)
If you want to understand or modify the pipeline,
jump directly to the **PIPELINE DEFINITION** section.
"""
# ============================================================
# JUMP POINTS
# - PIPELINE DEFINITION
# - PIPELINE DSL
# - EXECUTION ENGINE
# - UI (GRADIO)
# ============================================================
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional, Set, Tuple
import ast
import inspect
import textwrap
import hashlib
import json
import time
import re
import gradio as gr
# =============================================================================
# Helpers & UI primitives
# =============================================================================
def stable_hash(obj: Any) -> str:
"""
Stable, short hash for signatures:
- JSON-dumps with sort_keys
- sha256 truncated to 12
"""
s = json.dumps(obj, sort_keys=True, ensure_ascii=False, default=str)
return hashlib.sha256(s.encode("utf-8")).hexdigest()[:12]
WIRE_COLOR_MODE = "rich" # "rich" | "daggr"
def map_wire_color_for_ui(color: str) -> str:
if WIRE_COLOR_MODE == "daggr":
# Daggr-like: only "ok" vs "not ok"
if color == "green":
return "orange" # Gradio official orange
return "gray"
return color
def dot_html(color: str) -> str:
color = map_wire_color_for_ui(color)
css = {
"green": "#16a34a",
"orange": "#f59e0b",
"red": "#ef4444",
"blue": "#3b82f6",
"gray": "#9ca3af",
"purple": "#a855f7", # blocked
}.get(color, "#9ca3af")
return (
"<span style='display:inline-block;width:10px;height:10px;"
f"border-radius:999px;background:{css};vertical-align:middle;'></span>"
)
def emoji_wire(color: str) -> str:
color = map_wire_color_for_ui(color)
return {
"green": "🟢",
"orange": "🟠",
"red": "🔴",
"blue": "🔵",
"gray": "⚪",
"purple": "🟣",
}.get(color, "⚪")
def pretty_val(v: Any, max_len: int = 80) -> str:
s = repr(v)
if len(s) > max_len:
return s[: max_len - 1] + "…"
return s
def parse_hist_choice_idx(idx_str: str) -> Optional[int]:
"""
Parse strings like:
"12 | ✓ | run=... | ..."
Returns int index, or None.
"""
if not idx_str or idx_str.startswith("("):
return None
try:
return int(idx_str.split("|", 1)[0].strip())
except Exception:
return None
FILE_COMPONENTS = {
gr.Audio,
gr.Video,
gr.Image,
gr.File,
}
# =============================================================================
# TEMPORARY HARD LIMITS (build-time constraints)
# =============================================================================
DISABLE_MULTI_OUTPUT = True # multi-return + multi declared outputs disabled
DISABLE_FANOUT = True # same (node,port) cannot feed multiple children
# =============================================================================
# Wire legend (shown in UI)
# =============================================================================
COLOR_RULES_MD = (
"### Wire color rules\n"
"- 🔵 **blue**: node never ran yet (no selected snapshot)\n"
"- 🔴 **red**: node failed (selected error snapshot OR last run failed on this node)\n"
"- 🟣 **purple**: **blocked** (this node did not run because an upstream parent failed in the last run)\n"
"- ⚪ **gray**: structurally incompatible (indices not aligned)\n"
"- 🟠 **orange**: aligned but stale (controls changed since snapshot)\n"
"- 🟢 **green**: aligned and fresh\n"
)
GRAPH_MAP_MD = (
"## Graph map\n"
"```\n"
"Input[d] → D →\\\n"
" A → B → C\n"
"Input[e] → E →/ ^\n"
" Input[b]\n"
"```\n"
"\n"
"**Legend:**\n"
"- Boxes (D, E, A, B, C) are function nodes\n"
"- Input[x] are UI inputs (versioned, no run button)\n"
"- Colors apply to **wires and dots only** (see Global panels)\n"
)
def graph_map_md_simple_from_spec(spec: dict) -> str:
"""
Simple + pretty ASCII map (no globals), derived from a pipeline spec.
Goals:
- readable map like:
Input[d] → D →\
A → B → C
Input[e] → E →/ ^
Input[b]
- handles:
- one main merge (2+ parents into one node) if present
- multiple "late inputs" shown under a single caret
- chooses a stable main chain (longest path), so it doesn't randomly miss nodes
Not a general ASCII router: prioritizes clarity.
"""
# -----------------------------
# Helpers to read spec + labels
# -----------------------------
layout = spec.get("layout") or []
node_items = [it for col in layout for it in col if it.get("kind") == "node"]
input_items = [it for col in layout for it in col if it.get("kind") == "input"]
node_label = {it["name"]: it.get("label") or it["name"] for it in node_items}
input_label = {it["name"]: it.get("label") or it["name"] for it in input_items}
def N(n: str) -> str:
return node_label.get(n, n)
def I(c: str) -> str:
# display "Input[d]" even if label is "Input d"
# keep your existing style
return f"Input[{c}]"
# -----------------------------------------
# Build a minimal graph from node_items/wires
# -----------------------------------------
class _MiniGraph:
def __init__(self):
self.nodes = set()
self.parents = {} # node -> list[parent]
self.children = {} # node -> list[child]
def add_node(self, n: str):
self.nodes.add(n)
self.parents.setdefault(n, [])
self.children.setdefault(n, [])
def add_edge(self, p: str, c: str):
self.add_node(p)
self.add_node(c)
if p not in self.parents[c]:
self.parents[c].append(p)
if c not in self.children[p]:
self.children[p].append(c)
def parents_of(self, n: str) -> list[str]:
return list(self.parents.get(n, []))
def children_of(self, n: str) -> list[str]:
return list(self.children.get(n, []))
g = _MiniGraph()
for it in node_items:
g.add_node(it["name"])
# node wires: child_port -> (parent_node, parent_port)
for it in node_items:
child = it["name"]
for _child_port, (parent, _parent_port) in (it.get("wires") or {}).items():
if parent in g.nodes and child in g.nodes:
g.add_edge(parent, child)
# control feeds: input -> (node, port)
control_ports = {}
for it in input_items:
cname = it["name"]
feeds = it.get("feeds")
if isinstance(feeds, (tuple, list)) and len(feeds) == 2:
control_ports[cname] = (feeds[0], feeds[1]) # (node, port)
# map control -> target node
control_to_node = {c: tgt[0] for c, tgt in control_ports.items()}
def controls_feeding_node(n: str) -> list[str]:
return [c for c, nn in control_to_node.items() if nn == n]
nodes = list(g.nodes)
if not nodes:
return "## Graph map\n```\n(empty)\n```"
# -----------------------------
# Longest-path chain (stable)
# -----------------------------
def topo_order() -> list[str]:
indeg = {n: 0 for n in nodes}
for n in nodes:
for ch in g.children_of(n):
if ch in indeg:
indeg[ch] += 1
q = [n for n in nodes if indeg[n] == 0]
out = []
while q:
cur = q.pop(0)
out.append(cur)
for ch in g.children_of(cur):
if ch in indeg:
indeg[ch] -= 1
if indeg[ch] == 0:
q.append(ch)
# fallback if cycle or disconnected weirdness
for n in nodes:
if n not in out:
out.append(n)
return out
def longest_path_chain() -> list[str]:
topo = topo_order()
dist = {n: 0 for n in topo}
prev = {n: None for n in topo}
for n in topo:
for ch in g.children_of(n):
if ch not in dist:
continue
if dist[n] + 1 > dist[ch]:
dist[ch] = dist[n] + 1
prev[ch] = n
end = max(topo, key=lambda n: dist.get(n, 0))
chain = []
cur = end
while cur is not None:
chain.append(cur)
cur = prev[cur]
chain.reverse()
return chain
chain = longest_path_chain()
chain_set = set(chain)
# -----------------------------
# Pick a merge node (prefer on chain)
# -----------------------------
merge = None
for n in chain:
if len(g.parents_of(n)) >= 2:
merge = n
break
if merge is None:
# any merge anywhere
merge = next((n for n in nodes if len(g.parents_of(n)) >= 2), None)
# -----------------------------
# Build lines
# -----------------------------
lines: list[str] = []
# Case A: we have a merge and can show 2 parent lines nicely
if merge:
parents = g.parents_of(merge)
# choose two parents that themselves have a control (if possible)
def parent_score(p: str) -> int:
return 1 if controls_feeding_node(p) else 0
parents_sorted = sorted(parents, key=parent_score, reverse=True)
p1 = parents_sorted[0]
p2 = parents_sorted[1] if len(parents_sorted) > 1 else None
# left texts "Input[x] → P"
def left_txt(p: str) -> str:
cs = controls_feeding_node(p)
if cs:
# if multiple controls feed same node, show first (keep simple)
return f"{I(cs[0])}{N(p)}"
return f"{N(p)}"
top = f"{left_txt(p1)} →\\"
bot = f"{left_txt(p2)} →/" if p2 else ""
# chain from merge onward (if merge not in chain, just show merge then follow one child)
if merge in chain:
chain_from_merge = chain[chain.index(merge):]
else:
# fallback: merge + follow first child path
chain_from_merge = [merge]
cur = merge
seen = {merge}
while True:
kids = g.children_of(cur)
if not kids:
break
nxt = kids[0]
if nxt in seen:
break
chain_from_merge.append(nxt)
seen.add(nxt)
cur = nxt
mid_txt = " → ".join(N(n) for n in chain_from_merge)
slash_col = max(len(top), len(bot)) if bot else len(top)
top_line = top.ljust(slash_col).rstrip()
bot_line = bot.ljust(slash_col).rstrip() if bot else ""
mid_prefix = " " * slash_col
mid_line = f"{mid_prefix}{mid_txt}".rstrip()
lines.append(top_line)
lines.append(mid_line)
if bot_line:
lines.append(bot_line)
# Late controls (controls feeding nodes in chain, excluding parents shown above)
excluded_nodes = {p1}
if p2:
excluded_nodes.add(p2)
late_controls = []
for c, n in control_to_node.items():
if n in chain_set and n not in excluded_nodes:
late_controls.append((c, n))
# Put caret under the FIRST late target if any, else none
if late_controls:
c0, n0 = late_controls[0]
target = N(n0)
idx = mid_line.find(target)
if idx != -1:
caret_pos = idx + max(1, len(target) // 2)
lines.append((" " * caret_pos + "^").rstrip())
# show ALL late inputs, stacked under caret
for c_late, _n_late in late_controls:
pad = max(0, caret_pos - len(I(c_late)) // 2)
lines.append((" " * pad + I(c_late)).rstrip())
# Case B: no merge → single chain + (optional) late inputs
else:
chain_txt = " → ".join(N(n) for n in chain)
lines.append(chain_txt.rstrip())
late_controls = []
for c, n in control_to_node.items():
if n in chain_set:
late_controls.append((c, n))
if late_controls:
c0, n0 = late_controls[0]
target = N(n0)
idx = chain_txt.find(target)
if idx != -1:
caret_pos = idx + max(1, len(target) // 2)
lines.append((" " * caret_pos + "^").rstrip())
for c_late, _n_late in late_controls:
pad = max(0, caret_pos - len(I(c_late)) // 2)
lines.append((" " * pad + I(c_late)).rstrip())
# -----------------------------
# Ensure we didn't "forget" isolated nodes:
# if some nodes aren't in chain and not merge parents, list them in a small footer
# -----------------------------
shown_nodes = set()
for ln in lines:
for n in nodes:
if N(n) in ln:
shown_nodes.add(n)
missing = [n for n in nodes if n not in shown_nodes]
if missing:
lines.append("")
lines.append("Other nodes:")
lines.append(", ".join(N(n) for n in missing))
return "## Graph map\n```\n" + "\n".join(lines).rstrip() + "\n```"
def graph_map_md_simple() -> str:
"""
Simple + pretty ASCII map:
- supports one main merge (two parents into one node)
- supports one or more "late inputs" into chain nodes (rendered with ^ and Input[...] below)
Intended for readability, not full general routing.
Uses current globals: S (FrontendState), PIPELINE_LAYOUT, ui_node/ui_input, S.control_ports
"""
# --- collect nodes (exclude inputs) ---
nodes = list(S.g.nodes.keys())
# indegree / parents
parents = {n: S.g.parents_of(n) for n in nodes}
# find merge node: prefer first with >=2 parents
merge_nodes = [n for n in nodes if len(parents[n]) >= 2]
merge = merge_nodes[0] if merge_nodes else None
# build a main chain starting at merge (or a topo-first node)
def topo():
# tiny topo based on existing helper
# pick any node and ask topo_upstream_to then filter unique order
# better: compute topo over all nodes
indeg = {n: 0 for n in nodes}
for n in nodes:
for ch in S.g.children_of(n):
if ch in indeg:
indeg[ch] += 1
q = [n for n in nodes if indeg[n] == 0]
out = []
while q:
cur = q.pop(0)
out.append(cur)
for ch in S.g.children_of(cur):
if ch in indeg:
indeg[ch] -= 1
if indeg[ch] == 0:
q.append(ch)
# stable fallback
if len(out) != len(nodes):
for n in nodes:
if n not in out:
out.append(n)
return out
topo_order = topo()
# choose start of chain:
# - if merge exists, chain starts at merge
# - else, pick a sink-ish path from first topo node
if merge:
chain = [merge]
cur = merge
while True:
kids = S.g.children_of(cur)
if not kids:
break
# choose the child that appears earliest in topo_order after cur
kids_sorted = sorted(kids, key=lambda k: topo_order.index(k) if k in topo_order else 10**9)
nxt = kids_sorted[0]
if nxt in chain:
break
chain.append(nxt)
cur = nxt
else:
# pick a simple forward chain from first topo node
start = topo_order[0] if topo_order else None
chain = []
cur = start
while cur:
chain.append(cur)
kids = S.g.children_of(cur)
if not kids:
break
kids_sorted = sorted(kids, key=lambda k: topo_order.index(k) if k in topo_order else 10**9)
nxt = kids_sorted[0]
if nxt in chain:
break
cur = nxt
# helper labels
def N(n): # node label
return ui_node(n) if "ui_node" in globals() else n
def I(c): # input label
return f"Input[{c}]"
# map controls to their target node
control_to_node = {c: tgt[0] for c, tgt in S.control_ports.items()}
# find controls that feed the two parents of merge (if any)
# and controls that feed nodes inside the chain (late inputs)
def control_feeding_node(n):
return [c for c, nn in control_to_node.items() if nn == n]
# parent lines if merge
top_line = ""
bot_line = ""
mid_line = ""
caret_line = ""
below_line = ""
if merge and len(parents[merge]) >= 2:
p1, p2 = parents[merge][0], parents[merge][1]
# assume each parent has one direct control (Input[d] -> D, etc.) for pretty rendering
c1 = control_feeding_node(p1)
c2 = control_feeding_node(p2)
left1 = f"{I(c1[0])}{N(p1)}" if c1 else f"{N(p1)}"
left2 = f"{I(c2[0])}{N(p2)}" if c2 else f"{N(p2)}"
# build chain text from merge onward
chain_txt = " → ".join(N(n) for n in chain)
# We want:
# left1 → p1 →\
# merge → ...
# left2 → p2 →/
#
# alignment: put merge start under the end of slashes
left1_txt = f"{left1} →\\"
left2_txt = f"{left2} →/"
# choose indent so that merge begins under the slashes column
slash_col = max(len(left1_txt), len(left2_txt))
indent = " " * (slash_col - len(N(merge)) - 2) # approximate, then we’ll correct below
# better: force merge to start at column = slash_col
# So mid line begins with spaces to slash_col, then chain.
mid_prefix = " " * (slash_col)
mid_line = f"{mid_prefix}{chain_txt}"
# top and bottom lines padded to slash_col
top_line = left1_txt.ljust(slash_col) # ends at slash
bot_line = left2_txt.ljust(slash_col)
# Now add the merge node label on mid line, aligned under the slash join:
# We want the merge label at exactly slash_col - 1 maybe, but simpler:
# Put 15 spaces then "A → B → C"
# We'll insert merge label at start of chain_txt already includes it (chain starts with merge)
# So mid_line is fine.
# Late inputs: any control feeding a node in chain but not feeding the first parent nodes
late_controls = []
chain_set = set(chain)
for c, n in control_to_node.items():
if n in chain_set and n not in (p1, p2):
late_controls.append((c, n))
# only render the first late control nicely (matches your example)
if late_controls:
c_late, n_late = late_controls[0]
# position caret under the target node label in mid_line
# find substring " → <node> →" or "<node>" occurrence
target = N(n_late)
idx = mid_line.find(target)
if idx != -1:
caret_pos = idx + len(target)//2
caret_line = " " * caret_pos + "^"
below_line = " " * (max(0, caret_pos - len(I(c_late))//2)) + I(c_late)
else:
# no merge: just print a single chain, and optionally a late input caret
chain_txt = " → ".join(N(n) for n in chain)
mid_line = chain_txt
late_controls = []
chain_set = set(chain)
for c, n in control_to_node.items():
if n in chain_set:
late_controls.append((c, n))
if late_controls:
c_late, n_late = late_controls[0]
target = N(n_late)
idx = mid_line.find(target)
if idx != -1:
caret_pos = idx + len(target)//2
caret_line = " " * caret_pos + "^"
below_line = " " * (max(0, caret_pos - len(I(c_late))//2)) + I(c_late)
# assemble
lines = []
if top_line:
lines.append(top_line.rstrip())
if mid_line:
lines.append(mid_line.rstrip())
if bot_line:
lines.append(bot_line.rstrip())
if caret_line:
lines.append(caret_line.rstrip())
if below_line:
lines.append(below_line.rstrip())
# wrap markdown
return "## Graph map\n```\n" + "\n".join(lines) + "\n```"
# -----------------------------------------------------------------------------
# Badge label mode (global UI toggle)
# -----------------------------------------------------------------------------
BADGE_LABEL_MODE = "nodes" # "ports" | "nodes"
# =============================================================================
# EXECUTION ENGINE
# - Snapshot, RestoreContext
# - Graph
# - Controls
# - FrontendState
# =============================================================================
@dataclass
class Snapshot:
inputs: Dict[str, Any]
upstream_selected: Dict[str, int]
outputs: Optional[Dict[str, Any]]
sig: str
controls_selected: Dict[str, int]
status: str = "ok" # "ok" | "error"
error: Optional[str] = None
run_id: Optional[int] = None
run_phase: Optional[str] = None # upstream(auto) | upstream(force) | root | downstream(force)
run_step: Optional[int] = None # 0,1,2... within the run
@dataclass
class RestoreContext:
"""
A "solver context" used by restore_with_context:
- ctx.sel: proposed selected snapshot index per node
- ctx.ctrl_sel: proposed selected index per control
- ctx.locked: nodes that cannot be moved (root etc.)
- ctx.touched: nodes moved in a *pass* (prevents double-move within pass)
- ctx.reconciled: nodes frozen across passes (patch 3)
"""
root: str
mode: str
sel: Dict[str, int] = field(default_factory=dict)
locked: Set[str] = field(default_factory=set)
touched: Set[str] = field(default_factory=set)
ctrl_sel: Dict[str, int] = field(default_factory=dict)
ctrl_locked: Set[str] = field(default_factory=set)
reconciled: Set[str] = field(default_factory=set)
log: List[str] = field(default_factory=list)
def get_idx(self, node: str, fallback: Dict[str, int]) -> int:
return self.sel.get(node, fallback.get(node, -1))
def set_idx(self, node: str, idx: int, reason: str) -> bool:
if node in self.locked:
return False
if node in self.touched:
return False
self.sel[node] = idx
self.touched.add(node)
self.log.append(f"set {node} -> {idx} ({reason})")
return True
class FnNode:
def __init__(self, name: str, fn: Callable[..., Any], input_ports: List[str], output_ports: List[str]):
self.name = name
self.fn = fn
self.input_ports = input_ports
self.output_ports = output_ports
class Graph:
"""
Minimal directed graph with port-level wiring.
- rev maps (dst_node, dst_port) -> (src_node, src_port)
- fwd maps src_node -> [child_nodes]
"""
def __init__(self):
self.nodes: Dict[str, FnNode] = {}
self.rev: Dict[Tuple[str, str], Tuple[str, str]] = {}
self.fwd: Dict[str, List[str]] = {}
def add_node(self, node: FnNode) -> None:
self.nodes[node.name] = node
self.fwd.setdefault(node.name, [])
def connect(self, src_node: str, src_port: str, dst_node: str, dst_port: str) -> None:
self.rev[(dst_node, dst_port)] = (src_node, src_port)
self.fwd.setdefault(src_node, [])
if dst_node not in self.fwd[src_node]:
self.fwd[src_node].append(dst_node)
def parents_of(self, node_name: str) -> List[str]:
parents: List[str] = []
for port in self.nodes[node_name].input_ports:
key = (node_name, port)
if key in self.rev:
src, _ = self.rev[key]
parents.append(src)
return list(dict.fromkeys(parents))
def children_of(self, node_name: str) -> List[str]:
return list(self.fwd.get(node_name, []))
def deps_of(self, node_name: str) -> Set[str]:
"""All upstream nodes that node_name depends on (graph structure)."""
seen: Set[str] = set()
stack = [node_name]
while stack:
n = stack.pop()
for port in self.nodes[n].input_ports:
key = (n, port)
if key in self.rev:
src, _ = self.rev[key]
if src not in seen:
seen.add(src)
stack.append(src)
return seen
def downstream_of(self, node_name: str) -> Set[str]:
"""All downstream nodes reachable from node_name."""
seen: Set[str] = set()
stack = [node_name]
while stack:
n = stack.pop()
for ch in self.children_of(n):
if ch not in seen:
seen.add(ch)
stack.append(ch)
return seen
def topo_downstream_from(self, node_name: str) -> List[str]:
"""Topologically sorted downstream nodes starting from node_name (excluding node_name)."""
nodes = self.downstream_of(node_name)
if not nodes:
return []
nodes.add(node_name)
indeg = {n: 0 for n in nodes}
for n in nodes:
for ch in self.children_of(n):
if ch in nodes:
indeg[ch] += 1
q = [n for n in nodes if indeg[n] == 0]
order: List[str] = []
while q:
cur = q.pop(0)
if cur != node_name:
order.append(cur)
for ch in self.children_of(cur):
if ch in nodes:
indeg[ch] -= 1
if indeg[ch] == 0:
q.append(ch)
return order
def topo_upstream_to(self, node_name: str) -> List[str]:
"""
Topological order of deps(node_name) plus node_name, upstream -> downstream.
"""
nodes = self.deps_of(node_name) | {node_name}
indeg = {n: 0 for n in nodes}
for n in nodes:
for ch in self.children_of(n):
if ch in nodes:
indeg[ch] += 1
q = [n for n in nodes if indeg[n] == 0]
order: List[str] = []
while q:
cur = q.pop(0)
order.append(cur)
for ch in self.children_of(cur):
if ch in nodes:
indeg[ch] -= 1
if indeg[ch] == 0:
q.append(ch)
return order
# =============================================================================
# Versioned UI controls (Daggr-like)
# =============================================================================
class Controls:
"""
Versioned controls:
- values[name]: current UI string (not necessarily committed)
- history[name]: list of committed values
- selected[name]: index into history
"""
def __init__(self, names: List[str], reuse_identical_values: bool = False):
self.names = names
self.reuse_identical_values = reuse_identical_values
self.reset()
def reset(self) -> None:
self.values: Dict[str, Any] = {n: "" for n in self.names}
self.history: Dict[str, List[Any]] = {n: [] for n in self.names}
self.selected: Dict[str, int] = {n: -1 for n in self.names}
def set_ui_value(self, name: str, value: Any) -> None:
self.values[name] = value
cur_idx = self.selected.get(name, -1)
if 0 <= cur_idx < len(self.history[name]) and values_equal(self.history[name][cur_idx], value):
return
if self.reuse_identical_values:
for i, v in enumerate(self.history[name]):
if values_equal(v, value):
self.selected[name] = i
return
self.history[name].append(value)
self.selected[name] = len(self.history[name]) - 1
def set_selected(self, name: str, idx: int) -> None:
if idx < 0 or idx >= len(self.history[name]):
raise ValueError(f"bad control idx for {name}: {idx}")
self.selected[name] = idx
self.values[name] = self.history[name][idx]
def get_selected_value(self, name: str) -> Any:
idx = self.selected.get(name, -1)
if idx < 0:
return None
return self.history[name][idx]
# =============================================================================
# Frontend-like state engine
# =============================================================================
class FrontendState:
"""
Central state machine of the app.
Responsibilities:
- Store snapshot histories per node
- Track selected snapshots
- Execute nodes (run semantics)
- Resolve inputs from parents and controls
- Determine wire colors (fresh / stale / error / blocked)
- Implement restore strategies
This class intentionally mixes "engine" and "UI-facing semantics":
the goal is debuggability, not strict layering.
"""
def __init__(self, g: Graph, controls: Controls, control_ports: Dict[str, Tuple[str, str]]):
self.g = g
self.controls = controls
self.control_ports = control_ports
self.last_run_meta: Dict[str, str] = {}
self.reset()
# -------------------------
# Reset / lifecycle helpers
# -------------------------
def reset(self) -> None:
self.selected_index: Dict[str, int] = {}
self.history: Dict[str, List[Snapshot]] = {n: [] for n in self.g.nodes}
self.last_run_note: Optional[str] = None
self.last_run_meta = {}
self.last_run_failed: bool = False
self.last_run_error: Optional[dict] = None
self.run_counter: int = 0
self.active_run_id: Optional[int] = None
self.active_run_step: int = 0
self.last_run_executed: Set[str] = set()
self.last_run_blocked_nodes: Set[str] = set()
def clear_transient_run_state(self) -> None:
"""
Clears transient UI state (used by restore/nav/input changes).
This prevents "phantom red" and stale banners after changing context.
"""
self.last_run_note = None
self.last_run_meta.clear()
self.last_run_failed = False
self.last_run_error = None
self.last_run_executed = set()
self.last_run_blocked_nodes = set()
def start_run(self) -> int:
"""
Begin a run transaction:
- increments run_counter
- allocates active_run_id
- resets active_run_step
"""
self.run_counter += 1
self.active_run_id = self.run_counter
self.active_run_step = 0
return self.active_run_id
def end_run(self) -> None:
self.active_run_id = None
self.active_run_step = 0
# -------------------------
# Input resolution
# -------------------------
def never_ran(self, node: str) -> bool:
return len(self.history.get(node, [])) == 0
def _resolve_inputs_for_node(self, node: str) -> Tuple[Dict[str, Any], Dict[str, int]]:
"""
Build inputs dict for node:
- for wired ports: read selected snapshot output from parent
- for control ports: read selected control version
Also returns upstream_selected fingerprints.
"""
inputs: Dict[str, Any] = {}
upstream_selected: Dict[str, int] = {}
for port in self.g.nodes[node].input_ports:
key = (node, port)
if key in self.g.rev:
src_node, src_port = self.g.rev[key]
idx = self.selected_index.get(src_node, -1)
upstream_selected[src_node] = idx
if idx < 0:
raise RuntimeError(f"Upstream '{src_node}' has no selected snapshot for {node}.{port}")
snap = self.history[src_node][idx]
if not snap.outputs:
raise RuntimeError(f"Upstream '{src_node}' selected snapshot has no outputs for {node}.{port}")
inputs[port] = snap.outputs[src_port]
continue
# Find matching control for this (node, port)
control_name = None
for cname, (n, p) in self.control_ports.items():
if n == node and p == port:
control_name = cname
break
if control_name is None:
raise RuntimeError(f"Unwired port {node}.{port} has no control mapping.")
inputs[port] = self.controls.get_selected_value(control_name)
return inputs, upstream_selected
def _controls_in_scope(self, node: str) -> Set[str]:
"""
Controls considered "in-scope" for snapshot freshness:
all controls feeding the dependency subgraph (deps(node) ∪ {node}).
"""
deps = self.g.deps_of(node) | {node}
used = set()
for cname, (n, _p) in self.control_ports.items():
if n in deps:
used.add(cname)
return used
def _ensure_required_controls_committed(self, node: str) -> None:
"""
Controls that directly feed this node must have a selected version.
(UI: blur/apply/Run commits current textbox values.)
"""
for cname, (n, _p) in self.control_ports.items():
if n == node and self.controls.selected.get(cname, -1) < 0:
raise RuntimeError(
f"Control '{cname}' has no selected value yet "
f"(edit textbox + blur, or click Apply)."
)
# -------------------------
# Running nodes
# -------------------------
def run_one(
self,
node: str,
on_error: str = "no_snapshot",
run_phase: Optional[str] = None,
) -> None:
inputs, upstream_selected = self._resolve_inputs_for_node(node)
node_spec = self.g.nodes[node]
out_ports = node_spec.output_ports
status = "ok"
error: Optional[str] = None
outputs: Optional[Dict[str, Any]] = None
try:
raw = node_spec.fn(**inputs)
# -------------------------------------------------
# OUTPUT RESOLUTION (single vs multi-output)
# -------------------------------------------------
# Case 1: function returned multiple values
if isinstance(raw, (tuple, list)):
if DISABLE_MULTI_OUTPUT:
raise RuntimeError(
f"Node '{node}' returned {len(raw)} values but multi-output is disabled "
f"(DISABLE_MULTI_OUTPUT=True)."
)
# Fallback: implicit out1/out2/... if only 'out' declared
if out_ports == ["out"]:
outputs = {f"out{i+1}": raw[i] for i in range(len(raw))}
else:
if len(raw) != len(out_ports):
raise RuntimeError(
f"{node} returned {len(raw)} values but "
f"{len(out_ports)} outputs are declared: {out_ports}"
)
outputs = {
out_ports[i]: raw[i]
for i in range(len(out_ports))
}
# Case 2: single value returned
else:
if len(out_ports) != 1:
raise RuntimeError(
f"{node} returned a single value but declares outputs {out_ports}"
)
outputs = {out_ports[0]: raw}
except Exception as e:
status = "error"
error = repr(e)
outputs = None
# -------------------------
# SNAPSHOT METADATA
# -------------------------
ctrl_scope = self._controls_in_scope(node)
controls_selected = {c: self.controls.selected[c] for c in ctrl_scope}
step = self.active_run_step
self.active_run_step += 1
sig = stable_hash(
{
"status": status,
"error": error,
"inputs": inputs,
"up": upstream_selected,
"outputs": outputs,
"ctrl": controls_selected,
"run_id": self.active_run_id,
"run_phase": run_phase,
"run_step": step,
}
)
snap = Snapshot(
inputs=inputs,
upstream_selected=upstream_selected,
outputs=outputs,
sig=sig,
controls_selected=controls_selected,
status=status,
error=error,
run_id=self.active_run_id,
run_phase=run_phase,
run_step=step,
)
# -------------------------
# COMMIT SNAPSHOT
# -------------------------
if status == "ok":
self.history[node].append(snap)
self.selected_index[node] = len(self.history[node]) - 1
return
if on_error == "snapshot":
self.history[node].append(snap)
self.selected_index[node] = len(self.history[node]) - 1
return
raise RuntimeError(f"{node} failed: {error}")
def run_with_upstream(self, node: str, on_error: str = "no_snapshot") -> None:
"""
Run missing upstream nodes recursively, then run the node.
"""
self._ensure_required_controls_committed(node)
for p in self.g.parents_of(node):
if self.selected_index.get(p, -1) < 0:
self.run_with_upstream(p, on_error=on_error)
self.run_one(node, on_error=on_error, run_phase="upstream(auto)")
def run_force_upstream(self, node: str, topo: bool = True, on_error: str = "no_snapshot") -> None:
"""
Force recompute deps(node) + node, optionally in topological order.
"""
deps_plus = list(self.g.deps_of(node) | {node})
order = self.g.topo_upstream_to(node) if topo else deps_plus
for n in order:
self._ensure_required_controls_committed(n)
# Safety: if parents are missing, auto-create them
for p in self.g.parents_of(n):
if self.selected_index.get(p, -1) < 0:
self.run_with_upstream(p, on_error=on_error)
self.run_one(n, on_error=on_error, run_phase="upstream(force)")
def run_force_downstream_from_selected(self, root: str, on_error: str = "no_snapshot") -> None:
"""
Force recompute downstream nodes starting at root.
Root must have a selected snapshot.
"""
if self.selected_index.get(root, -1) < 0:
raise RuntimeError(f"Cannot force-downstream: root '{root}' has no selected snapshot")
order = self.g.topo_downstream_from(root)
for n in order:
self._ensure_required_controls_committed(n)
for p in self.g.parents_of(n):
if self.selected_index.get(p, -1) < 0:
self.run_with_upstream(p, on_error=on_error)
self.run_one(n, on_error=on_error, run_phase="downstream(force)")
# -------------------------
# Alignment + freshness
# -------------------------
def aligned_edge(self, parent: str, node: str) -> bool:
idx_n = self.selected_index.get(node, -1)
if idx_n < 0 or idx_n >= len(self.history[node]):
return False
snap = self.history[node][idx_n]
return snap.upstream_selected.get(parent) == self.selected_index.get(parent)
def globally_fresh(self, node: str) -> bool:
"""
Fresh if the selected snapshot for node references the same selected controls
(in its recorded scope) as current controls.
"""
idx = self.selected_index.get(node, -1)
if idx < 0 or idx >= len(self.history[node]):
return False
snap = self.history[node][idx]
for c, used_idx in snap.controls_selected.items():
if self.controls.selected.get(c) != used_idx:
return False
return True
def would_run_be_noop(self, node: str) -> bool:
"""
True si re-run du node produirait exactement le même contexte que le snapshot sélectionné:
- mêmes inputs résolus
- mêmes upstream_selected
- mêmes controls_selected (scope)
et snapshot sélectionné ok.
"""
idx = self.selected_index.get(node, -1)
if idx < 0 or idx >= len(self.history.get(node, [])):
return False
snap = self.history[node][idx]
if snap.status != "ok":
return False
# Contexte actuel
inputs, upstream_selected = self._resolve_inputs_for_node(node)
ctrl_scope = self._controls_in_scope(node)
controls_selected = {c: self.controls.selected.get(c, -1) for c in ctrl_scope}
# Comparaison (values_equal gère audio/np.array/dicts)
return (
values_equal(snap.inputs, inputs)
and snap.upstream_selected == upstream_selected
and snap.controls_selected == controls_selected
)
# -------------------------
# Error / blocked UI semantics
# -------------------------
def selected_snapshot(self, node: str) -> Optional[Snapshot]:
idx = self.selected_index.get(node, -1)
if idx < 0:
return None
hist = self.history.get(node, [])
if idx >= len(hist):
return None
return hist[idx]
def is_error_selected(self, node: str) -> bool:
snap = self.selected_snapshot(node)
return bool(snap and snap.status == "error")
def selected_snapshot_status(self, node: str) -> Optional[str]:
idx = self.selected_index.get(node, -1)
if idx < 0 or idx >= len(self.history.get(node, [])):
return None
return self.history[node][idx].status
def last_run_error_relevant(self, node: str) -> bool:
"""
True if:
- last run failed
- failure node == node
- and the failure fingerprint still matches current context (parents + controls-in-scope)
This prevents phantom red after restore/nav/input changes.
"""
err = getattr(self, "last_run_error", None)
if not err or not getattr(self, "last_run_failed", False):
return False
if err.get("node") != node:
return False
parents_sel = err.get("parents_sel", None)
ctrl_sel = err.get("ctrl_sel", None)
# If fingerprint missing (older runs), fall back to old behavior
if parents_sel is None or ctrl_sel is None:
return True
# 1) parents must still match
for p, want in parents_sel.items():
if self.selected_index.get(p, -1) != want:
return False
# 2) controls-in-scope must still match
for c, want in ctrl_sel.items():
if self.controls.selected.get(c, -1) != want:
return False
return True
def edge_is_blocked_ui(self, parent: str, child: str) -> bool:
if not getattr(self, "last_run_failed", False):
return False
err = getattr(self, "last_run_error", None)
if not err:
return False
# node-level blocked: any incoming edge to a blocked node is purple,
# but only if that node did NOT execute in the last run.
return (
child in getattr(self, "last_run_blocked_nodes", set())
and child not in getattr(self, "last_run_executed", set())
)
def node_is_in_error_ui(self, node: str) -> bool:
"""
A node is "in error UI" if:
- it is not blocked by a parent failure (purple dominates for blocked children)
- and either:
- selected snapshot is error, OR
- last run failed on this node and the error fingerprint still matches.
"""
# if blocked => don't show as error
for p in self.g.parents_of(node):
if self.edge_is_blocked_ui(p, node):
return False
if self.is_error_selected(node):
return True
return self.last_run_error_relevant(node)
# -------------------------
# Wire coloring
# -------------------------
def control_wire_color(self, control_name: str) -> str:
"""
Control wire (Input[x] -> target node port):
- blue : target node never ran yet OR no selected snapshot
- red : target node in error UI
- gray : target snapshot doesn't reference this control (rare)
- orange : this control changed since snapshot
- green : this control matches snapshot
"""
if control_name not in self.control_ports:
return "gray"
node, _port = self.control_ports[control_name]
if self.never_ran(node):
return "blue"
idx_n = self.selected_index.get(node, -1)
if idx_n < 0 or idx_n >= len(self.history[node]):
return "blue"
snap = self.history[node][idx_n]
if self.node_is_in_error_ui(node):
return "red"
used = snap.controls_selected.get(control_name, None)
if used is None:
return "gray"
cur = self.controls.selected.get(control_name, -1)
if cur != used:
return "orange"
return "green"
def wire_color(self, parent: str, node: str) -> str:
"""
Graph wire (parent -> child):
Priority:
0) purple blocked (last run)
1) blue never ran (child)
2) red child in error UI
3) gray misaligned structure
4) orange stale (parent not globally fresh)
5) green
"""
if self.edge_is_blocked_ui(parent, node):
return "purple"
if self.never_ran(node):
return "blue"
if self.node_is_in_error_ui(node):
return "red"
if not self.aligned_edge(parent, node):
return "gray"
if not self.globally_fresh(parent):
return "orange"
return "green"
# -------------------------
# Restore (simple) + Restore (context solver)
# -------------------------
def restore(self, node: str, idx: int, mode: str) -> None:
if idx < 0 or idx >= len(self.history[node]):
raise ValueError("bad idx")
snap = self.history[node][idx]
if mode == "node_only":
self.selected_index[node] = idx
return
self.selected_index[node] = idx
for c, used_idx in snap.controls_selected.items():
self.controls.set_selected(c, used_idx)
if mode == "node+controls":
return
if mode in ("node+controls+upstream", "node+controls+upstream+downstream_match"):
self._restore_upstream_recursive(node)
if mode == "node+controls+upstream":
return
if mode == "node+controls+upstream+downstream_match":
self._automatch_downstream_from(node)
return
raise ValueError(f"Unknown restore mode: {mode}")
def _restore_upstream_recursive(self, node: str) -> None:
idx = self.selected_index.get(node, -1)
if idx < 0:
return
snap = self.history[node][idx]
for parent, used_idx in snap.upstream_selected.items():
if used_idx is None or used_idx < 0:
continue
if used_idx < len(self.history[parent]):
self.selected_index[parent] = used_idx
self._restore_upstream_recursive(parent)
# ---- Context solver helpers (downstream matching + reconcile) ----
def restore_with_context(self, node: str, idx: int, mode: str, reconcile: bool = False) -> RestoreContext:
ctx = RestoreContext(root=node, mode=mode)
ctx.reconciled.clear()
ctx.locked.add(node)
ctx.sel[node] = idx
snap = self.history[node][idx]
# restore controls if requested
if "controls" in mode:
for c, used in snap.controls_selected.items():
ctx.ctrl_sel[c] = used
# restore upstream recursively (context)
if "upstream" in mode:
self._restore_upstream_recursive_ctx(node, ctx)
# downstream match (context)
if "downstream_match" in mode:
self._automatch_downstream_ctx(ctx, reconcile=reconcile)
return ctx
def commit_restore_context(self, ctx: RestoreContext) -> None:
# apply node selection
for node, idx in ctx.sel.items():
self.selected_index[node] = idx
# apply control selection
for c, idx in ctx.ctrl_sel.items():
self.controls.set_selected(c, idx)
def _restore_upstream_recursive_ctx(self, node: str, ctx: RestoreContext) -> None:
idx = ctx.get_idx(node, self.selected_index)
if idx < 0 or idx >= len(self.history.get(node, [])):
return
snap = self.history[node][idx]
for parent, used_idx in snap.upstream_selected.items():
if used_idx is None or used_idx < 0:
continue
if used_idx >= len(self.history.get(parent, [])):
continue
if parent in ctx.locked:
continue
prev = ctx.sel.get(parent, None)
if prev is None:
ctx.sel[parent] = used_idx
elif prev != used_idx:
ctx.log.append(
f"upstream CONFLICT {parent}: keep {prev}, ignore {used_idx} (from {node}#{idx})"
)
continue
# propagate controls conservatively
if "controls" in ctx.mode:
psnap = self.history[parent][ctx.sel[parent]]
for c, want in psnap.controls_selected.items():
if c in ctx.ctrl_locked:
continue
ctx.ctrl_sel.setdefault(c, want)
self._restore_upstream_recursive_ctx(parent, ctx)
def _controls_scope_for_node(self, node: str) -> Set[str]:
return self._controls_in_scope(node)
def _is_snapshot_compatible(self, node: str, snap: Snapshot) -> bool:
# parents must match current selected snapshots
for p in self.g.parents_of(node):
cur_p = self.selected_index.get(p, -1)
if cur_p < 0:
return False
if snap.upstream_selected.get(p) != cur_p:
return False
# controls-in-scope must match current selected controls
scope = self._controls_scope_for_node(node)
for c in scope:
cur = self.controls.selected.get(c, -1)
if cur < 0:
return False
used = snap.controls_selected.get(c, None)
if used is None:
return False
if used != cur:
return False
return True
def _is_snapshot_compatible_ctx(self, node: str, snap: Snapshot, ctx: RestoreContext) -> bool:
# parents must match via ctx
for p in self.g.parents_of(node):
want = snap.upstream_selected.get(p)
have = ctx.get_idx(p, self.selected_index)
if have < 0 or want != have:
return False
# controls must match for ALL controls in scope
scope = self._controls_scope_for_node(node)
for c in scope:
want = snap.controls_selected.get(c, None)
if want is None:
return False
have = ctx.ctrl_sel.get(c, self.controls.selected.get(c, -1))
if want != have:
return False
return True
def _find_best_compatible_snapshot_index(self, node: str) -> Optional[int]:
hist = self.history.get(node, [])
for i in range(len(hist) - 1, -1, -1):
if self._is_snapshot_compatible(node, hist[i]):
return i
return None
def _automatch_downstream_from(self, root: str) -> None:
order = self.g.topo_downstream_from(root)
max_passes = max(1, len(order) + 1)
for _ in range(max_passes):
changed = False
for n in order:
best = self._find_best_compatible_snapshot_index(n)
cur = self.selected_index.get(n, -1)
if best is not None and best != cur:
self.selected_index[n] = best
changed = True
if not changed:
break
# --- reconcile helpers (Option 1, inline) ---
def _snapshot_respects_locked_parents_ctx(self, child: str, snap: Snapshot, ctx: RestoreContext) -> bool:
for p in self.g.parents_of(child):
want = snap.upstream_selected.get(p, -1)
have = ctx.get_idx(p, self.selected_index)
if p in ctx.locked or p in ctx.touched:
if want != have:
return False
return True
def _try_reconcile_child_ctx(self, child: str, ctx: RestoreContext) -> bool:
"""
Inline reconcile:
- choose an existing child snapshot that matches locked parents
- move other direct parents to indices required by that snapshot
- select that child snapshot
"""
hist_c = self.history.get(child, [])
if not hist_c:
return False
for cidx in range(len(hist_c) - 1, -1, -1):
snap_c = hist_c[cidx]
if not self._snapshot_respects_locked_parents_ctx(child, snap_c, ctx):
continue
ok = True
moved_any = False
for p in self.g.parents_of(child):
want = snap_c.upstream_selected.get(p, -1)
have = ctx.get_idx(p, self.selected_index)
if want == have:
continue
if p in ctx.locked or p in ctx.touched:
ok = False
break
if want < 0 or want >= len(self.history.get(p, [])):
ok = False
break
if not self._is_snapshot_compatible_ctx(p, self.history[p][want], ctx):
ok = False
break
if not ctx.set_idx(p, want, f"reconcile parent for {child}"):
ok = False
break
moved_any = True
ctx.reconciled.add(p) # freeze across passes
if not ok:
continue
cur_child = ctx.get_idx(child, self.selected_index)
if cur_child != cidx:
if ctx.set_idx(child, cidx, "child snapshot (reconciled)"):
for c, want in snap_c.controls_selected.items():
if c in ctx.ctrl_locked:
continue
ctx.ctrl_sel[c] = want
ctx.reconciled.add(child)
return True
else:
if moved_any:
for c, want in snap_c.controls_selected.items():
if c in ctx.ctrl_locked:
continue
ctx.ctrl_sel[c] = want
ctx.reconciled.add(child)
return True
return False
def _automatch_downstream_ctx(self, ctx: RestoreContext, reconcile: bool = False) -> None:
order = self.g.topo_downstream_from(ctx.root)
if not order:
return
max_passes = max(1, len(order) * (3 if reconcile else 1))
for _ in range(max_passes):
ctx.touched.clear() # per-pass
changed = False
for n in order:
if n in ctx.reconciled:
continue
hist = self.history.get(n, [])
best = None
for i in range(len(hist) - 1, -1, -1):
if self._is_snapshot_compatible_ctx(n, hist[i], ctx):
best = i
break
cur = ctx.get_idx(n, self.selected_index)
if best is not None:
if cur != best:
changed |= ctx.set_idx(n, best, "downstream match")
continue
if reconcile and len(self.g.parents_of(n)) >= 2:
if self._try_reconcile_child_ctx(n, ctx):
changed = True
continue
if not changed:
break
# -------------------------
# UI inspect helpers
# -------------------------
def input_port_colors(self, node: str) -> List[Tuple[str, str, str]]:
res: List[Tuple[str, str, str]] = []
for port in self.g.nodes[node].input_ports:
key = (node, port)
if key in self.g.rev:
src_node, _src_port = self.g.rev[key]
color = self.wire_color(src_node, node)
res.append((port, src_node, color))
continue
control_name = None
for cname, (n, p) in self.control_ports.items():
if n == node and p == port:
control_name = cname
break
if control_name is None:
res.append((port, "unwired", "gray"))
else:
color = self.control_wire_color(control_name)
res.append((port, f"Input[{control_name}]", color))
return res
def output_edge_colors(self, node: str) -> List[Tuple[str, str, str]]:
res: List[Tuple[str, str, str]] = []
for (dst_node, dst_port), (src_node, src_port) in self.g.rev.items():
if src_node != node:
continue
color = self.wire_color(node, dst_node)
res.append((src_port, f"{dst_node}.{dst_port}", color))
res.sort(key=lambda t: (t[0], t[1]))
return res
def history_list(self, node: str) -> List[str]:
hist = self.history.get(node, [])
if not hist:
return ["(empty)"]
out: List[str] = []
for i, s in enumerate(hist):
prefix = "❌" if s.status == "error" else "✓"
rid = f"run={s.run_id}:{s.run_step}" if s.run_id is not None else "run=—"
ph = f"phase={s.run_phase}" if s.run_phase else "phase=—"
st = f"status={s.status}"
out.append(
f"{i} | {prefix} | {rid} | {ph} | {st} | sig={s.sig} | up={s.upstream_selected} | ctrl={s.controls_selected}"
)
return out
def current_output_text(self, node: str) -> str:
idx = self.selected_index.get(node, -1)
if idx < 0 or idx >= len(self.history[node]):
return ""
snap = self.history[node][idx]
if snap.status == "error":
return f"❌ ERROR: {snap.error}"
if not snap.outputs:
return ""
return json.dumps(snap.outputs, ensure_ascii=False)
def debug_state(self) -> str:
obj = {
"selected_index": self.selected_index,
"controls_selected": self.controls.selected,
"controls_values": self.controls.values,
"reuse_identical_values": getattr(self.controls, "reuse_identical_values", False),
"controls_history_sizes": {k: len(v) for k, v in self.controls.history.items()},
"history_sizes": {k: len(v) for k, v in self.history.items()},
}
return json.dumps(obj, ensure_ascii=False, indent=2, default=str)
# =============================================================================
# PIPELINE DSL
# Declarative pipeline, ports, wiring (no strings)
# =============================================================================
def _slug(s: str) -> str:
s = s.strip().lower()
s = re.sub(r"[^a-z0-9]+", "_", s)
s = re.sub(r"_+", "_", s).strip("_")
return s or "item"
def _uniq(base: str, used: set[str]) -> str:
if base not in used:
used.add(base)
return base
i = 2
while f"{base}_{i}" in used:
i += 1
out = f"{base}_{i}"
used.add(out)
return out
@dataclass
class _InputSpec:
name: str # internal stable id (short)
label: str # human label
default: Any = ""
component: Any = gr.Textbox
component_kwargs: Dict[str, Any] = field(default_factory=dict)
badge_position: str = "below"
feeds: Optional[Tuple[str, str]] = None # (node_name, port)
@dataclass
class _NodeSpec:
name: str # internal stable id (short)
label: str # human label
fn: Callable[..., Any]
inputs: List[str] = field(default_factory=list)
outputs: List[str] = field(default_factory=lambda: ["out"])
outputs_open: bool = False # outputs_open=True means outputs may be dynamically extended via tuple fallback (out1, out2, ...)
wires: Dict[str, Tuple[str, str]] = field(default_factory=dict) # child_port -> (parent_node, parent_port)
out_component: Any = gr.Textbox
out_component_kwargs: Dict[str, Any] = field(default_factory=dict)
# ---- Public DSL objects ----
# InputRef, NodeRef, PortRef
class InputRef:
def __init__(self, p: "Pipeline", key: str):
self._p = p
self.key = key
@property
def out(self) -> PortRef:
return PortRef(self._p, kind="control", owner_key=self.key, port="out")
def __rshift__(self, other: PortRef):
self._p.connect(self.out, other)
return other
def feeds(self, node: "NodeRef", *, as_: str):
self._p.connect(
PortRef(self._p, "control", self.key, "out"),
PortRef(self._p, "node_in", node.key, as_),
)
return node
class NodeRef:
def __init__(self, p: "Pipeline", key: str):
self._p = p
self.key = key
self.i = _InPorts(p, key) # ✅ inputs namespace
self.o = _OutPorts(p, key) # ✅ outputs namespace
@property
def out(self) -> PortRef:
# alias pratique: A.out == A.o.out
return self.o.out
def __getattr__(self, name: str) -> PortRef:
if name.startswith("_"):
raise AttributeError(name)
# ✅ permettre A.d (input direct) == A.i.d
return getattr(self.i, name)
def produces(self, port: str):
return Produced(self._p, self.key, port)
def __repr__(self) -> str:
return f"NodeRef({self.key})"
# Ports are accessed dynamically:
# D.x -> input port
# D.out -> default output
# D.o.a -> named output (multi-output)
#
# This allows wiring without strings:
# D.out >> A.d
@dataclass(frozen=True)
class PortRef:
p: "Pipeline"
kind: str # "control" | "node_out" | "node_in"
owner_key: str # input key or node key
port: str # port name
def __rshift__(self, other: "PortRef"):
self.p.connect(self, other)
return other
class Produced:
def __init__(self, p: "Pipeline", node_key: str, port: str):
self._p = p
self._node_key = node_key
self._port = port
def feeds(self, node: "NodeRef", *, as_: str | None = None):
dst_port = as_ or self._port
self._p.connect(
PortRef(self._p, "node_out", self._node_key, self._port),
PortRef(self._p, "node_in", node.key, dst_port),
)
return node
# ---- Internal wiring helpers ----
# _InPorts, _OutPorts
class _InPorts:
def __init__(self, p: "Pipeline", node_key: str):
self._p = p
self._node_key = node_key
def __getattr__(self, name: str) -> PortRef:
if name.startswith("_"):
raise AttributeError(name)
spec = self._p._nodes.get(self._node_key)
if not spec:
raise AttributeError(f"Unknown node '{self._node_key}'")
if name not in spec.inputs:
raise AttributeError(
f"Unknown input port '{name}' on node '{self._node_key}'. "
f"Known inputs: {spec.inputs}"
)
return PortRef(self._p, kind="node_in", owner_key=self._node_key, port=name)
class _OutPorts:
def __init__(self, p: "Pipeline", node_key: str):
self._p = p
self._node_key = node_key
def __getattr__(self, name: str) -> PortRef:
if name.startswith("_"):
raise AttributeError(name)
spec = self._p._nodes.get(self._node_key)
if not spec:
raise AttributeError(f"Unknown node '{self._node_key}'")
if name not in spec.outputs:
if getattr(spec, "outputs_open", False) and re.fullmatch(r"out[1-9]\d*", name):
# retire le placeholder implicite
if spec.outputs == ["out"]:
spec.outputs.clear()
spec.outputs.append(name)
spec.outputs.sort(key=lambda s: int(s[3:]))
else:
raise AttributeError(
f"Unknown output port '{name}' on node '{self._node_key}'. "
f"Known outputs: {spec.outputs}"
)
return PortRef(self._p, kind="node_out", owner_key=self._node_key, port=name)
# ---- Pipeline builder ----
# Pipeline
class Pipeline:
"""
Declarative DSL used to describe:
- Inputs (controls)
- Nodes (functions)
- Ports (inputs / outputs)
- Wiring (using >> operators)
- UI layout (columns)
Design principles:
- Human-readable labels
- Stable internal identifiers
- Explicit wiring
- Minimal boilerplate for the developer
"""
def __init__(self):
self._inputs: Dict[str, _InputSpec] = {}
self._nodes: Dict[str, _NodeSpec] = {}
self._layout_cols: List[List[Tuple[str, str]]] = [] # [ [("input", key), ("node", key)], ... ]
self._used_names: set[str] = set()
def _warn(self, msg: str):
print(f"[Pipeline] {msg}")
# ---- declaration API ----
def input(
self,
label: str,
*,
name: Optional[str] = None,
default: Any = None,
component=gr.Textbox,
component_kwargs: Optional[Dict[str, Any]] = None,
badge_position: str = "below",
) -> InputRef:
key = name or _slug(label)
key = _uniq(key, self._used_names)
if default is None and component is gr.Textbox:
default = ""
if component in FILE_COMPONENTS and default in ("", "/app"):
default = None
self._inputs[key] = _InputSpec(
name=key,
label=label,
default=default,
component=component,
component_kwargs=dict(component_kwargs or {}),
badge_position=badge_position,
)
return InputRef(self, key)
def node(
self,
label: str,
*,
fn: Callable[..., Any],
inputs: Optional[list[str]] = None,
outputs: Optional[list[str]] = None,
name: Optional[str] = None,
out_component=gr.Textbox,
out_component_kwargs=None,
) -> NodeRef:
key = name or _slug(label)
key = _uniq(key, self._used_names)
# ---- inputs ----
if inputs is None:
sig = inspect.signature(fn)
inferred_inputs = [
p.name
for p in sig.parameters.values()
if p.kind in (
inspect.Parameter.POSITIONAL_ONLY,
inspect.Parameter.POSITIONAL_OR_KEYWORD,
)
]
inputs = inferred_inputs
self._warn(
f'Node "{label}": inputs inferred from function signature {inputs}. '
f'Declare inputs=[...] for clarity.'
)
# ---- outputs ----
outputs_open = outputs is None
if outputs is None:
outputs = ["out"] # default, refined at runtime if tuple
self._warn(
f'Node "{label}": outputs not declared. '
f'Will infer from function return at runtime.'
)
self._nodes[key] = _NodeSpec(
name=key,
label=label,
fn=fn,
inputs=list(inputs),
outputs=list(outputs),
outputs_open=outputs_open,
out_component=out_component,
out_component_kwargs=dict(out_component_kwargs or {}),
)
return NodeRef(self, key)
def connect(self, src: PortRef, dst: PortRef) -> None:
if dst.kind != "node_in":
raise TypeError(f"Destination must be a node input port, got {dst.kind}")
child_key = dst.owner_key
child_port = dst.port
child_spec = self._nodes.get(child_key)
if not child_spec:
raise ValueError(f"Unknown child node '{child_key}'")
if child_port not in child_spec.inputs:
raise ValueError(
f"Unknown input port '{child_port}' on node '{child_key}'. "
f"Known inputs: {child_spec.inputs}"
)
# prevent double-source on same dst port (wire)
if child_port in child_spec.wires:
raise ValueError(f"Port {child_key}.{child_port} already wired from a node output")
# prevent double-source on same dst port (control)
for inp in self._inputs.values():
if inp.feeds == (child_key, child_port):
raise ValueError(f"Port {child_key}.{child_port} already fed by an Input[...] control")
if src.kind == "control":
inp = self._inputs.get(src.owner_key)
if not inp:
raise ValueError(f"Unknown input '{src.owner_key}'")
# optional but recommended: forbid one input feeding multiple ports
if inp.feeds is not None and inp.feeds != (child_key, child_port):
raise ValueError(f"Input[{src.owner_key}] already feeds {inp.feeds}")
inp.feeds = (child_key, child_port)
return
if src.kind == "node_out":
parent_key = src.owner_key
parent_port = src.port
parent_spec = self._nodes.get(parent_key)
if not parent_spec:
raise ValueError(f"Unknown parent node '{parent_key}'")
if parent_port not in parent_spec.outputs:
# ✅ support outputs_open fallback out1/out2...
if getattr(parent_spec, "outputs_open", False) and re.fullmatch(r"out[1-9]\d*", parent_port):
if parent_spec.outputs == ["out"]:
parent_spec.outputs.clear()
parent_spec.outputs.append(parent_port)
parent_spec.outputs.sort(key=lambda s: int(s[3:]))
else:
raise ValueError(
f"Unknown output port '{parent_port}' on node '{parent_key}'. "
f"Known outputs: {parent_spec.outputs}"
)
child_spec.wires[child_port] = (parent_key, parent_port)
return
raise TypeError(f"Unsupported src kind: {src.kind}")
def columns(self, *cols: List[Any]) -> "Pipeline":
"""
Optional but recommended:
p.columns([audio, lang], [transcribe], [cleanup], [summarize])
If you never call columns(), layout defaults to:
- all inputs in col0 (in declaration order)
- all nodes in following columns (in declaration order)
"""
out: List[List[Tuple[str, str]]] = []
for col in cols:
c: List[Tuple[str, str]] = []
for item in col:
if isinstance(item, InputRef):
c.append(("input", item.key))
elif isinstance(item, NodeRef):
c.append(("node", item.key))
else:
raise TypeError(f"columns() expects InputRef/NodeRef, got {type(item)}")
out.append(c)
self._layout_cols = out
return self
# ---- spec generation ----
def _infer_node_inputs_from_wires(self) -> None:
"""
Fill node.inputs if empty:
- any port fed by an input or a parent wire becomes a declared input port
This keeps DSL minimal: you don't have to list inputs=["audio","language"] manually.
"""
for nk, ns in self._nodes.items():
if ns.inputs:
continue
ports = set()
# from Input feeds
for inp in self._inputs.values():
if inp.feeds and inp.feeds[0] == nk:
ports.add(inp.feeds[1])
# from parent wires into this node
for child_port in ns.wires.keys():
ports.add(child_port)
ns.inputs = sorted(ports)
def _default_layout(self) -> List[List[Tuple[str, str]]]:
# col0: inputs, col1..: nodes
col0 = [("input", k) for k in self._inputs.keys()]
cols = [col0]
for nk in self._nodes.keys():
cols.append([("node", nk)])
return cols
def to_spec(self) -> Dict[str, Any]:
# sanity: every input must have feeds
for k, inp in self._inputs.items():
if not inp.feeds:
raise ValueError(f"Input '{inp.label}' ({k}) missing .feeds(node, as_=...) wiring.")
# infer node inputs if not given
self._infer_node_inputs_from_wires()
# sanity: forbid duplicate feeds to same node port (double-source via inputs)
seen_target_ports = set()
for inp in self._inputs.values():
tgt = inp.feeds
if tgt in seen_target_ports:
raise ValueError(f"Two inputs feed the same port {tgt}.")
seen_target_ports.add(tgt)
layout_cols = self._layout_cols or self._default_layout()
# build Option-A compatible PIPELINE_SPEC["layout"]
layout: List[List[Dict[str, Any]]] = []
for col in layout_cols:
out_col: List[Dict[str, Any]] = []
for kind, key in col:
if kind == "input":
inp = self._inputs[key]
out_col.append(
{
"kind": "input",
"name": inp.name, # stable short id
"default": inp.default,
"badge_position": inp.badge_position,
"component": inp.component,
"component_kwargs": inp.component_kwargs,
"feeds": inp.feeds, # (node, port)
# if you want to keep label in UI later, keep it:
"label": inp.label,
}
)
else:
nd = self._nodes[key]
out_col.append(
{
"kind": "node",
"name": nd.name, # stable short id
"fn": nd.fn,
"inputs": nd.inputs,
"outputs": nd.outputs,
"wires": nd.wires, # child_port -> (parent, out_port)
"out_component": nd.out_component,
"out_component_kwargs": nd.out_component_kwargs,
"label": nd.label,
}
)
layout.append(out_col)
for nd in self._nodes.values():
if nd.outputs_open and nd.outputs == ["out"]:
self._warn(
f'Node "{nd.label}" has open outputs but none were wired '
f"(only implicit 'out'). Snapshot may contain out1/out2 "
f"but graph exposes no outputs."
)
return {"layout": layout}
# =============================================================================
# PIPELINE PRESETS — EDIT HERE
# =============================================================================
#
# Define one or more pipeline factories.
# Each factory returns a PIPELINE_SPEC (dict) compatible with build_from_spec().
#
# Switching pipelines will rebuild Graph/Controls/FrontendState and re-render UI
# without restarting the server.
# =============================================================================
def make_pipeline_base_deabc() -> dict:
# ---- Functions (pure, testable) ----
def fn_D(d):
time.sleep(0.05)
return d
def fn_E(e):
time.sleep(0.05)
return e
def fn_A(d, e):
time.sleep(0.05)
# combine in a deterministic way for debugging
return {"d": d, "e": e}
def fn_B(a, b):
time.sleep(0.05)
return {"a": a, "b": b, "mix": f"{a} + {b}"}
def fn_C(x):
time.sleep(0.05)
# final “visible” result
return x
# ---- Pipeline DSL ----
p = Pipeline()
# Inputs (versioned controls)
in_d = p.input("Input d", name="d", default="d0", component=gr.Textbox)
in_e = p.input("Input e", name="e", default="e0", component=gr.Textbox)
in_b = p.input("Input b", name="b", default="b0", component=gr.Textbox)
# Nodes
D = p.node("D", name="D", fn=fn_D, inputs=["d"], outputs=["out"], out_component=gr.Textbox)
E = p.node("E", name="E", fn=fn_E, inputs=["e"], outputs=["out"], out_component=gr.Textbox)
A = p.node(
"A",
name="A",
fn=fn_A,
inputs=["d", "e"],
outputs=["out"],
out_component=gr.Textbox, # nice for dict output
)
B = p.node(
"B",
name="B",
fn=fn_B,
inputs=["a", "b"],
outputs=["out"],
out_component=gr.Textbox,
)
C = p.node(
"C",
name="C",
fn=fn_C,
inputs=["x"],
outputs=["out"],
out_component=gr.Textbox,
)
# Wiring (matches the classic map)
in_d.feeds(D, as_="d")
in_e.feeds(E, as_="e")
in_b.feeds(B, as_="b")
D.produces("out").feeds(A, as_="d")
E.produces("out").feeds(A, as_="e")
A.produces("out").feeds(B, as_="a")
B.produces("out").feeds(C, as_="x")
# Layout (inputs column + then D/E + then A + then B + then C)
p.columns(
[in_d, in_e],
[D, E],
[A, in_b],
[B],
[C],
)
return p.to_spec()
def make_pipeline_audio_simple() -> dict:
# ---- Functions (pure, testable) ----
def audio_passthrough(audio):
# audio = (sample_rate, np.array) ou chemin selon gradio config
time.sleep(0.2)
return audio
# ---- Pipeline DSL ----
p = Pipeline()
# -------------------------
# Input audio
# -------------------------
audio_in = p.input(
"Audio source",
component=gr.Audio,
component_kwargs={
"sources": ["upload", "microphone"],
"type": "filepath", # important pour les tests
},
)
# -------------------------
# Nodes
# -------------------------
A = p.node(
"Audio A (identity)",
fn=audio_passthrough,
inputs=["audio"],
outputs=["out"],
out_component=gr.Audio,
out_component_kwargs={
"type": "filepath",
"label": "Audio A output",
},
)
B = p.node(
"Audio B (identity)",
fn=audio_passthrough,
inputs=["audio"],
outputs=["out"],
out_component=gr.Audio,
out_component_kwargs={
"type": "filepath",
"label": "Audio B output",
},
)
# -------------------------
# Wiring
# -------------------------
audio_in.feeds(A, as_="audio")
A.produces("out").feeds(B, as_="audio")
# -------------------------
# Layout
# -------------------------
p.columns(
[audio_in],
[A],
[B],
)
return p.to_spec()
PIPELINE_PRESETS: dict[str, callable] = {
"Audio simple": make_pipeline_audio_simple,
"Base D→E→A→B→C": make_pipeline_base_deabc,
}
DEFAULT_PIPELINE_PRESET = "Base D→E→A→B→C"
CUSTOM_TEMPLATE = """# Allowed imports: datetime, hashlib, json, math, random, re, time
import time
# ============================================================
# 1) FUNCTIONS (pure, testable)
# - define your compute functions here
# ============================================================
def my_fn_example(x):
# TODO: implement
return x
# ============================================================
# 2) PIPELINE DSL (declare inputs + nodes)
# - you MUST create: p = Pipeline()
# ============================================================
def make_pipeline() -> dict:
p = Pipeline()
# -------------------------
# Inputs (controls)
# -------------------------
# TODO: declare at least one input
in_x = p.input(
"Input x",
name="x",
default="x0",
component=gr.Textbox,
)
# -------------------------
# Nodes
# -------------------------
# TODO: declare at least one node
A = p.node(
"A",
name="A",
fn=my_fn_example,
inputs=["x"],
outputs=["out"],
out_component=gr.Textbox,
)
B = p.node(
"B",
name="B",
fn=my_fn_example,
inputs=["x"],
outputs=["out"],
out_component=gr.Textbox,
)
# ============================================================
# 3) WIRING (REQUIRED)
# - at least one input -> node feed
# - show example node -> node wiring
# ============================================================
# Input -> Node (required example)
in_x.feeds(A, as_="x")
# Node -> Node (required example)
A.produces("out").feeds(B, as_="x")
# ============================================================
# 4) LAYOUT (RECOMMENDED)
# - define UI columns order
# ============================================================
p.columns(
[in_x],
[A],
[B],
)
return p.to_spec()
"""
# =============================================================================
# UI — Rendering & event wiring (Gradio)
# =============================================================================
def normalize_pipeline_layout(layout):
out = []
for col in layout:
new_col = []
for item in col:
item = dict(item)
kind = item.get("kind")
if kind == "input":
item["component"] = item.get("component") or gr.Textbox
item["component_kwargs"] = item.get("component_kwargs") or {}
item["default"] = item.get("default", None)
comp = item["component"]
if comp is gr.Audio and item["default"] in ("", "/app"):
item["default"] = None
elif item["default"] is None and comp is gr.Textbox:
item["default"] = ""
item["badge_position"] = item.get("badge_position") or "below"
# Option A requires explicit feeds
if "feeds" not in item:
raise ValueError(f"Input '{item.get('name')}' missing 'feeds': ('Node','port')")
elif kind == "node":
item["out_component"] = item.get("out_component") or gr.Textbox
item["out_component_kwargs"] = item.get("out_component_kwargs") or {}
# Option A requires explicit node signature (fn + ports)
if "fn" not in item:
raise ValueError(f"Node '{item.get('name')}' missing 'fn'")
item["inputs"] = list(item.get("inputs") or [])
item["outputs"] = list(item.get("outputs") or ["out"])
# wires optional (may be pure-control nodes)
item["wires"] = dict(item.get("wires") or {})
else:
raise ValueError(f"Unknown kind in pipeline layout: {kind}")
new_col.append(item)
out.append(new_col)
return out
def normalize_input_value_for_component(comp, value):
if comp in FILE_COMPONENTS:
if value in ("", "/app"):
return None
return value
def build_from_spec(spec: dict) -> tuple[Graph, Controls, FrontendState, list[list[dict]]]:
layout = normalize_pipeline_layout(spec["layout"])
# 1) collect nodes + inputs
node_items = [it for col in layout for it in col if it["kind"] == "node"]
if DISABLE_MULTI_OUTPUT:
for it in node_items:
outs = list(it.get("outputs") or ["out"])
if len(outs) > 1:
raise ValueError(
f"Multi-output disabled: node '{it['name']}' declares outputs {outs}"
)
input_items = [it for col in layout for it in col if it["kind"] == "input"]
node_names = [it["name"] for it in node_items]
input_names = [it["name"] for it in input_items]
# 2) build Graph nodes
g = Graph()
for it in node_items:
g.add_node(
FnNode(
it["name"],
fn=it["fn"],
input_ports=list(it.get("inputs") or []),
output_ports=list(it.get("outputs") or ["out"]),
)
)
# 3) build explicit edges from node.wires
# wires: child_port -> (parent_node, parent_port)
fanout_seen: set[tuple[str, str]] = set() # (parent_node, parent_port)
for it in node_items:
child = it["name"]
for child_port, (parent, parent_port) in it.get("wires", {}).items():
if DISABLE_FANOUT:
key = (parent, parent_port)
if key in fanout_seen:
raise ValueError(
f"Fan-out disabled: '{parent}.{parent_port}' would feed multiple children "
f"(already used; now also feeding '{child}.{child_port}')."
)
fanout_seen.add(key)
if parent not in g.nodes:
raise ValueError(f"Wire error: {child}.{child_port} references unknown parent node '{parent}'")
if child_port not in g.nodes[child].input_ports:
raise ValueError(f"Wire error: {child}.{child_port} not in declared inputs {g.nodes[child].input_ports}")
if parent_port not in g.nodes[parent].output_ports:
raise ValueError(f"Wire error: {parent}.{parent_port} not in declared outputs {g.nodes[parent].output_ports}")
g.connect(parent, parent_port, child, child_port)
# 4) build controls + control_ports from input.feeds
# feeds: ("Node","port")
control_ports: dict[str, tuple[str, str]] = {}
for it in input_items:
cname = it["name"]
node, port = it["feeds"]
if node not in g.nodes:
raise ValueError(f"feeds error: Input[{cname}] targets unknown node '{node}'")
if port not in g.nodes[node].input_ports:
raise ValueError(f"feeds error: Input[{cname}] targets {node}.{port} but node inputs are {g.nodes[node].input_ports}")
control_ports[cname] = (node, port)
controls = Controls(names=input_names)
s = FrontendState(g, controls, control_ports)
# ✅ Commit defaults immediately so the lab is "ready" on first paint
# This avoids needing a manual Reset to create the first control versions.
for it in input_items:
cname = it["name"]
comp = it.get("component") or gr.Textbox
default = it.get("default", None)
default = normalize_input_value_for_component(comp, default)
controls.set_ui_value(cname, default)
# 5) sanity: every node input port must be satisfied by either edge or control
for n, node in g.nodes.items():
for p in node.input_ports:
is_wired = (n, p) in g.rev
is_controlled = any((nn == n and pp == p) for (nn, pp) in control_ports.values())
if not (is_wired or is_controlled):
raise ValueError(f"Unresolved input port: {n}.{p} has no wire and no control feed")
# forbid double-source (wire + control) on the same port
if is_wired and is_controlled:
raise ValueError(f"Double-source: {n}.{p} has BOTH a wire and a control feed")
return g, controls, s, layout
# =============================================================================
# ACTIVE PIPELINE (runtime-rebuildable)
# =============================================================================
# These globals are assigned by set_pipeline_from_spec() when a preset is chosen.
G: Graph
CTRL: Controls
S: FrontendState
PIPELINE_LAYOUT: list[list[dict]]
INPUT_COMPONENTS: dict[str, Any] = {}
INPUT_LABELS: dict[str, str] = {}
NODE_LABELS: dict[str, str] = {}
INPUT_ORDER: list[str] = []
NODE_ORDER_UI: list[str] = []
CONTROL_WIRES: list[str] = []
EDGE_ORDER: list[tuple[str, str]] = []
NODE_ORDER: list[str] = []
def ui_input(name: str) -> str:
return INPUT_LABELS.get(name, name)
def ui_node(name: str) -> str:
return NODE_LABELS.get(name, name)
def set_pipeline_from_spec(spec: dict) -> None:
"""
Build a new Graph/Controls/State/Layout from spec, and recompute all derived UI orders.
Overwrites the module-level globals used by the UI and handlers.
"""
global G, CTRL, S, PIPELINE_LAYOUT
global INPUT_COMPONENTS, INPUT_LABELS, NODE_LABELS
global INPUT_ORDER, NODE_ORDER_UI, CONTROL_WIRES, EDGE_ORDER, NODE_ORDER
G, CTRL, S, PIPELINE_LAYOUT = build_from_spec(spec)
# Map: input_name -> component class (gr.Textbox, gr.Audio, ...)
INPUT_COMPONENTS = {}
for col in PIPELINE_LAYOUT:
for it in col:
if it["kind"] == "input":
INPUT_COMPONENTS[it["name"]] = it.get("component") or gr.Textbox
# Human labels (fallback to internal name)
INPUT_LABELS = {}
NODE_LABELS = {}
for col in PIPELINE_LAYOUT:
for it in col:
if it["kind"] == "input":
INPUT_LABELS[it["name"]] = it.get("label") or it["name"]
elif it["kind"] == "node":
NODE_LABELS[it["name"]] = it.get("label") or it["name"]
# Derived graph/UI orders (single source of truth)
INPUT_ORDER = [it["name"] for col in PIPELINE_LAYOUT for it in col if it["kind"] == "input"]
NODE_ORDER_UI = [it["name"] for col in PIPELINE_LAYOUT for it in col if it["kind"] == "node"]
def derive_control_wires(input_order: list[str], control_ports: dict[str, tuple[str, str]]) -> list[str]:
ui = [c for c in input_order if c in control_ports]
extra = [c for c in control_ports.keys() if c not in ui]
return ui + extra
def derive_edge_order() -> list[tuple[str, str]]:
"""
Unique parent->child edges derived from graph wiring.
Ordered to be stable and UI-friendly:
- primary key: index of parent in NODE_ORDER_UI (fallback large)
- secondary: index of child in NODE_ORDER_UI (fallback large)
- then lexical as final tie-break
"""
edges_set = set()
for (dst_node, _dst_port), (src_node, _src_port) in S.g.rev.items():
edges_set.add((src_node, dst_node))
edges = list(edges_set)
pos = {n: i for i, n in enumerate(NODE_ORDER_UI)}
big = 10**9
edges.sort(key=lambda e: (pos.get(e[0], big), pos.get(e[1], big), e[0], e[1]))
return edges
def derive_node_order_for_summary() -> list[str]:
"""
Order used by render_global() for summary. Prefer UI order,
then add any graph nodes not present in the UI layout (stable).
"""
ui = [n for n in NODE_ORDER_UI if n in S.g.nodes]
extra = [n for n in S.g.nodes.keys() if n not in ui]
return ui + extra
CONTROL_WIRES = derive_control_wires(INPUT_ORDER, S.control_ports)
EDGE_ORDER = derive_edge_order()
NODE_ORDER = derive_node_order_for_summary()
# =============================================================================
# UI rendering helpers (compute vs render)
# =============================================================================
def node_out(node: str, idx: int) -> Any:
if idx is None or idx < 0:
return None
if node not in S.history:
return None
if idx >= len(S.history[node]):
return None
snap = S.history[node][idx]
if snap.status == "error" or not snap.outputs:
return None
outs = snap.outputs
if len(outs) == 1 and "out" in outs:
return outs["out"]
return json.dumps(outs, ensure_ascii=False, indent=2)
def node_out_rendered(node: str) -> Any:
"""
Value meant for the "rendered output" component.
Rules:
- if selected snapshot is error:
- return error message (string) → only meaningful for Textbox
- if ok and has outputs["out"]:
- return outputs["out"]
- else:
- None
"""
idx = S.selected_index.get(node, -1)
if idx is None or idx < 0:
return None
if node not in S.history:
return None
if idx >= len(S.history[node]):
return None
snap = S.history[node][idx]
if snap.status == "error":
return None # au lieu d'une string
if not snap.outputs:
return None
if "out" in snap.outputs:
return snap.outputs["out"]
return None
def control_val(name: str, idx: int) -> Any:
if idx is None or idx < 0:
return None
if idx >= len(CTRL.history.get(name, [])):
return None
return CTRL.history[name][idx]
def input_wire_badge(control_name: str) -> str:
color = S.control_wire_color(control_name)
node, _port = S.control_ports[control_name]
return (
"<div style='display:flex;align-items:center;justify-content:flex-end;gap:6px;'>"
f"<span style='font-size:12px;color:#6b7280;'>feeds {ui_node(node)}</span>"
f"{dot_html(color)}"
"</div>"
)
def node_io_badges(node: str) -> str:
badge_mode = BADGE_LABEL_MODE
# ---- Inputs: show either port names or node/input labels ----
in_lines: List[str] = []
for port, src, color in S.input_port_colors(node):
if badge_mode == "ports":
src_txt = port # ✅ port métier côté entrée du node courant
else:
if src.startswith("Input["):
cname = src[6:-1]
src_txt = ui_input(cname)
elif src == "unwired":
src_txt = "unwired"
else:
src_txt = ui_node(src)
in_lines.append(
"<div style='display:block;line-height:1.35;margin:2px 0;'>"
f"{dot_html(color)}"
f"<span style='color:#6b7280;font-size:12px;margin-left:6px;'>{src_txt}</span>"
"</div>"
)
if not in_lines:
in_lines = [
"<div style='display:block;line-height:1.35;margin:2px 0;'>"
f"{dot_html('gray')}"
"<span style='color:#6b7280;font-size:12px;margin-left:6px;'>no inputs</span>"
"</div>"
]
# ---- Outputs: show either output port names or destination node label ----
out_lines: List[str] = []
for src_port, dst_label, color in S.output_edge_colors(node):
if badge_mode == "ports":
dst_txt = src_port # ✅ port métier côté sortie du node courant
else:
dst_node = dst_label.split(".", 1)[0]
dst_txt = ui_node(dst_node)
out_lines.append(
"<div style='display:block;line-height:1.35;margin:2px 0;text-align:right;'>"
f"<span style='color:#6b7280;font-size:12px;margin-right:6px;'>{dst_txt}</span>"
f"{dot_html(color)}"
"</div>"
)
if not out_lines:
out_lines = [
"<div style='display:block;line-height:1.35;margin:2px 0;text-align:right;'>"
"<span style='color:#6b7280;font-size:12px;margin-right:6px;'>no outputs</span>"
f"{dot_html('gray')}"
"</div>"
]
return (
"<div style='display:flex;justify-content:space-between;gap:14px;'>"
f"<div style='min-width:0;'>{''.join(in_lines)}</div>"
f"<div style='min-width:0;'>{''.join(out_lines)}</div>"
"</div>"
)
def choices_for(node: str) -> List[str]:
return S.history_list(node) or ["(empty)"]
def selected_choice_value(node: str) -> str:
choices = choices_for(node)
if choices == ["(empty)"]:
return "(empty)"
idx = S.selected_index.get(node, -1)
if idx < 0 or idx >= len(choices):
return choices[-1]
return choices[idx]
def nav_button_updates(node: str) -> Tuple[dict, dict]:
"""
Returns (left_update, right_update) for ◀︎ / ▶︎ buttons.
- Disable when no selection, empty history, or at boundaries.
"""
n = len(S.history.get(node, []))
cur = S.selected_index.get(node, -1)
if n <= 1 or cur < 0:
return gr.update(interactive=False), gr.update(interactive=False)
left_ok = cur > 0
right_ok = cur < (n - 1)
return gr.update(interactive=left_ok), gr.update(interactive=right_ok)
def run_need_state(node: str) -> str:
"""
Compute "CTA state" for run buttons:
- force: needs a strong "▶︎ Run" action
- stale: suggest rerun (↻)
- fresh: normal run (✓)
"""
if S.node_is_in_error_ui(node):
return "force"
if S.never_ran(node):
return "force"
# structural mismatch or missing => force
for parent in S.g.parents_of(node):
col = S.wire_color(parent, node)
if col in ("gray", "blue"):
return "force"
# direct input changed => force
for cname, (target_node, _port) in S.control_ports.items():
if target_node == node and S.control_wire_color(cname) == "orange":
return "force"
# upstream stale => stale
for parent in S.g.parents_of(node):
if S.wire_color(parent, node) == "orange":
return "stale"
return "fresh"
def run_button_updates(node: str) -> Tuple[dict, dict, dict]:
state = run_need_state(node)
return (
gr.update(visible=(state == "fresh")), # ✓ Run
gr.update(visible=(state == "stale")), # ↻ Run
gr.update(visible=(state == "force")), # ▶︎ Run
)
def compute_node_status_line(node: str) -> str:
sel = S.selected_index.get(node, None)
gf = S.globally_fresh(node)
never = S.never_ran(node)
extra = ""
if node in S.last_run_meta:
extra = f" | ⚠ computed via {S.last_run_meta[node]}"
status = (
f"{node} | selected={sel} | "
f"never_ran={'✅' if never else '❌'} | "
f"globally_fresh={'✅' if gf else '❌'}"
+ extra
)
if S.node_is_in_error_ui(node):
status += " | ❌ NODE IN ERROR"
else:
snap_status = S.selected_snapshot_status(node)
if snap_status == "error":
status += " | ❌ selected snapshot = error"
if S.last_run_error_relevant(node):
status += " | ❌ FAILED IN LAST RUN"
blocked = any(S.edge_is_blocked_ui(p, node) for p in S.g.parents_of(node))
if blocked:
status += " | ⛔ BLOCKED (parent failed in last run)"
return status
def render_node_block(node: str):
"""
Produces all UI pieces for a node card.
Returns:
status_line, io_html, out_view_val, out_debug_txt, hist_update, left_update, right_update,
run_fresh_upd, run_stale_upd, run_fix_upd
"""
status = compute_node_status_line(node)
io_md = node_io_badges(node)
out_view_val = node_out_rendered(node) # ✅ NEW
out_debug_txt = S.current_output_text(node) # ✅ debug JSON, inchangé
hist = gr.update(choices=choices_for(node), value=selected_choice_value(node))
left_upd, right_upd = nav_button_updates(node)
rf, rs, rx = run_button_updates(node)
return status, io_md, out_view_val, out_debug_txt, hist, left_upd, right_upd, rf, rs, rx
def render_edges_text() -> str:
lines: List[str] = []
if WIRE_COLOR_MODE == "daggr":
lines.append("Daggr-like")
lines.append("Legend: 🟠 ok | ⚪ not ok (details hidden)")
else:
lines.append("Rich-mode")
lines.append(
"Legend: 🔵 blue=never ran | 🔴 red=failed | 🟣 purple=blocked | ⚪ gray=mismatch | 🟠 stale | 🟢 fresh"
)
lines.append("")
# Input wires
lines.append("== Input wires (Input[x] → target node) ==")
for c in CONTROL_WIRES:
node, _port = S.control_ports[c]
color = S.control_wire_color(c)
cur_idx = CTRL.selected.get(c, -1)
cur_val = CTRL.values.get(c)
used_idx = None
used_val = None
nidx = S.selected_index.get(node, -1)
if 0 <= nidx < len(S.history[node]):
snap = S.history[node][nidx]
used_idx = snap.controls_selected.get(c, None)
if used_idx is not None:
used_val = control_val(c, used_idx)
if color == "blue":
extra = " | why=target node not run / no snapshot selected"
elif color == "red":
extra = " | why=target node failed (selected snapshot error OR last run failed on this node)"
elif color == "gray":
if used_idx is None:
extra = " | why=target snapshot doesn't reference this Input yet"
else:
extra = (
f" | why=unknown gray state: used#{used_idx}={pretty_val(used_val)}"
f" / now#{cur_idx}={pretty_val(cur_val)}"
)
elif color == "orange":
extra = (
f" | why=stale: used#{used_idx}={pretty_val(used_val)}"
f" → now#{cur_idx}={pretty_val(cur_val)}"
)
else:
extra = f" | ok=used#{used_idx}={pretty_val(used_val)} == now#{cur_idx}={pretty_val(cur_val)}"
lines.append(f"{emoji_wire(color)} Input[{c}] → {node} | wire={color}{extra}")
lines.append("")
lines.append("== Graph edges (parent → child) ==")
for parent, child in EDGE_ORDER:
color = S.wire_color(parent, child)
aligned = S.aligned_edge(parent, child)
cidx = S.selected_index.get(child, -1)
pidx = S.selected_index.get(parent, -1)
# If child has no selected snapshot, explain (but keep blocked purple)
if not (0 <= cidx < len(S.history[child])):
if S.edge_is_blocked_ui(parent, child):
color = "purple"
lines.append(
f"{emoji_wire(color)} {parent}{child} | aligned=—"
f" | wire={color} | why=blocked: parent failed in last run; child not executed"
)
else:
color = "blue"
lines.append(
f"{emoji_wire(color)} {parent}{child} | aligned=—"
f" | wire={color} | why=child has no selected snapshot"
)
continue
snap_c = S.history[child][cidx]
want_p = snap_c.upstream_selected.get(parent, None)
if color == "red":
extra = " | why=child node failed (incoming wires red)"
elif color == "gray":
want_val = node_out(parent, want_p) if want_p is not None else None
have_val = node_out(parent, pidx)
extra = (
f" | why=parent mismatch: {parent} want#{want_p}={pretty_val(want_val)}"
f" → have#{pidx}={pretty_val(have_val)}"
)
elif color == "orange":
diffs = []
for cc, used_idx in snap_c.controls_selected.items():
cur_cc = CTRL.selected.get(cc, -1)
if cur_cc != used_idx:
uval = control_val(cc, used_idx)
cval = control_val(cc, cur_cc)
diffs.append(
f"Input[{cc}] used#{used_idx}={pretty_val(uval)} → now#{cur_cc}={pretty_val(cval)}"
)
if diffs:
extra = " | why=stale controls: " + "; ".join(diffs[:2]) + ("; …" if len(diffs) > 2 else "")
else:
extra = " | why=stale controls (some in-scope Input[x] changed)"
elif color == "blue":
extra = " | why=child not run yet"
elif color == "purple":
extra = " | why=blocked: parent failed in last run; child not executed; no further propagation"
else:
extra = f" | ok=child uses parent#{want_p} (current parent#{pidx})"
lines.append(
f"{emoji_wire(color)} {parent}{child} | aligned={'✅' if aligned else '❌'}"
f" | wire={color}{extra}"
)
return "\n".join(lines)
def render_global(msg: str) -> Tuple[str, str, str, str]:
lines: List[str] = []
if WIRE_COLOR_MODE == "daggr":
lines.append("Daggr-like")
lines.append("Legend: 🟠 ok | ⚪ not ok (details hidden)")
else:
lines.append("Rich-mode")
lines.append(
"Legend: 🔵 blue=never ran | 🔴 red=failed | 🟣 purple=blocked | ⚪ gray=mismatch | 🟠 stale | 🟢 fresh"
)
lines.append("")
if getattr(S, "last_run_note", None):
lines.append("")
lines.append(S.last_run_note)
if getattr(S, "last_run_failed", False) and getattr(S, "last_run_error", None):
err = S.last_run_error
lines.append("")
lines.append(f"❌ LAST RUN FAILED: node={err['node']} | phase={err['phase']} | error={err['error']}")
for n in NODE_ORDER:
sel = S.selected_index.get(n, None)
gf = S.globally_fresh(n)
out = S.current_output_text(n)
lines.append(f"- {n}: selected={sel} | globally_fresh={'✅' if gf else '❌'} | out={out}")
return msg, "\n".join(lines), render_edges_text(), S.debug_state()
# Rendering is intentionally centralized:
# render_all() recomputes the full UI from state
# This avoids partial updates and UI desync bugs.
def render_all(msg: str = "", input_overrides: dict[str, Any] | None = None) -> Tuple[Any, ...]:
input_overrides = input_overrides or {}
glob = render_global(msg)
out: List[Any] = []
for cname in INPUT_ORDER:
out.append(input_wire_badge(cname))
if cname in input_overrides:
out.append(gr.update(value=input_overrides[cname]))
else:
out.append(gr.update()) # default: don't touch input values
out.extend([glob[0], glob[1], glob[2], glob[3]])
for node in NODE_ORDER_UI:
status, io_md, out_view_val, out_debug_txt, hist, left_upd, right_upd, rf, rs, rx = render_node_block(node)
out.extend([status, io_md, out_view_val, out_debug_txt, hist, left_upd, right_upd, rf, rs, rx])
return tuple(out)
# =============================================================================
# UI event handlers (DRY)
# =============================================================================
def ui_reset(*vals: Any):
CTRL.reset()
S.reset()
for cname, v in zip(INPUT_ORDER, vals):
comp = INPUT_COMPONENTS.get(cname, gr.Textbox)
v = normalize_input_value_for_component(comp, v)
CTRL.set_ui_value(cname, v)
return render_all("Reset state")
def _commit_all_textboxes(*vals: Any) -> None:
for cname, v in zip(INPUT_ORDER, vals):
comp = INPUT_COMPONENTS.get(cname, gr.Textbox)
v = normalize_input_value_for_component(comp, v)
CTRL.set_ui_value(cname, v)
def ui_set_input(control_name: str, value: Any):
comp = INPUT_COMPONENTS.get(control_name, gr.Textbox)
value = normalize_input_value_for_component(comp, value)
CTRL.set_ui_value(control_name, value)
S.clear_transient_run_state()
return render_all(f"Applied input {control_name}")
def ui_set_reuse_identical(flag: bool):
CTRL.reuse_identical_values = bool(flag)
return render_all(f"Reuse identical input values = {CTRL.reuse_identical_values}")
def ui_set_badge_label_mode(mode: str):
global BADGE_LABEL_MODE
BADGE_LABEL_MODE = mode or "nodes"
# pas besoin de clear run state : c'est purement de l'affichage
return render_all(f"Badge labels = {BADGE_LABEL_MODE}")
def ui_run(
node: str,
auto_upstream: bool,
force_upstream: bool,
force_downstream: bool,
force_topo: bool,
on_error: str,
skip_if_unchanged: bool,
*vals: str,
):
try:
_commit_all_textboxes(*vals)
# Reset transient state for a new run
S.clear_transient_run_state()
# -------------------------------------------------
# 🔒 HARD NO-OP MODE (before starting run)
# -------------------------------------------------
if skip_if_unchanged and not (force_upstream or force_downstream):
if (
not S.node_is_in_error_ui(node)
and S.would_run_be_noop(node)
):
yield render_all(
f"⏭️ Skipped {node}: no upstream/control change (true no-op)"
)
return
# -------------------------------------------------
run_id = S.start_run()
meta = []
if force_upstream:
meta.append("force-upstream")
if force_downstream:
meta.append("force-downstream")
tag = ", ".join(meta) if meta else ""
def _yield_step(txt: str):
return render_all(txt)
# First paint
yield _yield_step(f"Starting run {node}… (run_id={run_id})")
def _run_and_yield(n: str, phase: str):
S._ensure_required_controls_committed(n)
# ✅ no-op run (ne skip pas si force_upstream/force_downstream)
if skip_if_unchanged and not (force_upstream or force_downstream):
if not S.node_is_in_error_ui(n) and S.would_run_be_noop(n):
yield _yield_step(f"⏭️ Skipped {n}: no upstream/control change (run_id={run_id})")
return
if tag:
S.last_run_meta[n] = tag
S.last_run_executed.add(n)
S.run_one(n, on_error=on_error, run_phase=phase)
# ✅ if we kept an error snapshot, stop the pipeline here (but snapshot is preserved)
if on_error == "snapshot" and S.is_error_selected(n):
raise RuntimeError(f"{n} failed (snapshot): {S.selected_snapshot(n).error}")
yield _yield_step(f"Ran {n} ({phase}) (run_id={run_id})")
def _safe_run_and_yield(n: str, phase: str) -> bool:
"""
Run node and stream UI. If node fails:
- set last_run_failed, capture fingerprint
- mark direct children as blocked
- stream stop message
"""
try:
yield from _run_and_yield(n, phase)
return True
except Exception as e:
S.last_run_failed = True
parents = S.g.parents_of(n)
S.last_run_error = {
"run_id": run_id,
"node": n,
"phase": phase,
"error": repr(e),
"on_error": on_error,
"root": node,
"force_upstream": bool(force_upstream),
"force_downstream": bool(force_downstream),
"auto_upstream": bool(auto_upstream),
"force_topo": bool(force_topo),
# fingerprint
"parents_sel": {p: S.selected_index.get(p, -1) for p in parents},
"ctrl_sel": {c: CTRL.selected.get(c, -1) for c in S._controls_in_scope(n)},
"root_sel": dict(S.selected_index), # debug
}
# all downstream nodes are blocked by the stop (not executed in this run)
S.last_run_blocked_nodes = set(S.g.downstream_of(n))
yield _yield_step(f"❌ Run stopped: {n} failed ({phase}) → {e} (run_id={run_id})")
return False
# -------------------------
# UPSTREAM
# -------------------------
if force_upstream:
order = S.g.topo_upstream_to(node) if force_topo else list(S.g.deps_of(node) | {node})
for n in order:
for p in S.g.parents_of(n):
if S.selected_index.get(p, -1) < 0:
S.run_with_upstream(p, on_error=on_error)
ok = yield from _safe_run_and_yield(n, "upstream(force)")
if not ok:
S.end_run()
return
else:
if auto_upstream:
order = S.g.topo_upstream_to(node)
for n in order:
if n == node:
continue
if S.selected_index.get(n, -1) < 0:
for p in S.g.parents_of(n):
if S.selected_index.get(p, -1) < 0:
S.run_with_upstream(p, on_error=on_error)
ok = yield from _safe_run_and_yield(n, "upstream(auto)")
if not ok:
S.end_run()
return
ok = yield from _safe_run_and_yield(node, "root")
if not ok:
S.end_run()
return
# -------------------------
# DOWNSTREAM (force)
# -------------------------
if force_downstream:
for n in S.g.topo_downstream_from(node):
for p in S.g.parents_of(n):
if S.selected_index.get(p, -1) < 0:
S.run_with_upstream(p, on_error=on_error)
ok = yield from _safe_run_and_yield(n, "downstream(force)")
if not ok:
S.end_run()
return
if tag:
S.last_run_note = "🔁 Pipeline recomputed (new version)"
yield _yield_step(f"Run complete for {node}{tag} (run_id={run_id})")
else:
yield _yield_step(f"Run complete for {node} (run_id={run_id})")
S.end_run()
except Exception as e:
# hard failure outside node runs
S.end_run()
yield render_all(f"ERROR running {node}: {e}")
def ui_run_node(node_name: str):
def _fn(auto_up, force_up, force_down, force_topo, on_error, skip_if_unchanged, *vals):
yield from ui_run(node_name, auto_up, force_up, force_down, force_topo, on_error, skip_if_unchanged, *vals)
return _fn
def compute_input_overrides_from_ctrl() -> dict[str, Any]:
overrides = {}
for c in INPUT_ORDER:
comp = INPUT_COMPONENTS.get(c, gr.Textbox)
v = CTRL.get_selected_value(c)
v = normalize_input_value_for_component(comp, v)
overrides[c] = v
return overrides
def ui_restore(node: str, idx_str: str, mode: str, reconcile_flag: bool):
try:
# Changing context => clear transient run state
S.clear_transient_run_state()
if not idx_str or idx_str == "(empty)":
return render_all(f"No history for {node}")
idx = parse_hist_choice_idx(idx_str)
if idx is None:
return render_all(f"Bad selection for {node}")
use_ctx = ("downstream_match" in mode)
if not use_ctx:
S.restore(node, idx, mode)
overrides = compute_input_overrides_from_ctrl()
return render_all(f"Restored {node} -> {idx} | mode={mode} (simple)", input_overrides=overrides)
ctx = S.restore_with_context(node, idx, mode, reconcile=bool(reconcile_flag))
S.commit_restore_context(ctx)
overrides = compute_input_overrides_from_ctrl()
return render_all(
f"Restored {node} -> {idx} | mode={mode} | reconcile={reconcile_flag} (ctx)",
input_overrides=overrides,
)
except Exception as e:
return render_all(f"ERROR restoring {node}: {e}")
def ui_nav_hist(node: str, direction: int, mode: str, reconcile_flag: bool):
"""
UI-only: navigate BASE history (full dropdown) by changing selected snapshot index.
direction: -1 (prev) or +1 (next)
"""
try:
S.clear_transient_run_state()
cur = S.selected_index.get(node, -1)
if cur < 0:
return render_all(f"{node}: no selected snapshot yet")
n = len(S.history.get(node, []))
if n <= 1:
return render_all(f"{node}: no navigation (history size={n})")
target = max(0, min(cur + direction, n - 1))
if target == cur:
return render_all(f"{node}: boundary reached ({cur})")
use_ctx = ("downstream_match" in mode)
if not use_ctx:
S.restore(node, target, mode)
else:
ctx = S.restore_with_context(node, target, mode, reconcile=bool(reconcile_flag))
S.commit_restore_context(ctx)
arrow = "→" if direction > 0 else "←"
overrides = compute_input_overrides_from_ctrl()
return render_all(f"{node}: nav {arrow} -> {target} | mode={mode}", input_overrides=overrides)
except Exception as e:
return render_all(f"ERROR nav {node}: {e}")
def values_equal(a: Any, b: Any) -> bool:
# identical object / None
if a is b:
return True
# try numpy-aware equality
try:
import numpy as np # type: ignore
if isinstance(a, np.ndarray) and isinstance(b, np.ndarray):
return np.array_equal(a, b)
except Exception:
pass
# handle tuples/lists that may contain ndarrays (e.g. (sr, np.array))
if isinstance(a, (tuple, list)) and isinstance(b, (tuple, list)) and len(a) == len(b):
return all(values_equal(x, y) for x, y in zip(a, b))
if isinstance(a, dict) and isinstance(b, dict) and a.keys() == b.keys():
return all(values_equal(a[k], b[k]) for k in a.keys())
# fallback
try:
return a == b
except Exception:
return False
def ui_set_wire_color_mode(mode: str):
global WIRE_COLOR_MODE
WIRE_COLOR_MODE = mode
return render_all(f"Wire color mode = {mode}")
# =============================================================================
# Gradio App UI
# =============================================================================
CSS = """
div#pipeline {
align-items: center;
column-gap: 32px;
padding: 32px;
}
div#pipeline .node-card-group {
margin: 40px 0;
align-items: end;
row-gap: 2px;
}
div#pipeline .card-port-status {
padding: 4px;
border-radius: 0;
width: 95%;
margin: auto;
background-color: #27272a;
}
.card-port-status div {
align-items: center;
}
.input-port-status{
text-align: right;
padding: 8px 20px;
}
.input-card-node{
row-gap: 2px;
margin: 40px 0;
}
/* placeholders if you want to vary badge styling */
.badge-pos-below {}
.badge-pos-above {}
.badge-pos-right { text-align: right; }
/*
Nav buttons sizing
*/
button.hist-nav-btn {
padding: 10px;
width: 40px !important;
min-width: 40px !important;
max-width: 40px !important;
padding: 22px !important;
font-size: 16px !important;
line-height: 1 !important;
border-radius: 0 !important;
}
.history-dropdown {
min-width: max-content!important;
}
.history-dropdown > .container > span {
display: none;
}
.node-status-content > .container > span {
display: none;
}
.history-group-row{
column-gap: 0;
}
.restore-selected-history-btn{
}
button.apply-input-btn{
width: auto;
max-width: 194px;
border-radius: 14px;
border-top-left-radius: 0;
border-top-right-radius: 0;
margin-left: 15px;
}
button.run-fresh{
border-radius: 0;
border-top-left-radius: 14px;
border-top-right-radius: 14px;
margin-right: 14px;
width: auto;
max-width: 304px;
background-color: #17a34a;
}
button.run-stale{
border-radius: 0;
border-top-left-radius: 14px;
border-top-right-radius: 14px;
margin-right: 14px;
width: auto;
max-width: 304px;
background-color: orange;
}
button.run-fix{
border-radius: 0;
border-top-left-radius: 14px;
border-top-right-radius: 14px;
margin-right: 14px;
width: auto;
max-width: 304px;
}
/* Optionnel: évite que le container s'étire */
.codebox {
height: 600px !important;
max-height: 600px !important;
}
div#pre-pipe-panel{
margin: 40px 0;
}
"""
# -----------------------------------------------------------------------------
# UI component factories (Gradio widgets)
# -----------------------------------------------------------------------------
def node_card(
key: str,
*,
out_component=None,
out_component_kwargs=None,
):
label = ui_node(key)
dom_io = f"io_{key}"
# composant pour le "rendered"
out_component_kwargs = dict(out_component_kwargs or {})
out_component_kwargs.setdefault("interactive", False)
# 🚫 le label de la node est souverain
out_component_kwargs.pop("label", None)
with gr.Column(elem_classes="node-card-group"):
run_fresh = gr.Button(
f"✓ Run {label}",
variant="primary",
elem_classes="run-fresh",
visible=True,
)
run_stale = gr.Button(
f"↻ Run {label}",
variant="primary",
elem_classes="run-stale",
visible=False,
)
run_fix = gr.Button(
f"▶︎ Run {label}",
variant="primary",
elem_classes="run-fix",
visible=False,
)
# ✅ NEW: output "rendered" (composant choisi)
out_view = out_component(
label=f"{label} output (rendered)",
min_width=100,
**out_component_kwargs,
)
# ✅ debug output (toujours textbox, JSON snapshot)
out = gr.Textbox(
label=f"{label} output (debug snapshot JSON)",
interactive=False,
min_width=100,
)
io = gr.Markdown(
value="",
elem_id=dom_io,
elem_classes="card-port-status",
)
with gr.Group():
with gr.Accordion(f"{label} Status", open=False):
status = gr.Textbox(
label=f"{label} status",
interactive=False,
min_width=100,
elem_classes="node-status-content",
)
with gr.Group():
with gr.Row(elem_classes="history-group-row"):
hist = gr.Dropdown(
label=None,
choices=["(empty)"],
value="(empty)",
min_width=100,
elem_classes="history-dropdown",
)
left_btn = gr.Button("←", elem_classes="hist-nav-btn")
right_btn = gr.Button("→", elem_classes="hist-nav-btn")
restore_btn = gr.Button(
f"Restore {label}",
elem_classes="restore-selected-history-btn",
visible=False,
)
return {
"status": status,
"io": io,
"out_view": out_view, # ✅ NEW
"out": out, # debug JSON
"hist": hist,
"run_fresh": run_fresh,
"run_stale": run_stale,
"run_fix": run_fix,
"restore": restore_btn,
"left": left_btn,
"right": right_btn,
}
def input_card(
key: str,
default_value,
*,
badge_position: str = "below",
min_width: int = 100,
component=None,
component_kwargs=None,
):
label = ui_input(key)
dom_id = f"dot_{key}"
component_kwargs = dict(component_kwargs or {})
component_kwargs.setdefault("interactive", True)
with gr.Column(elem_classes="input-card-node"):
if badge_position != "below":
badge = gr.Markdown(
value="",
elem_id=dom_id,
elem_classes=["card-port-status", "input-port-status", f"badge-pos-{badge_position}"],
)
# ✅ IMPORTANT: Audio has no default value
if component is gr.Audio:
tb = component(
label=label,
min_width=min_width,
**component_kwargs,
)
else:
tb = component(
label=label,
value=default_value,
min_width=min_width,
**component_kwargs,
)
if badge_position == "below":
badge = gr.Markdown(
value="",
elem_id=dom_id,
elem_classes=["card-port-status", "input-port-status", f"badge-pos-{badge_position}"],
)
apply = gr.Button(f"Apply {label}", elem_classes="apply-input-btn", visible=True)
return {
"tb": tb,
"dot": badge,
"apply": apply,
}
# -----------------------------------------------------------------------------
# Construction Gradio (layout + wiring)
# -----------------------------------------------------------------------------
CUSTOM_SPEC: dict | None = None
CUSTOM_SPEC_PREVIEW: dict | None = None
import importlib
ALLOWED_IMPORTS = {"time", "re", "math", "json", "hashlib", "random", "datetime"}
def _needed_imports_from_source(src: str, allowed: set[str]) -> list[str]:
"""
Heuristic:
- Collect top-level module names used in the function body (Name nodes in Load context)
- Keep only those in `allowed`
- Return sorted list
"""
src = textwrap.dedent(src)
try:
tree = ast.parse(src)
except SyntaxError:
return []
used: set[str] = set()
class V(ast.NodeVisitor):
def visit_Name(self, node: ast.Name):
if isinstance(node.ctx, ast.Load):
used.add(node.id)
self.generic_visit(node)
def visit_Attribute(self, node: ast.Attribute):
# time.sleep -> base "time" is a Name, captured above anyway
self.generic_visit(node)
V().visit(tree)
# Only keep allowed module names
mods = sorted(m for m in used if m in allowed)
return mods
def preset_to_python_code(preset_name: str) -> str:
fn = PIPELINE_PRESETS.get(preset_name)
if not fn:
return f"# Unknown preset: {preset_name}\n"
try:
# Build spec for graph map (optional)
spec = fn()
graph_md = graph_map_md_simple_from_spec(spec)
graph_comment = "\n".join(
f"# {line}" if line.strip() else "#"
for line in graph_md.strip().splitlines()
)
src = inspect.getsource(fn)
fn_name = fn.__name__
# ✅ minimal imports inferred from the preset function source
needed = _needed_imports_from_source(src, ALLOWED_IMPORTS)
import_block = ""
if needed:
import_block = "\n".join(f"import {m}" for m in needed) + "\n"
# ✅ one-line reminder
allowed_line = "# Allowed imports: " + ", ".join(sorted(ALLOWED_IMPORTS))
return (
"# Preset pipeline (read-only)\n"
f"# name: {preset_name}\n\n"
"# Graph map\n"
f"{graph_comment}\n\n"
f"{allowed_line}\n"
f"{import_block}\n"
f"{src}\n\n"
"# Entry point expected by the UI\n"
"def make_pipeline():\n"
f" return {fn_name}()\n"
)
except Exception as e:
return (
f"# Could not retrieve source for preset: {preset_name}\n"
f"# {e}\n"
)
def make_limited_import(allowed: set[str]):
def _limited_import(name, globals=None, locals=None, fromlist=(), level=0):
# pas d'imports relatifs
if level and level != 0:
raise ImportError("Relative imports are not allowed")
root = name.split(".", 1)[0]
if root not in allowed:
raise ImportError(f"Import '{root}' is not allowed")
return importlib.import_module(name)
return _limited_import
def ui_load_custom_python(code: str, key: int):
global CUSTOM_SPEC
safe_builtins = {
"str": str, "int": int, "float": float, "bool": bool,
"dict": dict, "list": list, "tuple": tuple, "set": set,
"len": len, "range": range, "min": min, "max": max, "sum": sum,
"enumerate": enumerate, "zip": zip, "sorted": sorted,
"repr": repr, "print": print,
"Exception": Exception,
"ValueError": ValueError, "TypeError": TypeError, "RuntimeError": RuntimeError,
"__import__": make_limited_import(ALLOWED_IMPORTS),
}
env = {
"__builtins__": safe_builtins,
"Pipeline": Pipeline,
"gr": gr,
}
try:
exec(code, env, env) # ✅ IMPORTANT
make = env.get("make_pipeline")
if not callable(make):
raise ValueError("Missing function: make_pipeline()")
spec = make()
if not isinstance(spec, dict) or "layout" not in spec:
raise ValueError("make_pipeline() must return a spec dict with key 'layout'")
CUSTOM_SPEC = spec
return gr.update(value="✅ Loaded custom Python pipeline"), key + 1
except Exception as e:
return gr.update(value=f"❌ Load failed: {e}"), key
def ui_load_custom_json(code: str, key: int):
global CUSTOM_SPEC
try:
spec = json.loads(code)
if not isinstance(spec, dict) or "layout" not in spec:
raise ValueError("Spec must be a dict with key 'layout'")
CUSTOM_SPEC = spec
return gr.update(value="✅ Loaded custom JSON pipeline"), key + 1
except Exception as e:
return gr.update(value=f"❌ Load failed: {e}"), key
def ui_load_dispatch(src: str, py_code: str, key: int, preset_name: str):
if src != "custom_python":
# en preset : tu peux soit no-op, soit forcer reload preset
return gr.update(value="(preset mode: nothing to load)"), key
# custom_python : compile/exec puis set CUSTOM_SPEC
msg_upd, new_key = ui_load_custom_python(py_code, key)
# option UX : si load OK, tu veux que le render se fasse sur CUSTOM_SPEC
return msg_upd, new_key
# --- BOOTSTRAP: always have a valid pipeline in globals before first render ---
set_pipeline_from_spec(PIPELINE_PRESETS[DEFAULT_PIPELINE_PRESET]())
DEFAULT_CODE = preset_to_python_code(DEFAULT_PIPELINE_PRESET)
with gr.Blocks(title="Mini Daggr UI Lab") as demo:
gr.Markdown("# Mini Daggr UI Lab (Clean)\n")
# ============================================================
# 1) STABLE TOP BAR (safe outside render)
# ============================================================
with gr.Row():
with gr.Column(scale=1):
gr.Markdown("# Pipeline creation")
pipeline_source = gr.Radio(
choices=["preset", "custom_python"],
value="preset",
label="Pipeline source",
)
pipeline_choice = gr.Dropdown(
choices=list(PIPELINE_PRESETS.keys()),
value=DEFAULT_PIPELINE_PRESET,
label="Pipeline preset",
interactive=True,
)
# Graph map preview (top) - updates when user explores presets/custom,
# but does NOT control the rendered pipeline subtree.
_default_spec_for_map = PIPELINE_PRESETS[DEFAULT_PIPELINE_PRESET]()
graph_map_top = gr.Markdown(graph_map_md_simple_from_spec(_default_spec_for_map))
# Buttons: user decides when to actually load into the app
load_preset_btn = gr.Button("Load preset", visible=True)
insert_template_btn = gr.Button("Insert custom template", visible=False)
preview_custom_btn = gr.Button("Preview graph", visible=False)
preview_valid = gr.State(False)
load_custom_btn = gr.Button("Load custom pipeline", visible=False, interactive=False)
load_msg = gr.Textbox(label="Load status", interactive=False)
# States used to drive the dynamic subtree.
# IMPORTANT: these change ONLY on button click (and on demo.load auto-commit).
render_key = gr.State(0)
active_preset = gr.State(DEFAULT_PIPELINE_PRESET)
active_src = gr.State("preset")
startup_timer = gr.Timer(value=0.05, active=True)
with gr.Column(scale=2):
code_tabs = gr.Tabs()
with code_tabs:
with gr.TabItem("Preset Pipeline", id="preset_tab"):
preset_py = gr.Code(
language="python",
label="Preset pipeline (read-only)",
interactive=False,
value=DEFAULT_CODE,
lines=30,
max_lines=30,
elem_classes=["codebox"],
)
with gr.TabItem("Custom Pipeline", id="custom_tab"):
custom_py = gr.Code(
language="python",
label="Custom pipeline (editable)",
interactive=True,
value=DEFAULT_CODE,
lines=30,
max_lines=30,
elem_classes=["codebox"],
)
# ============================================================
# Source toggle (stable): tabs + button visibility + preview
# (does NOT trigger render)
# ============================================================
def ui_toggle_source(src: str, preset_name: str):
global CUSTOM_SPEC_PREVIEW
if src == "preset":
spec = PIPELINE_PRESETS[preset_name]()
# reset preview cache + gating
CUSTOM_SPEC_PREVIEW = None
return (
gr.update(selected="preset_tab"),
gr.update(interactive=True), # pipeline_choice
gr.update(visible=True), # load_preset_btn
gr.update(visible=False), # load_custom_btn (hidden)
gr.update(visible=False), # insert_template_btn (hidden)
gr.update(visible=False), # preview_custom_btn (hidden)
gr.update(value=""), # load_msg
gr.update(value=graph_map_md_simple_from_spec(spec)),
False, # preview_valid reset
gr.update(interactive=False), # load_custom_btn interactive gate
)
# --- custom_python mode ---
spec = CUSTOM_SPEC or PIPELINE_PRESETS[preset_name]()
return (
gr.update(selected="custom_tab"),
gr.update(interactive=False), # pipeline_choice locked
gr.update(visible=False), # load_preset_btn hidden
gr.update(visible=True), # load_custom_btn shown
gr.update(visible=True), # insert_template_btn shown
gr.update(visible=True), # preview_custom_btn shown
gr.update(value="Custom mode: edit code, then blur or click 'Preview graph'."), # load_msg
gr.update(value=graph_map_md_simple_from_spec(spec)),
False, # preview_valid reset on entry
gr.update(interactive=False), # load_custom_btn disabled until preview OK
)
pipeline_source.change(
ui_toggle_source,
inputs=[pipeline_source, pipeline_choice],
outputs=[
code_tabs,
pipeline_choice,
load_preset_btn,
load_custom_btn,
insert_template_btn,
preview_custom_btn,
load_msg,
graph_map_top,
preview_valid,
load_custom_btn,
],
show_progress="hidden",
queue=False,
)
# ============================================================
# Preset selection: update code + preview only (NO RENDER)
# ============================================================
def ui_on_preset_change(src: str, preset_name: str):
code = preset_to_python_code(preset_name)
spec = PIPELINE_PRESETS[preset_name]()
preset_upd = gr.update(value=code)
if src == "preset":
custom_upd = gr.update(value=code) # keep custom in sync ONLY in preset mode
graph_upd = gr.update(value=graph_map_md_simple_from_spec(spec))
tabs_upd = gr.update(selected="preset_tab")
else:
custom_upd = gr.update() # don't overwrite user edits
spec2 = CUSTOM_SPEC or spec
graph_upd = gr.update(value=graph_map_md_simple_from_spec(spec2))
tabs_upd = gr.update(selected="custom_tab")
return tabs_upd, preset_upd, custom_upd, graph_upd
pipeline_choice.change(
ui_on_preset_change,
inputs=[pipeline_source, pipeline_choice],
outputs=[code_tabs, preset_py, custom_py, graph_map_top],
show_progress="hidden",
queue=False,
)
def compile_custom_pipeline_to_spec(code: str) -> dict:
safe_builtins = {
"str": str, "int": int, "float": float, "bool": bool,
"dict": dict, "list": list, "tuple": tuple, "set": set,
"len": len, "range": range, "min": min, "max": max, "sum": sum,
"enumerate": enumerate, "zip": zip, "sorted": sorted,
"repr": repr, "print": print,
"Exception": Exception,
"ValueError": ValueError, "TypeError": TypeError, "RuntimeError": RuntimeError,
"__import__": make_limited_import(ALLOWED_IMPORTS),
}
env = {
"__builtins__": safe_builtins,
"Pipeline": Pipeline,
"gr": gr,
}
exec(code, env, env)
make = env.get("make_pipeline")
if not callable(make):
raise ValueError("Missing function: make_pipeline()")
spec = make()
if not isinstance(spec, dict) or "layout" not in spec:
raise ValueError("make_pipeline() must return a spec dict with key 'layout'")
return spec
def ui_preview_custom(py_code: str):
global CUSTOM_SPEC_PREVIEW
try:
spec = compile_custom_pipeline_to_spec(py_code)
md = graph_map_md_simple_from_spec(spec)
CUSTOM_SPEC_PREVIEW = spec
return (
gr.update(value="✅ Preview OK"),
gr.update(value=md),
True,
gr.update(interactive=True), # load_custom_btn
)
except Exception as e:
CUSTOM_SPEC_PREVIEW = None
return (
gr.update(value=f"❌ Preview failed: {e}"),
gr.update(value=f"## Graph map\n\n❌ `{e}`"),
False,
gr.update(interactive=False), # disable load button
)
preview_custom_btn.click(
ui_preview_custom,
inputs=[custom_py],
outputs=[load_msg, graph_map_top, preview_valid, load_custom_btn],
show_progress="hidden",
queue=False,
)
custom_py.blur(
ui_preview_custom,
inputs=[custom_py],
outputs=[load_msg, graph_map_top, preview_valid, load_custom_btn],
show_progress="hidden",
queue=False,
)
# ============================================================
# LOAD ACTIONS (ONLY these trigger render)
# ============================================================
def ui_load_preset(preset_name: str, key: int):
return (
gr.update(value=f"✅ Loaded preset: {preset_name}"),
preset_name, # active_preset
"preset", # active_src
key + 1, # render_key bump
)
load_preset_btn.click(
ui_load_preset,
inputs=[pipeline_choice, render_key],
outputs=[load_msg, active_preset, active_src, render_key],
show_progress="hidden",
queue=False,
)
def ui_load_custom(py_code: str, key: int, preset_name: str):
global CUSTOM_SPEC, CUSTOM_SPEC_PREVIEW
if CUSTOM_SPEC_PREVIEW is None:
return (
gr.update(value="❌ Load refused: run 'Preview graph' successfully first."),
gr.update(), # graph_map_top unchanged
preset_name,
"custom_python",
key, # no bump
)
CUSTOM_SPEC = CUSTOM_SPEC_PREVIEW
# graph already generated => keep it, or regenerate from CUSTOM_SPEC
graph_upd = gr.update(value=graph_map_md_simple_from_spec(CUSTOM_SPEC))
return (
gr.update(value="✅ Loaded custom pipeline (from preview)"),
graph_upd,
preset_name,
"custom_python",
key + 1,
)
load_custom_btn.click(
ui_load_custom,
inputs=[custom_py, render_key, pipeline_choice],
outputs=[load_msg, graph_map_top, active_preset, active_src, render_key],
show_progress="hidden",
queue=False,
)
def ui_boot_default(preset_name: str, key: int):
# on réutilise la même logique que le bouton "Load preset"
msg, ap, asrc, new_key = ui_load_preset(preset_name, key)
# on désactive le timer => one-shot
return msg, ap, asrc, new_key, gr.update(active=False)
startup_timer.tick(
ui_boot_default,
inputs=[pipeline_choice, render_key],
outputs=[load_msg, active_preset, active_src, render_key, startup_timer],
show_progress="hidden",
queue=False,
)
def ui_insert_custom_template_and_validate():
code = CUSTOM_TEMPLATE
# Reuse the exact same validation path as Preview graph / blur
msg_upd, graph_upd, ok, load_btn_upd = ui_preview_custom(code)
return (
gr.update(value=code), # custom_py
msg_upd, # load_msg
graph_upd, # graph_map_top
ok, # preview_valid
load_btn_upd, # load_custom_btn (interactive gate)
)
insert_template_btn.click(
ui_insert_custom_template_and_validate,
inputs=[],
outputs=[custom_py, load_msg, graph_map_top, preview_valid, load_custom_btn],
show_progress="hidden",
queue=False,
)
# ============================================================
# DYNAMIC SUBTREE
# - NOT triggered by dropdown change
# - Triggered ONLY by:
# - preset_load_ev
# - custom_load_ev
# - bootstrap_ev
# ============================================================
@gr.render(
inputs=[active_preset, active_src, render_key],
triggers=[startup_timer.tick, load_preset_btn.click, load_custom_btn.click],
)
def render_app(preset_name: str, src: str, _key: int):
# ---- choose spec ----
if src == "preset":
spec = PIPELINE_PRESETS[preset_name]()
else:
spec = CUSTOM_SPEC or PIPELINE_PRESETS[preset_name]()
# ---- rebuild globals (G/CTRL/S/layout/orders/labels/components) ----
set_pipeline_from_spec(spec)
# ---- one-shot init trigger: runs once after this subtree mounts ----
init_timer = gr.Timer(value=0.05, active=True)
# ============================================================
# 2.b) PIPELINE UI
# ============================================================
gr.Markdown("## Pipeline")
input_ui: dict[str, dict] = {}
node_ui: dict[str, dict] = {}
with gr.Row(elem_id="pipeline"):
for col in PIPELINE_LAYOUT:
with gr.Column():
for item in col:
if item["kind"] == "input":
input_ui[item["name"]] = input_card(
item["name"],
item.get("default", ""),
badge_position=item.get("badge_position", "below"),
min_width=item.get("min_width", 100),
component=item.get("component"),
component_kwargs=item.get("component_kwargs"),
)
elif item["kind"] == "node":
node_ui[item["name"]] = node_card(
item["name"],
out_component=item.get("out_component"),
out_component_kwargs=item.get("out_component_kwargs"),
)
else:
raise ValueError(f"Unknown kind: {item['kind']}")
# ============================================================
# 2.a) PRE-PANEL
# ============================================================
with gr.Row(elem_id="pre-pipe-panel"):
with gr.Column():
gr.Markdown("## Graph settings")
settings_tabs = gr.Tabs(selected="compute_tab")
with settings_tabs:
with gr.TabItem("Compute", id="compute_tab"):
auto_up = gr.Checkbox(value=True, label="Auto-run upstream (Daggr-like)", interactive=True)
force_up = gr.Checkbox(value=False, label="Force upstream recompute", interactive=True)
force_down = gr.Checkbox(value=False, label="Force downstream recompute", interactive=True)
force_topo = gr.Checkbox(value=True, label="Force order: topo (recommended)", interactive=True)
with gr.TabItem("Snapshots", id="snapshots_tab"):
reuse_identical = gr.Checkbox(
value=False,
label="Reuse identical input values (deduplicate input history)",
)
skip_if_unchanged = gr.Checkbox(
value=False,
label="Don't run node if nothing changes upstream (no-op run)",
interactive=True,
)
on_error = gr.Radio(
choices=["no_snapshot", "snapshot"],
value="no_snapshot",
label="On node error",
interactive=True,
)
with gr.TabItem("Restore", id="restore_tab"):
restore_mode = gr.Radio(
choices=[
"node_only",
"node+controls",
"node+controls+upstream",
"node+controls+upstream+downstream_match",
],
value="node+controls+upstream+downstream_match",
label="Restore mode",
interactive=True,
)
reconcile = gr.Checkbox(value=False, label="Restore: reconcile", interactive=True)
with gr.Column():
gr.Markdown("## Display settings")
with gr.Row():
badge_label_mode = gr.Radio(
choices=["ports", "nodes"],
value=BADGE_LABEL_MODE,
label="Badge labels",
interactive=True,
)
wire_color_mode = gr.Radio(
choices=["rich", "daggr"],
value=WIRE_COLOR_MODE,
label="Wire color mode",
interactive=True,
)
with gr.Column():
gr.Markdown("## Debug panels")
global_panels_tabs = gr.Tabs()
with global_panels_tabs:
with gr.TabItem("Nodes Summary", id="nodes_summary_tab"):
nodes_summary = gr.Textbox(label="Nodes summary", lines=8, max_lines=8, interactive=False)
with gr.TabItem("Edges Summary", id="edges_summary_tab"):
edges_summary = gr.Textbox(label="Wires (detailed why)", lines=8, max_lines=8, interactive=False)
with gr.TabItem("Debug state"):
debug = gr.Code(label="Debug state", language="json", lines=14, max_lines=14)
reset_btn = gr.Button("Reset lab")
msg = gr.Textbox(label="Message", interactive=False)
# ============================================================
# 2.c) OUTPUTS (MUST match render_all() exactly)
# ============================================================
OUTPUTS: list[Any] = []
for cname in INPUT_ORDER:
OUTPUTS += [input_ui[cname]["dot"], input_ui[cname]["tb"]]
OUTPUTS += [msg, nodes_summary, edges_summary, debug]
for n in NODE_ORDER_UI:
ui = node_ui[n]
OUTPUTS += [
ui["status"],
ui["io"],
ui["out_view"],
ui["out"],
ui["hist"],
ui["left"],
ui["right"],
ui["run_fresh"],
ui["run_stale"],
ui["run_fix"],
]
# ============================================================
# 2.d) EVENT WIRING (inside subtree => no stacking)
# ============================================================
ALL_TBS = [input_ui[c]["tb"] for c in INPUT_ORDER]
run_inputs = [auto_up, force_up, force_down, force_topo, on_error, skip_if_unchanged, *ALL_TBS]
reset_btn.click(ui_reset, inputs=ALL_TBS, outputs=OUTPUTS, show_progress="hidden", queue=False)
for cname in INPUT_ORDER:
tb = input_ui[cname]["tb"]
apply = input_ui[cname]["apply"]
def _commit(v, cn=cname):
return ui_set_input(cn, v)
apply.click(_commit, inputs=[tb], outputs=OUTPUTS, show_progress="hidden", queue=False)
if hasattr(tb, "blur"):
tb.blur(_commit, inputs=[tb], outputs=OUTPUTS, show_progress="hidden", queue=False)
if hasattr(tb, "submit"):
tb.submit(_commit, inputs=[tb], outputs=OUTPUTS, show_progress="hidden", queue=False)
else:
for ev in ("upload", "stop_recording", "clear"):
if hasattr(tb, ev):
getattr(tb, ev)(_commit, inputs=[tb], outputs=OUTPUTS, show_progress="hidden", queue=False)
for node_name in NODE_ORDER_UI:
fn = ui_run_node(node_name)
ui = node_ui[node_name]
ui["run_fresh"].click(fn, inputs=run_inputs, outputs=OUTPUTS, show_progress="hidden")
ui["run_stale"].click(fn, inputs=run_inputs, outputs=OUTPUTS, show_progress="hidden")
ui["run_fix"].click(fn, inputs=run_inputs, outputs=OUTPUTS, show_progress="hidden")
for node_name in NODE_ORDER_UI:
ui = node_ui[node_name]
hist = ui["hist"]
cb = hist.blur if hasattr(hist, "blur") else hist.change
cb(
lambda idx, m, r, nn=node_name: ui_restore(nn, idx, m, r),
inputs=[hist, restore_mode, reconcile],
outputs=OUTPUTS,
show_progress="hidden",
queue=False,
)
ui["left"].click(
lambda m, r, nn=node_name: ui_nav_hist(nn, -1, m, r),
inputs=[restore_mode, reconcile],
outputs=OUTPUTS,
show_progress="hidden",
queue=False,
)
ui["right"].click(
lambda m, r, nn=node_name: ui_nav_hist(nn, +1, m, r),
inputs=[restore_mode, reconcile],
outputs=OUTPUTS,
show_progress="hidden",
queue=False,
)
badge_label_mode.change(
ui_set_badge_label_mode,
inputs=[badge_label_mode],
outputs=OUTPUTS,
show_progress="hidden",
queue=False,
)
wire_color_mode.change(
ui_set_wire_color_mode,
inputs=[wire_color_mode],
outputs=OUTPUTS,
show_progress="hidden",
queue=False,
)
defaults = [
next(
it["default"]
for col in PIPELINE_LAYOUT
for it in col
if it["kind"] == "input" and it["name"] == cname
)
for cname in INPUT_ORDER
]
def ui_init_once():
payload = ui_reset(*defaults)
return (*payload, gr.update(active=False))
init_timer.tick(
ui_init_once,
outputs=OUTPUTS + [init_timer],
show_progress="hidden",
queue=False,
)
if __name__ == "__main__":
demo.launch(css=CSS, ssr_mode=False)