Spaces:
Sleeping
Sleeping
| """ | |
| Mini Daggr UI Lab | |
| ================= | |
| A self-contained experimental UI to explore and debug Daggr-like execution semantics. | |
| This app is NOT a reimplementation of Daggr. | |
| It is a **debug and exploration lab** focused on clarity and edge cases. | |
| What this app models: | |
| - A small DAG of function nodes | |
| - Versioned inputs (controls) with history | |
| - Nodes produce snapshots (history of executions) | |
| - Each snapshot records: | |
| - inputs values | |
| - upstream snapshot indices | |
| - control versions | |
| - execution status (ok / error) | |
| What the UI shows: | |
| - Wire colors that explain *why* a node is fresh, stale, mismatched, failed, or blocked | |
| - Snapshot history per node | |
| - Restore strategies (node-only, upstream, downstream auto-match) | |
| - Explicit visualization of error propagation and blocking | |
| Key design goals: | |
| - Make execution semantics observable | |
| - Make restore behavior explainable | |
| - Make failure propagation debuggable | |
| - Keep everything explicit and readable | |
| This file is intentionally large but structured in clear sections: | |
| 1. Helpers & utilities | |
| 2. Core execution engine (Graph + FrontendState) | |
| 3. Pipeline DSL (Pipeline, ports, wiring) | |
| 4. Pipeline definition (the part a developer edits) | |
| 5. UI rendering & event wiring (Gradio) | |
| If you want to understand or modify the pipeline, | |
| jump directly to the **PIPELINE DEFINITION** section. | |
| """ | |
| # ============================================================ | |
| # JUMP POINTS | |
| # - PIPELINE DEFINITION | |
| # - PIPELINE DSL | |
| # - EXECUTION ENGINE | |
| # - UI (GRADIO) | |
| # ============================================================ | |
| from __future__ import annotations | |
| from dataclasses import dataclass, field | |
| from typing import Any, Callable, Dict, List, Optional, Set, Tuple | |
| import ast | |
| import inspect | |
| import textwrap | |
| import hashlib | |
| import json | |
| import time | |
| import re | |
| import gradio as gr | |
| # ============================================================================= | |
| # Helpers & UI primitives | |
| # ============================================================================= | |
| def stable_hash(obj: Any) -> str: | |
| """ | |
| Stable, short hash for signatures: | |
| - JSON-dumps with sort_keys | |
| - sha256 truncated to 12 | |
| """ | |
| s = json.dumps(obj, sort_keys=True, ensure_ascii=False, default=str) | |
| return hashlib.sha256(s.encode("utf-8")).hexdigest()[:12] | |
| WIRE_COLOR_MODE = "rich" # "rich" | "daggr" | |
| def map_wire_color_for_ui(color: str) -> str: | |
| if WIRE_COLOR_MODE == "daggr": | |
| # Daggr-like: only "ok" vs "not ok" | |
| if color == "green": | |
| return "orange" # Gradio official orange | |
| return "gray" | |
| return color | |
| def dot_html(color: str) -> str: | |
| color = map_wire_color_for_ui(color) | |
| css = { | |
| "green": "#16a34a", | |
| "orange": "#f59e0b", | |
| "red": "#ef4444", | |
| "blue": "#3b82f6", | |
| "gray": "#9ca3af", | |
| "purple": "#a855f7", # blocked | |
| }.get(color, "#9ca3af") | |
| return ( | |
| "<span style='display:inline-block;width:10px;height:10px;" | |
| f"border-radius:999px;background:{css};vertical-align:middle;'></span>" | |
| ) | |
| def emoji_wire(color: str) -> str: | |
| color = map_wire_color_for_ui(color) | |
| return { | |
| "green": "🟢", | |
| "orange": "🟠", | |
| "red": "🔴", | |
| "blue": "🔵", | |
| "gray": "⚪", | |
| "purple": "🟣", | |
| }.get(color, "⚪") | |
| def pretty_val(v: Any, max_len: int = 80) -> str: | |
| s = repr(v) | |
| if len(s) > max_len: | |
| return s[: max_len - 1] + "…" | |
| return s | |
| def parse_hist_choice_idx(idx_str: str) -> Optional[int]: | |
| """ | |
| Parse strings like: | |
| "12 | ✓ | run=... | ..." | |
| Returns int index, or None. | |
| """ | |
| if not idx_str or idx_str.startswith("("): | |
| return None | |
| try: | |
| return int(idx_str.split("|", 1)[0].strip()) | |
| except Exception: | |
| return None | |
| FILE_COMPONENTS = { | |
| gr.Audio, | |
| gr.Video, | |
| gr.Image, | |
| gr.File, | |
| } | |
| # ============================================================================= | |
| # TEMPORARY HARD LIMITS (build-time constraints) | |
| # ============================================================================= | |
| DISABLE_MULTI_OUTPUT = True # multi-return + multi declared outputs disabled | |
| DISABLE_FANOUT = True # same (node,port) cannot feed multiple children | |
| # ============================================================================= | |
| # Wire legend (shown in UI) | |
| # ============================================================================= | |
| COLOR_RULES_MD = ( | |
| "### Wire color rules\n" | |
| "- 🔵 **blue**: node never ran yet (no selected snapshot)\n" | |
| "- 🔴 **red**: node failed (selected error snapshot OR last run failed on this node)\n" | |
| "- 🟣 **purple**: **blocked** (this node did not run because an upstream parent failed in the last run)\n" | |
| "- ⚪ **gray**: structurally incompatible (indices not aligned)\n" | |
| "- 🟠 **orange**: aligned but stale (controls changed since snapshot)\n" | |
| "- 🟢 **green**: aligned and fresh\n" | |
| ) | |
| GRAPH_MAP_MD = ( | |
| "## Graph map\n" | |
| "```\n" | |
| "Input[d] → D →\\\n" | |
| " A → B → C\n" | |
| "Input[e] → E →/ ^\n" | |
| " Input[b]\n" | |
| "```\n" | |
| "\n" | |
| "**Legend:**\n" | |
| "- Boxes (D, E, A, B, C) are function nodes\n" | |
| "- Input[x] are UI inputs (versioned, no run button)\n" | |
| "- Colors apply to **wires and dots only** (see Global panels)\n" | |
| ) | |
| def graph_map_md_simple_from_spec(spec: dict) -> str: | |
| """ | |
| Simple + pretty ASCII map (no globals), derived from a pipeline spec. | |
| Goals: | |
| - readable map like: | |
| Input[d] → D →\ | |
| A → B → C | |
| Input[e] → E →/ ^ | |
| Input[b] | |
| - handles: | |
| - one main merge (2+ parents into one node) if present | |
| - multiple "late inputs" shown under a single caret | |
| - chooses a stable main chain (longest path), so it doesn't randomly miss nodes | |
| Not a general ASCII router: prioritizes clarity. | |
| """ | |
| # ----------------------------- | |
| # Helpers to read spec + labels | |
| # ----------------------------- | |
| layout = spec.get("layout") or [] | |
| node_items = [it for col in layout for it in col if it.get("kind") == "node"] | |
| input_items = [it for col in layout for it in col if it.get("kind") == "input"] | |
| node_label = {it["name"]: it.get("label") or it["name"] for it in node_items} | |
| input_label = {it["name"]: it.get("label") or it["name"] for it in input_items} | |
| def N(n: str) -> str: | |
| return node_label.get(n, n) | |
| def I(c: str) -> str: | |
| # display "Input[d]" even if label is "Input d" | |
| # keep your existing style | |
| return f"Input[{c}]" | |
| # ----------------------------------------- | |
| # Build a minimal graph from node_items/wires | |
| # ----------------------------------------- | |
| class _MiniGraph: | |
| def __init__(self): | |
| self.nodes = set() | |
| self.parents = {} # node -> list[parent] | |
| self.children = {} # node -> list[child] | |
| def add_node(self, n: str): | |
| self.nodes.add(n) | |
| self.parents.setdefault(n, []) | |
| self.children.setdefault(n, []) | |
| def add_edge(self, p: str, c: str): | |
| self.add_node(p) | |
| self.add_node(c) | |
| if p not in self.parents[c]: | |
| self.parents[c].append(p) | |
| if c not in self.children[p]: | |
| self.children[p].append(c) | |
| def parents_of(self, n: str) -> list[str]: | |
| return list(self.parents.get(n, [])) | |
| def children_of(self, n: str) -> list[str]: | |
| return list(self.children.get(n, [])) | |
| g = _MiniGraph() | |
| for it in node_items: | |
| g.add_node(it["name"]) | |
| # node wires: child_port -> (parent_node, parent_port) | |
| for it in node_items: | |
| child = it["name"] | |
| for _child_port, (parent, _parent_port) in (it.get("wires") or {}).items(): | |
| if parent in g.nodes and child in g.nodes: | |
| g.add_edge(parent, child) | |
| # control feeds: input -> (node, port) | |
| control_ports = {} | |
| for it in input_items: | |
| cname = it["name"] | |
| feeds = it.get("feeds") | |
| if isinstance(feeds, (tuple, list)) and len(feeds) == 2: | |
| control_ports[cname] = (feeds[0], feeds[1]) # (node, port) | |
| # map control -> target node | |
| control_to_node = {c: tgt[0] for c, tgt in control_ports.items()} | |
| def controls_feeding_node(n: str) -> list[str]: | |
| return [c for c, nn in control_to_node.items() if nn == n] | |
| nodes = list(g.nodes) | |
| if not nodes: | |
| return "## Graph map\n```\n(empty)\n```" | |
| # ----------------------------- | |
| # Longest-path chain (stable) | |
| # ----------------------------- | |
| def topo_order() -> list[str]: | |
| indeg = {n: 0 for n in nodes} | |
| for n in nodes: | |
| for ch in g.children_of(n): | |
| if ch in indeg: | |
| indeg[ch] += 1 | |
| q = [n for n in nodes if indeg[n] == 0] | |
| out = [] | |
| while q: | |
| cur = q.pop(0) | |
| out.append(cur) | |
| for ch in g.children_of(cur): | |
| if ch in indeg: | |
| indeg[ch] -= 1 | |
| if indeg[ch] == 0: | |
| q.append(ch) | |
| # fallback if cycle or disconnected weirdness | |
| for n in nodes: | |
| if n not in out: | |
| out.append(n) | |
| return out | |
| def longest_path_chain() -> list[str]: | |
| topo = topo_order() | |
| dist = {n: 0 for n in topo} | |
| prev = {n: None for n in topo} | |
| for n in topo: | |
| for ch in g.children_of(n): | |
| if ch not in dist: | |
| continue | |
| if dist[n] + 1 > dist[ch]: | |
| dist[ch] = dist[n] + 1 | |
| prev[ch] = n | |
| end = max(topo, key=lambda n: dist.get(n, 0)) | |
| chain = [] | |
| cur = end | |
| while cur is not None: | |
| chain.append(cur) | |
| cur = prev[cur] | |
| chain.reverse() | |
| return chain | |
| chain = longest_path_chain() | |
| chain_set = set(chain) | |
| # ----------------------------- | |
| # Pick a merge node (prefer on chain) | |
| # ----------------------------- | |
| merge = None | |
| for n in chain: | |
| if len(g.parents_of(n)) >= 2: | |
| merge = n | |
| break | |
| if merge is None: | |
| # any merge anywhere | |
| merge = next((n for n in nodes if len(g.parents_of(n)) >= 2), None) | |
| # ----------------------------- | |
| # Build lines | |
| # ----------------------------- | |
| lines: list[str] = [] | |
| # Case A: we have a merge and can show 2 parent lines nicely | |
| if merge: | |
| parents = g.parents_of(merge) | |
| # choose two parents that themselves have a control (if possible) | |
| def parent_score(p: str) -> int: | |
| return 1 if controls_feeding_node(p) else 0 | |
| parents_sorted = sorted(parents, key=parent_score, reverse=True) | |
| p1 = parents_sorted[0] | |
| p2 = parents_sorted[1] if len(parents_sorted) > 1 else None | |
| # left texts "Input[x] → P" | |
| def left_txt(p: str) -> str: | |
| cs = controls_feeding_node(p) | |
| if cs: | |
| # if multiple controls feed same node, show first (keep simple) | |
| return f"{I(cs[0])} → {N(p)}" | |
| return f"{N(p)}" | |
| top = f"{left_txt(p1)} →\\" | |
| bot = f"{left_txt(p2)} →/" if p2 else "" | |
| # chain from merge onward (if merge not in chain, just show merge then follow one child) | |
| if merge in chain: | |
| chain_from_merge = chain[chain.index(merge):] | |
| else: | |
| # fallback: merge + follow first child path | |
| chain_from_merge = [merge] | |
| cur = merge | |
| seen = {merge} | |
| while True: | |
| kids = g.children_of(cur) | |
| if not kids: | |
| break | |
| nxt = kids[0] | |
| if nxt in seen: | |
| break | |
| chain_from_merge.append(nxt) | |
| seen.add(nxt) | |
| cur = nxt | |
| mid_txt = " → ".join(N(n) for n in chain_from_merge) | |
| slash_col = max(len(top), len(bot)) if bot else len(top) | |
| top_line = top.ljust(slash_col).rstrip() | |
| bot_line = bot.ljust(slash_col).rstrip() if bot else "" | |
| mid_prefix = " " * slash_col | |
| mid_line = f"{mid_prefix}{mid_txt}".rstrip() | |
| lines.append(top_line) | |
| lines.append(mid_line) | |
| if bot_line: | |
| lines.append(bot_line) | |
| # Late controls (controls feeding nodes in chain, excluding parents shown above) | |
| excluded_nodes = {p1} | |
| if p2: | |
| excluded_nodes.add(p2) | |
| late_controls = [] | |
| for c, n in control_to_node.items(): | |
| if n in chain_set and n not in excluded_nodes: | |
| late_controls.append((c, n)) | |
| # Put caret under the FIRST late target if any, else none | |
| if late_controls: | |
| c0, n0 = late_controls[0] | |
| target = N(n0) | |
| idx = mid_line.find(target) | |
| if idx != -1: | |
| caret_pos = idx + max(1, len(target) // 2) | |
| lines.append((" " * caret_pos + "^").rstrip()) | |
| # show ALL late inputs, stacked under caret | |
| for c_late, _n_late in late_controls: | |
| pad = max(0, caret_pos - len(I(c_late)) // 2) | |
| lines.append((" " * pad + I(c_late)).rstrip()) | |
| # Case B: no merge → single chain + (optional) late inputs | |
| else: | |
| chain_txt = " → ".join(N(n) for n in chain) | |
| lines.append(chain_txt.rstrip()) | |
| late_controls = [] | |
| for c, n in control_to_node.items(): | |
| if n in chain_set: | |
| late_controls.append((c, n)) | |
| if late_controls: | |
| c0, n0 = late_controls[0] | |
| target = N(n0) | |
| idx = chain_txt.find(target) | |
| if idx != -1: | |
| caret_pos = idx + max(1, len(target) // 2) | |
| lines.append((" " * caret_pos + "^").rstrip()) | |
| for c_late, _n_late in late_controls: | |
| pad = max(0, caret_pos - len(I(c_late)) // 2) | |
| lines.append((" " * pad + I(c_late)).rstrip()) | |
| # ----------------------------- | |
| # Ensure we didn't "forget" isolated nodes: | |
| # if some nodes aren't in chain and not merge parents, list them in a small footer | |
| # ----------------------------- | |
| shown_nodes = set() | |
| for ln in lines: | |
| for n in nodes: | |
| if N(n) in ln: | |
| shown_nodes.add(n) | |
| missing = [n for n in nodes if n not in shown_nodes] | |
| if missing: | |
| lines.append("") | |
| lines.append("Other nodes:") | |
| lines.append(", ".join(N(n) for n in missing)) | |
| return "## Graph map\n```\n" + "\n".join(lines).rstrip() + "\n```" | |
| def graph_map_md_simple() -> str: | |
| """ | |
| Simple + pretty ASCII map: | |
| - supports one main merge (two parents into one node) | |
| - supports one or more "late inputs" into chain nodes (rendered with ^ and Input[...] below) | |
| Intended for readability, not full general routing. | |
| Uses current globals: S (FrontendState), PIPELINE_LAYOUT, ui_node/ui_input, S.control_ports | |
| """ | |
| # --- collect nodes (exclude inputs) --- | |
| nodes = list(S.g.nodes.keys()) | |
| # indegree / parents | |
| parents = {n: S.g.parents_of(n) for n in nodes} | |
| # find merge node: prefer first with >=2 parents | |
| merge_nodes = [n for n in nodes if len(parents[n]) >= 2] | |
| merge = merge_nodes[0] if merge_nodes else None | |
| # build a main chain starting at merge (or a topo-first node) | |
| def topo(): | |
| # tiny topo based on existing helper | |
| # pick any node and ask topo_upstream_to then filter unique order | |
| # better: compute topo over all nodes | |
| indeg = {n: 0 for n in nodes} | |
| for n in nodes: | |
| for ch in S.g.children_of(n): | |
| if ch in indeg: | |
| indeg[ch] += 1 | |
| q = [n for n in nodes if indeg[n] == 0] | |
| out = [] | |
| while q: | |
| cur = q.pop(0) | |
| out.append(cur) | |
| for ch in S.g.children_of(cur): | |
| if ch in indeg: | |
| indeg[ch] -= 1 | |
| if indeg[ch] == 0: | |
| q.append(ch) | |
| # stable fallback | |
| if len(out) != len(nodes): | |
| for n in nodes: | |
| if n not in out: | |
| out.append(n) | |
| return out | |
| topo_order = topo() | |
| # choose start of chain: | |
| # - if merge exists, chain starts at merge | |
| # - else, pick a sink-ish path from first topo node | |
| if merge: | |
| chain = [merge] | |
| cur = merge | |
| while True: | |
| kids = S.g.children_of(cur) | |
| if not kids: | |
| break | |
| # choose the child that appears earliest in topo_order after cur | |
| kids_sorted = sorted(kids, key=lambda k: topo_order.index(k) if k in topo_order else 10**9) | |
| nxt = kids_sorted[0] | |
| if nxt in chain: | |
| break | |
| chain.append(nxt) | |
| cur = nxt | |
| else: | |
| # pick a simple forward chain from first topo node | |
| start = topo_order[0] if topo_order else None | |
| chain = [] | |
| cur = start | |
| while cur: | |
| chain.append(cur) | |
| kids = S.g.children_of(cur) | |
| if not kids: | |
| break | |
| kids_sorted = sorted(kids, key=lambda k: topo_order.index(k) if k in topo_order else 10**9) | |
| nxt = kids_sorted[0] | |
| if nxt in chain: | |
| break | |
| cur = nxt | |
| # helper labels | |
| def N(n): # node label | |
| return ui_node(n) if "ui_node" in globals() else n | |
| def I(c): # input label | |
| return f"Input[{c}]" | |
| # map controls to their target node | |
| control_to_node = {c: tgt[0] for c, tgt in S.control_ports.items()} | |
| # find controls that feed the two parents of merge (if any) | |
| # and controls that feed nodes inside the chain (late inputs) | |
| def control_feeding_node(n): | |
| return [c for c, nn in control_to_node.items() if nn == n] | |
| # parent lines if merge | |
| top_line = "" | |
| bot_line = "" | |
| mid_line = "" | |
| caret_line = "" | |
| below_line = "" | |
| if merge and len(parents[merge]) >= 2: | |
| p1, p2 = parents[merge][0], parents[merge][1] | |
| # assume each parent has one direct control (Input[d] -> D, etc.) for pretty rendering | |
| c1 = control_feeding_node(p1) | |
| c2 = control_feeding_node(p2) | |
| left1 = f"{I(c1[0])} → {N(p1)}" if c1 else f"{N(p1)}" | |
| left2 = f"{I(c2[0])} → {N(p2)}" if c2 else f"{N(p2)}" | |
| # build chain text from merge onward | |
| chain_txt = " → ".join(N(n) for n in chain) | |
| # We want: | |
| # left1 → p1 →\ | |
| # merge → ... | |
| # left2 → p2 →/ | |
| # | |
| # alignment: put merge start under the end of slashes | |
| left1_txt = f"{left1} →\\" | |
| left2_txt = f"{left2} →/" | |
| # choose indent so that merge begins under the slashes column | |
| slash_col = max(len(left1_txt), len(left2_txt)) | |
| indent = " " * (slash_col - len(N(merge)) - 2) # approximate, then we’ll correct below | |
| # better: force merge to start at column = slash_col | |
| # So mid line begins with spaces to slash_col, then chain. | |
| mid_prefix = " " * (slash_col) | |
| mid_line = f"{mid_prefix}{chain_txt}" | |
| # top and bottom lines padded to slash_col | |
| top_line = left1_txt.ljust(slash_col) # ends at slash | |
| bot_line = left2_txt.ljust(slash_col) | |
| # Now add the merge node label on mid line, aligned under the slash join: | |
| # We want the merge label at exactly slash_col - 1 maybe, but simpler: | |
| # Put 15 spaces then "A → B → C" | |
| # We'll insert merge label at start of chain_txt already includes it (chain starts with merge) | |
| # So mid_line is fine. | |
| # Late inputs: any control feeding a node in chain but not feeding the first parent nodes | |
| late_controls = [] | |
| chain_set = set(chain) | |
| for c, n in control_to_node.items(): | |
| if n in chain_set and n not in (p1, p2): | |
| late_controls.append((c, n)) | |
| # only render the first late control nicely (matches your example) | |
| if late_controls: | |
| c_late, n_late = late_controls[0] | |
| # position caret under the target node label in mid_line | |
| # find substring " → <node> →" or "<node>" occurrence | |
| target = N(n_late) | |
| idx = mid_line.find(target) | |
| if idx != -1: | |
| caret_pos = idx + len(target)//2 | |
| caret_line = " " * caret_pos + "^" | |
| below_line = " " * (max(0, caret_pos - len(I(c_late))//2)) + I(c_late) | |
| else: | |
| # no merge: just print a single chain, and optionally a late input caret | |
| chain_txt = " → ".join(N(n) for n in chain) | |
| mid_line = chain_txt | |
| late_controls = [] | |
| chain_set = set(chain) | |
| for c, n in control_to_node.items(): | |
| if n in chain_set: | |
| late_controls.append((c, n)) | |
| if late_controls: | |
| c_late, n_late = late_controls[0] | |
| target = N(n_late) | |
| idx = mid_line.find(target) | |
| if idx != -1: | |
| caret_pos = idx + len(target)//2 | |
| caret_line = " " * caret_pos + "^" | |
| below_line = " " * (max(0, caret_pos - len(I(c_late))//2)) + I(c_late) | |
| # assemble | |
| lines = [] | |
| if top_line: | |
| lines.append(top_line.rstrip()) | |
| if mid_line: | |
| lines.append(mid_line.rstrip()) | |
| if bot_line: | |
| lines.append(bot_line.rstrip()) | |
| if caret_line: | |
| lines.append(caret_line.rstrip()) | |
| if below_line: | |
| lines.append(below_line.rstrip()) | |
| # wrap markdown | |
| return "## Graph map\n```\n" + "\n".join(lines) + "\n```" | |
| # ----------------------------------------------------------------------------- | |
| # Badge label mode (global UI toggle) | |
| # ----------------------------------------------------------------------------- | |
| BADGE_LABEL_MODE = "nodes" # "ports" | "nodes" | |
| # ============================================================================= | |
| # EXECUTION ENGINE | |
| # - Snapshot, RestoreContext | |
| # - Graph | |
| # - Controls | |
| # - FrontendState | |
| # ============================================================================= | |
| class Snapshot: | |
| inputs: Dict[str, Any] | |
| upstream_selected: Dict[str, int] | |
| outputs: Optional[Dict[str, Any]] | |
| sig: str | |
| controls_selected: Dict[str, int] | |
| status: str = "ok" # "ok" | "error" | |
| error: Optional[str] = None | |
| run_id: Optional[int] = None | |
| run_phase: Optional[str] = None # upstream(auto) | upstream(force) | root | downstream(force) | |
| run_step: Optional[int] = None # 0,1,2... within the run | |
| class RestoreContext: | |
| """ | |
| A "solver context" used by restore_with_context: | |
| - ctx.sel: proposed selected snapshot index per node | |
| - ctx.ctrl_sel: proposed selected index per control | |
| - ctx.locked: nodes that cannot be moved (root etc.) | |
| - ctx.touched: nodes moved in a *pass* (prevents double-move within pass) | |
| - ctx.reconciled: nodes frozen across passes (patch 3) | |
| """ | |
| root: str | |
| mode: str | |
| sel: Dict[str, int] = field(default_factory=dict) | |
| locked: Set[str] = field(default_factory=set) | |
| touched: Set[str] = field(default_factory=set) | |
| ctrl_sel: Dict[str, int] = field(default_factory=dict) | |
| ctrl_locked: Set[str] = field(default_factory=set) | |
| reconciled: Set[str] = field(default_factory=set) | |
| log: List[str] = field(default_factory=list) | |
| def get_idx(self, node: str, fallback: Dict[str, int]) -> int: | |
| return self.sel.get(node, fallback.get(node, -1)) | |
| def set_idx(self, node: str, idx: int, reason: str) -> bool: | |
| if node in self.locked: | |
| return False | |
| if node in self.touched: | |
| return False | |
| self.sel[node] = idx | |
| self.touched.add(node) | |
| self.log.append(f"set {node} -> {idx} ({reason})") | |
| return True | |
| class FnNode: | |
| def __init__(self, name: str, fn: Callable[..., Any], input_ports: List[str], output_ports: List[str]): | |
| self.name = name | |
| self.fn = fn | |
| self.input_ports = input_ports | |
| self.output_ports = output_ports | |
| class Graph: | |
| """ | |
| Minimal directed graph with port-level wiring. | |
| - rev maps (dst_node, dst_port) -> (src_node, src_port) | |
| - fwd maps src_node -> [child_nodes] | |
| """ | |
| def __init__(self): | |
| self.nodes: Dict[str, FnNode] = {} | |
| self.rev: Dict[Tuple[str, str], Tuple[str, str]] = {} | |
| self.fwd: Dict[str, List[str]] = {} | |
| def add_node(self, node: FnNode) -> None: | |
| self.nodes[node.name] = node | |
| self.fwd.setdefault(node.name, []) | |
| def connect(self, src_node: str, src_port: str, dst_node: str, dst_port: str) -> None: | |
| self.rev[(dst_node, dst_port)] = (src_node, src_port) | |
| self.fwd.setdefault(src_node, []) | |
| if dst_node not in self.fwd[src_node]: | |
| self.fwd[src_node].append(dst_node) | |
| def parents_of(self, node_name: str) -> List[str]: | |
| parents: List[str] = [] | |
| for port in self.nodes[node_name].input_ports: | |
| key = (node_name, port) | |
| if key in self.rev: | |
| src, _ = self.rev[key] | |
| parents.append(src) | |
| return list(dict.fromkeys(parents)) | |
| def children_of(self, node_name: str) -> List[str]: | |
| return list(self.fwd.get(node_name, [])) | |
| def deps_of(self, node_name: str) -> Set[str]: | |
| """All upstream nodes that node_name depends on (graph structure).""" | |
| seen: Set[str] = set() | |
| stack = [node_name] | |
| while stack: | |
| n = stack.pop() | |
| for port in self.nodes[n].input_ports: | |
| key = (n, port) | |
| if key in self.rev: | |
| src, _ = self.rev[key] | |
| if src not in seen: | |
| seen.add(src) | |
| stack.append(src) | |
| return seen | |
| def downstream_of(self, node_name: str) -> Set[str]: | |
| """All downstream nodes reachable from node_name.""" | |
| seen: Set[str] = set() | |
| stack = [node_name] | |
| while stack: | |
| n = stack.pop() | |
| for ch in self.children_of(n): | |
| if ch not in seen: | |
| seen.add(ch) | |
| stack.append(ch) | |
| return seen | |
| def topo_downstream_from(self, node_name: str) -> List[str]: | |
| """Topologically sorted downstream nodes starting from node_name (excluding node_name).""" | |
| nodes = self.downstream_of(node_name) | |
| if not nodes: | |
| return [] | |
| nodes.add(node_name) | |
| indeg = {n: 0 for n in nodes} | |
| for n in nodes: | |
| for ch in self.children_of(n): | |
| if ch in nodes: | |
| indeg[ch] += 1 | |
| q = [n for n in nodes if indeg[n] == 0] | |
| order: List[str] = [] | |
| while q: | |
| cur = q.pop(0) | |
| if cur != node_name: | |
| order.append(cur) | |
| for ch in self.children_of(cur): | |
| if ch in nodes: | |
| indeg[ch] -= 1 | |
| if indeg[ch] == 0: | |
| q.append(ch) | |
| return order | |
| def topo_upstream_to(self, node_name: str) -> List[str]: | |
| """ | |
| Topological order of deps(node_name) plus node_name, upstream -> downstream. | |
| """ | |
| nodes = self.deps_of(node_name) | {node_name} | |
| indeg = {n: 0 for n in nodes} | |
| for n in nodes: | |
| for ch in self.children_of(n): | |
| if ch in nodes: | |
| indeg[ch] += 1 | |
| q = [n for n in nodes if indeg[n] == 0] | |
| order: List[str] = [] | |
| while q: | |
| cur = q.pop(0) | |
| order.append(cur) | |
| for ch in self.children_of(cur): | |
| if ch in nodes: | |
| indeg[ch] -= 1 | |
| if indeg[ch] == 0: | |
| q.append(ch) | |
| return order | |
| # ============================================================================= | |
| # Versioned UI controls (Daggr-like) | |
| # ============================================================================= | |
| class Controls: | |
| """ | |
| Versioned controls: | |
| - values[name]: current UI string (not necessarily committed) | |
| - history[name]: list of committed values | |
| - selected[name]: index into history | |
| """ | |
| def __init__(self, names: List[str], reuse_identical_values: bool = False): | |
| self.names = names | |
| self.reuse_identical_values = reuse_identical_values | |
| self.reset() | |
| def reset(self) -> None: | |
| self.values: Dict[str, Any] = {n: "" for n in self.names} | |
| self.history: Dict[str, List[Any]] = {n: [] for n in self.names} | |
| self.selected: Dict[str, int] = {n: -1 for n in self.names} | |
| def set_ui_value(self, name: str, value: Any) -> None: | |
| self.values[name] = value | |
| cur_idx = self.selected.get(name, -1) | |
| if 0 <= cur_idx < len(self.history[name]) and values_equal(self.history[name][cur_idx], value): | |
| return | |
| if self.reuse_identical_values: | |
| for i, v in enumerate(self.history[name]): | |
| if values_equal(v, value): | |
| self.selected[name] = i | |
| return | |
| self.history[name].append(value) | |
| self.selected[name] = len(self.history[name]) - 1 | |
| def set_selected(self, name: str, idx: int) -> None: | |
| if idx < 0 or idx >= len(self.history[name]): | |
| raise ValueError(f"bad control idx for {name}: {idx}") | |
| self.selected[name] = idx | |
| self.values[name] = self.history[name][idx] | |
| def get_selected_value(self, name: str) -> Any: | |
| idx = self.selected.get(name, -1) | |
| if idx < 0: | |
| return None | |
| return self.history[name][idx] | |
| # ============================================================================= | |
| # Frontend-like state engine | |
| # ============================================================================= | |
| class FrontendState: | |
| """ | |
| Central state machine of the app. | |
| Responsibilities: | |
| - Store snapshot histories per node | |
| - Track selected snapshots | |
| - Execute nodes (run semantics) | |
| - Resolve inputs from parents and controls | |
| - Determine wire colors (fresh / stale / error / blocked) | |
| - Implement restore strategies | |
| This class intentionally mixes "engine" and "UI-facing semantics": | |
| the goal is debuggability, not strict layering. | |
| """ | |
| def __init__(self, g: Graph, controls: Controls, control_ports: Dict[str, Tuple[str, str]]): | |
| self.g = g | |
| self.controls = controls | |
| self.control_ports = control_ports | |
| self.last_run_meta: Dict[str, str] = {} | |
| self.reset() | |
| # ------------------------- | |
| # Reset / lifecycle helpers | |
| # ------------------------- | |
| def reset(self) -> None: | |
| self.selected_index: Dict[str, int] = {} | |
| self.history: Dict[str, List[Snapshot]] = {n: [] for n in self.g.nodes} | |
| self.last_run_note: Optional[str] = None | |
| self.last_run_meta = {} | |
| self.last_run_failed: bool = False | |
| self.last_run_error: Optional[dict] = None | |
| self.run_counter: int = 0 | |
| self.active_run_id: Optional[int] = None | |
| self.active_run_step: int = 0 | |
| self.last_run_executed: Set[str] = set() | |
| self.last_run_blocked_nodes: Set[str] = set() | |
| def clear_transient_run_state(self) -> None: | |
| """ | |
| Clears transient UI state (used by restore/nav/input changes). | |
| This prevents "phantom red" and stale banners after changing context. | |
| """ | |
| self.last_run_note = None | |
| self.last_run_meta.clear() | |
| self.last_run_failed = False | |
| self.last_run_error = None | |
| self.last_run_executed = set() | |
| self.last_run_blocked_nodes = set() | |
| def start_run(self) -> int: | |
| """ | |
| Begin a run transaction: | |
| - increments run_counter | |
| - allocates active_run_id | |
| - resets active_run_step | |
| """ | |
| self.run_counter += 1 | |
| self.active_run_id = self.run_counter | |
| self.active_run_step = 0 | |
| return self.active_run_id | |
| def end_run(self) -> None: | |
| self.active_run_id = None | |
| self.active_run_step = 0 | |
| # ------------------------- | |
| # Input resolution | |
| # ------------------------- | |
| def never_ran(self, node: str) -> bool: | |
| return len(self.history.get(node, [])) == 0 | |
| def _resolve_inputs_for_node(self, node: str) -> Tuple[Dict[str, Any], Dict[str, int]]: | |
| """ | |
| Build inputs dict for node: | |
| - for wired ports: read selected snapshot output from parent | |
| - for control ports: read selected control version | |
| Also returns upstream_selected fingerprints. | |
| """ | |
| inputs: Dict[str, Any] = {} | |
| upstream_selected: Dict[str, int] = {} | |
| for port in self.g.nodes[node].input_ports: | |
| key = (node, port) | |
| if key in self.g.rev: | |
| src_node, src_port = self.g.rev[key] | |
| idx = self.selected_index.get(src_node, -1) | |
| upstream_selected[src_node] = idx | |
| if idx < 0: | |
| raise RuntimeError(f"Upstream '{src_node}' has no selected snapshot for {node}.{port}") | |
| snap = self.history[src_node][idx] | |
| if not snap.outputs: | |
| raise RuntimeError(f"Upstream '{src_node}' selected snapshot has no outputs for {node}.{port}") | |
| inputs[port] = snap.outputs[src_port] | |
| continue | |
| # Find matching control for this (node, port) | |
| control_name = None | |
| for cname, (n, p) in self.control_ports.items(): | |
| if n == node and p == port: | |
| control_name = cname | |
| break | |
| if control_name is None: | |
| raise RuntimeError(f"Unwired port {node}.{port} has no control mapping.") | |
| inputs[port] = self.controls.get_selected_value(control_name) | |
| return inputs, upstream_selected | |
| def _controls_in_scope(self, node: str) -> Set[str]: | |
| """ | |
| Controls considered "in-scope" for snapshot freshness: | |
| all controls feeding the dependency subgraph (deps(node) ∪ {node}). | |
| """ | |
| deps = self.g.deps_of(node) | {node} | |
| used = set() | |
| for cname, (n, _p) in self.control_ports.items(): | |
| if n in deps: | |
| used.add(cname) | |
| return used | |
| def _ensure_required_controls_committed(self, node: str) -> None: | |
| """ | |
| Controls that directly feed this node must have a selected version. | |
| (UI: blur/apply/Run commits current textbox values.) | |
| """ | |
| for cname, (n, _p) in self.control_ports.items(): | |
| if n == node and self.controls.selected.get(cname, -1) < 0: | |
| raise RuntimeError( | |
| f"Control '{cname}' has no selected value yet " | |
| f"(edit textbox + blur, or click Apply)." | |
| ) | |
| # ------------------------- | |
| # Running nodes | |
| # ------------------------- | |
| def run_one( | |
| self, | |
| node: str, | |
| on_error: str = "no_snapshot", | |
| run_phase: Optional[str] = None, | |
| ) -> None: | |
| inputs, upstream_selected = self._resolve_inputs_for_node(node) | |
| node_spec = self.g.nodes[node] | |
| out_ports = node_spec.output_ports | |
| status = "ok" | |
| error: Optional[str] = None | |
| outputs: Optional[Dict[str, Any]] = None | |
| try: | |
| raw = node_spec.fn(**inputs) | |
| # ------------------------------------------------- | |
| # OUTPUT RESOLUTION (single vs multi-output) | |
| # ------------------------------------------------- | |
| # Case 1: function returned multiple values | |
| if isinstance(raw, (tuple, list)): | |
| if DISABLE_MULTI_OUTPUT: | |
| raise RuntimeError( | |
| f"Node '{node}' returned {len(raw)} values but multi-output is disabled " | |
| f"(DISABLE_MULTI_OUTPUT=True)." | |
| ) | |
| # Fallback: implicit out1/out2/... if only 'out' declared | |
| if out_ports == ["out"]: | |
| outputs = {f"out{i+1}": raw[i] for i in range(len(raw))} | |
| else: | |
| if len(raw) != len(out_ports): | |
| raise RuntimeError( | |
| f"{node} returned {len(raw)} values but " | |
| f"{len(out_ports)} outputs are declared: {out_ports}" | |
| ) | |
| outputs = { | |
| out_ports[i]: raw[i] | |
| for i in range(len(out_ports)) | |
| } | |
| # Case 2: single value returned | |
| else: | |
| if len(out_ports) != 1: | |
| raise RuntimeError( | |
| f"{node} returned a single value but declares outputs {out_ports}" | |
| ) | |
| outputs = {out_ports[0]: raw} | |
| except Exception as e: | |
| status = "error" | |
| error = repr(e) | |
| outputs = None | |
| # ------------------------- | |
| # SNAPSHOT METADATA | |
| # ------------------------- | |
| ctrl_scope = self._controls_in_scope(node) | |
| controls_selected = {c: self.controls.selected[c] for c in ctrl_scope} | |
| step = self.active_run_step | |
| self.active_run_step += 1 | |
| sig = stable_hash( | |
| { | |
| "status": status, | |
| "error": error, | |
| "inputs": inputs, | |
| "up": upstream_selected, | |
| "outputs": outputs, | |
| "ctrl": controls_selected, | |
| "run_id": self.active_run_id, | |
| "run_phase": run_phase, | |
| "run_step": step, | |
| } | |
| ) | |
| snap = Snapshot( | |
| inputs=inputs, | |
| upstream_selected=upstream_selected, | |
| outputs=outputs, | |
| sig=sig, | |
| controls_selected=controls_selected, | |
| status=status, | |
| error=error, | |
| run_id=self.active_run_id, | |
| run_phase=run_phase, | |
| run_step=step, | |
| ) | |
| # ------------------------- | |
| # COMMIT SNAPSHOT | |
| # ------------------------- | |
| if status == "ok": | |
| self.history[node].append(snap) | |
| self.selected_index[node] = len(self.history[node]) - 1 | |
| return | |
| if on_error == "snapshot": | |
| self.history[node].append(snap) | |
| self.selected_index[node] = len(self.history[node]) - 1 | |
| return | |
| raise RuntimeError(f"{node} failed: {error}") | |
| def run_with_upstream(self, node: str, on_error: str = "no_snapshot") -> None: | |
| """ | |
| Run missing upstream nodes recursively, then run the node. | |
| """ | |
| self._ensure_required_controls_committed(node) | |
| for p in self.g.parents_of(node): | |
| if self.selected_index.get(p, -1) < 0: | |
| self.run_with_upstream(p, on_error=on_error) | |
| self.run_one(node, on_error=on_error, run_phase="upstream(auto)") | |
| def run_force_upstream(self, node: str, topo: bool = True, on_error: str = "no_snapshot") -> None: | |
| """ | |
| Force recompute deps(node) + node, optionally in topological order. | |
| """ | |
| deps_plus = list(self.g.deps_of(node) | {node}) | |
| order = self.g.topo_upstream_to(node) if topo else deps_plus | |
| for n in order: | |
| self._ensure_required_controls_committed(n) | |
| # Safety: if parents are missing, auto-create them | |
| for p in self.g.parents_of(n): | |
| if self.selected_index.get(p, -1) < 0: | |
| self.run_with_upstream(p, on_error=on_error) | |
| self.run_one(n, on_error=on_error, run_phase="upstream(force)") | |
| def run_force_downstream_from_selected(self, root: str, on_error: str = "no_snapshot") -> None: | |
| """ | |
| Force recompute downstream nodes starting at root. | |
| Root must have a selected snapshot. | |
| """ | |
| if self.selected_index.get(root, -1) < 0: | |
| raise RuntimeError(f"Cannot force-downstream: root '{root}' has no selected snapshot") | |
| order = self.g.topo_downstream_from(root) | |
| for n in order: | |
| self._ensure_required_controls_committed(n) | |
| for p in self.g.parents_of(n): | |
| if self.selected_index.get(p, -1) < 0: | |
| self.run_with_upstream(p, on_error=on_error) | |
| self.run_one(n, on_error=on_error, run_phase="downstream(force)") | |
| # ------------------------- | |
| # Alignment + freshness | |
| # ------------------------- | |
| def aligned_edge(self, parent: str, node: str) -> bool: | |
| idx_n = self.selected_index.get(node, -1) | |
| if idx_n < 0 or idx_n >= len(self.history[node]): | |
| return False | |
| snap = self.history[node][idx_n] | |
| return snap.upstream_selected.get(parent) == self.selected_index.get(parent) | |
| def globally_fresh(self, node: str) -> bool: | |
| """ | |
| Fresh if the selected snapshot for node references the same selected controls | |
| (in its recorded scope) as current controls. | |
| """ | |
| idx = self.selected_index.get(node, -1) | |
| if idx < 0 or idx >= len(self.history[node]): | |
| return False | |
| snap = self.history[node][idx] | |
| for c, used_idx in snap.controls_selected.items(): | |
| if self.controls.selected.get(c) != used_idx: | |
| return False | |
| return True | |
| def would_run_be_noop(self, node: str) -> bool: | |
| """ | |
| True si re-run du node produirait exactement le même contexte que le snapshot sélectionné: | |
| - mêmes inputs résolus | |
| - mêmes upstream_selected | |
| - mêmes controls_selected (scope) | |
| et snapshot sélectionné ok. | |
| """ | |
| idx = self.selected_index.get(node, -1) | |
| if idx < 0 or idx >= len(self.history.get(node, [])): | |
| return False | |
| snap = self.history[node][idx] | |
| if snap.status != "ok": | |
| return False | |
| # Contexte actuel | |
| inputs, upstream_selected = self._resolve_inputs_for_node(node) | |
| ctrl_scope = self._controls_in_scope(node) | |
| controls_selected = {c: self.controls.selected.get(c, -1) for c in ctrl_scope} | |
| # Comparaison (values_equal gère audio/np.array/dicts) | |
| return ( | |
| values_equal(snap.inputs, inputs) | |
| and snap.upstream_selected == upstream_selected | |
| and snap.controls_selected == controls_selected | |
| ) | |
| # ------------------------- | |
| # Error / blocked UI semantics | |
| # ------------------------- | |
| def selected_snapshot(self, node: str) -> Optional[Snapshot]: | |
| idx = self.selected_index.get(node, -1) | |
| if idx < 0: | |
| return None | |
| hist = self.history.get(node, []) | |
| if idx >= len(hist): | |
| return None | |
| return hist[idx] | |
| def is_error_selected(self, node: str) -> bool: | |
| snap = self.selected_snapshot(node) | |
| return bool(snap and snap.status == "error") | |
| def selected_snapshot_status(self, node: str) -> Optional[str]: | |
| idx = self.selected_index.get(node, -1) | |
| if idx < 0 or idx >= len(self.history.get(node, [])): | |
| return None | |
| return self.history[node][idx].status | |
| def last_run_error_relevant(self, node: str) -> bool: | |
| """ | |
| True if: | |
| - last run failed | |
| - failure node == node | |
| - and the failure fingerprint still matches current context (parents + controls-in-scope) | |
| This prevents phantom red after restore/nav/input changes. | |
| """ | |
| err = getattr(self, "last_run_error", None) | |
| if not err or not getattr(self, "last_run_failed", False): | |
| return False | |
| if err.get("node") != node: | |
| return False | |
| parents_sel = err.get("parents_sel", None) | |
| ctrl_sel = err.get("ctrl_sel", None) | |
| # If fingerprint missing (older runs), fall back to old behavior | |
| if parents_sel is None or ctrl_sel is None: | |
| return True | |
| # 1) parents must still match | |
| for p, want in parents_sel.items(): | |
| if self.selected_index.get(p, -1) != want: | |
| return False | |
| # 2) controls-in-scope must still match | |
| for c, want in ctrl_sel.items(): | |
| if self.controls.selected.get(c, -1) != want: | |
| return False | |
| return True | |
| def edge_is_blocked_ui(self, parent: str, child: str) -> bool: | |
| if not getattr(self, "last_run_failed", False): | |
| return False | |
| err = getattr(self, "last_run_error", None) | |
| if not err: | |
| return False | |
| # node-level blocked: any incoming edge to a blocked node is purple, | |
| # but only if that node did NOT execute in the last run. | |
| return ( | |
| child in getattr(self, "last_run_blocked_nodes", set()) | |
| and child not in getattr(self, "last_run_executed", set()) | |
| ) | |
| def node_is_in_error_ui(self, node: str) -> bool: | |
| """ | |
| A node is "in error UI" if: | |
| - it is not blocked by a parent failure (purple dominates for blocked children) | |
| - and either: | |
| - selected snapshot is error, OR | |
| - last run failed on this node and the error fingerprint still matches. | |
| """ | |
| # if blocked => don't show as error | |
| for p in self.g.parents_of(node): | |
| if self.edge_is_blocked_ui(p, node): | |
| return False | |
| if self.is_error_selected(node): | |
| return True | |
| return self.last_run_error_relevant(node) | |
| # ------------------------- | |
| # Wire coloring | |
| # ------------------------- | |
| def control_wire_color(self, control_name: str) -> str: | |
| """ | |
| Control wire (Input[x] -> target node port): | |
| - blue : target node never ran yet OR no selected snapshot | |
| - red : target node in error UI | |
| - gray : target snapshot doesn't reference this control (rare) | |
| - orange : this control changed since snapshot | |
| - green : this control matches snapshot | |
| """ | |
| if control_name not in self.control_ports: | |
| return "gray" | |
| node, _port = self.control_ports[control_name] | |
| if self.never_ran(node): | |
| return "blue" | |
| idx_n = self.selected_index.get(node, -1) | |
| if idx_n < 0 or idx_n >= len(self.history[node]): | |
| return "blue" | |
| snap = self.history[node][idx_n] | |
| if self.node_is_in_error_ui(node): | |
| return "red" | |
| used = snap.controls_selected.get(control_name, None) | |
| if used is None: | |
| return "gray" | |
| cur = self.controls.selected.get(control_name, -1) | |
| if cur != used: | |
| return "orange" | |
| return "green" | |
| def wire_color(self, parent: str, node: str) -> str: | |
| """ | |
| Graph wire (parent -> child): | |
| Priority: | |
| 0) purple blocked (last run) | |
| 1) blue never ran (child) | |
| 2) red child in error UI | |
| 3) gray misaligned structure | |
| 4) orange stale (parent not globally fresh) | |
| 5) green | |
| """ | |
| if self.edge_is_blocked_ui(parent, node): | |
| return "purple" | |
| if self.never_ran(node): | |
| return "blue" | |
| if self.node_is_in_error_ui(node): | |
| return "red" | |
| if not self.aligned_edge(parent, node): | |
| return "gray" | |
| if not self.globally_fresh(parent): | |
| return "orange" | |
| return "green" | |
| # ------------------------- | |
| # Restore (simple) + Restore (context solver) | |
| # ------------------------- | |
| def restore(self, node: str, idx: int, mode: str) -> None: | |
| if idx < 0 or idx >= len(self.history[node]): | |
| raise ValueError("bad idx") | |
| snap = self.history[node][idx] | |
| if mode == "node_only": | |
| self.selected_index[node] = idx | |
| return | |
| self.selected_index[node] = idx | |
| for c, used_idx in snap.controls_selected.items(): | |
| self.controls.set_selected(c, used_idx) | |
| if mode == "node+controls": | |
| return | |
| if mode in ("node+controls+upstream", "node+controls+upstream+downstream_match"): | |
| self._restore_upstream_recursive(node) | |
| if mode == "node+controls+upstream": | |
| return | |
| if mode == "node+controls+upstream+downstream_match": | |
| self._automatch_downstream_from(node) | |
| return | |
| raise ValueError(f"Unknown restore mode: {mode}") | |
| def _restore_upstream_recursive(self, node: str) -> None: | |
| idx = self.selected_index.get(node, -1) | |
| if idx < 0: | |
| return | |
| snap = self.history[node][idx] | |
| for parent, used_idx in snap.upstream_selected.items(): | |
| if used_idx is None or used_idx < 0: | |
| continue | |
| if used_idx < len(self.history[parent]): | |
| self.selected_index[parent] = used_idx | |
| self._restore_upstream_recursive(parent) | |
| # ---- Context solver helpers (downstream matching + reconcile) ---- | |
| def restore_with_context(self, node: str, idx: int, mode: str, reconcile: bool = False) -> RestoreContext: | |
| ctx = RestoreContext(root=node, mode=mode) | |
| ctx.reconciled.clear() | |
| ctx.locked.add(node) | |
| ctx.sel[node] = idx | |
| snap = self.history[node][idx] | |
| # restore controls if requested | |
| if "controls" in mode: | |
| for c, used in snap.controls_selected.items(): | |
| ctx.ctrl_sel[c] = used | |
| # restore upstream recursively (context) | |
| if "upstream" in mode: | |
| self._restore_upstream_recursive_ctx(node, ctx) | |
| # downstream match (context) | |
| if "downstream_match" in mode: | |
| self._automatch_downstream_ctx(ctx, reconcile=reconcile) | |
| return ctx | |
| def commit_restore_context(self, ctx: RestoreContext) -> None: | |
| # apply node selection | |
| for node, idx in ctx.sel.items(): | |
| self.selected_index[node] = idx | |
| # apply control selection | |
| for c, idx in ctx.ctrl_sel.items(): | |
| self.controls.set_selected(c, idx) | |
| def _restore_upstream_recursive_ctx(self, node: str, ctx: RestoreContext) -> None: | |
| idx = ctx.get_idx(node, self.selected_index) | |
| if idx < 0 or idx >= len(self.history.get(node, [])): | |
| return | |
| snap = self.history[node][idx] | |
| for parent, used_idx in snap.upstream_selected.items(): | |
| if used_idx is None or used_idx < 0: | |
| continue | |
| if used_idx >= len(self.history.get(parent, [])): | |
| continue | |
| if parent in ctx.locked: | |
| continue | |
| prev = ctx.sel.get(parent, None) | |
| if prev is None: | |
| ctx.sel[parent] = used_idx | |
| elif prev != used_idx: | |
| ctx.log.append( | |
| f"upstream CONFLICT {parent}: keep {prev}, ignore {used_idx} (from {node}#{idx})" | |
| ) | |
| continue | |
| # propagate controls conservatively | |
| if "controls" in ctx.mode: | |
| psnap = self.history[parent][ctx.sel[parent]] | |
| for c, want in psnap.controls_selected.items(): | |
| if c in ctx.ctrl_locked: | |
| continue | |
| ctx.ctrl_sel.setdefault(c, want) | |
| self._restore_upstream_recursive_ctx(parent, ctx) | |
| def _controls_scope_for_node(self, node: str) -> Set[str]: | |
| return self._controls_in_scope(node) | |
| def _is_snapshot_compatible(self, node: str, snap: Snapshot) -> bool: | |
| # parents must match current selected snapshots | |
| for p in self.g.parents_of(node): | |
| cur_p = self.selected_index.get(p, -1) | |
| if cur_p < 0: | |
| return False | |
| if snap.upstream_selected.get(p) != cur_p: | |
| return False | |
| # controls-in-scope must match current selected controls | |
| scope = self._controls_scope_for_node(node) | |
| for c in scope: | |
| cur = self.controls.selected.get(c, -1) | |
| if cur < 0: | |
| return False | |
| used = snap.controls_selected.get(c, None) | |
| if used is None: | |
| return False | |
| if used != cur: | |
| return False | |
| return True | |
| def _is_snapshot_compatible_ctx(self, node: str, snap: Snapshot, ctx: RestoreContext) -> bool: | |
| # parents must match via ctx | |
| for p in self.g.parents_of(node): | |
| want = snap.upstream_selected.get(p) | |
| have = ctx.get_idx(p, self.selected_index) | |
| if have < 0 or want != have: | |
| return False | |
| # controls must match for ALL controls in scope | |
| scope = self._controls_scope_for_node(node) | |
| for c in scope: | |
| want = snap.controls_selected.get(c, None) | |
| if want is None: | |
| return False | |
| have = ctx.ctrl_sel.get(c, self.controls.selected.get(c, -1)) | |
| if want != have: | |
| return False | |
| return True | |
| def _find_best_compatible_snapshot_index(self, node: str) -> Optional[int]: | |
| hist = self.history.get(node, []) | |
| for i in range(len(hist) - 1, -1, -1): | |
| if self._is_snapshot_compatible(node, hist[i]): | |
| return i | |
| return None | |
| def _automatch_downstream_from(self, root: str) -> None: | |
| order = self.g.topo_downstream_from(root) | |
| max_passes = max(1, len(order) + 1) | |
| for _ in range(max_passes): | |
| changed = False | |
| for n in order: | |
| best = self._find_best_compatible_snapshot_index(n) | |
| cur = self.selected_index.get(n, -1) | |
| if best is not None and best != cur: | |
| self.selected_index[n] = best | |
| changed = True | |
| if not changed: | |
| break | |
| # --- reconcile helpers (Option 1, inline) --- | |
| def _snapshot_respects_locked_parents_ctx(self, child: str, snap: Snapshot, ctx: RestoreContext) -> bool: | |
| for p in self.g.parents_of(child): | |
| want = snap.upstream_selected.get(p, -1) | |
| have = ctx.get_idx(p, self.selected_index) | |
| if p in ctx.locked or p in ctx.touched: | |
| if want != have: | |
| return False | |
| return True | |
| def _try_reconcile_child_ctx(self, child: str, ctx: RestoreContext) -> bool: | |
| """ | |
| Inline reconcile: | |
| - choose an existing child snapshot that matches locked parents | |
| - move other direct parents to indices required by that snapshot | |
| - select that child snapshot | |
| """ | |
| hist_c = self.history.get(child, []) | |
| if not hist_c: | |
| return False | |
| for cidx in range(len(hist_c) - 1, -1, -1): | |
| snap_c = hist_c[cidx] | |
| if not self._snapshot_respects_locked_parents_ctx(child, snap_c, ctx): | |
| continue | |
| ok = True | |
| moved_any = False | |
| for p in self.g.parents_of(child): | |
| want = snap_c.upstream_selected.get(p, -1) | |
| have = ctx.get_idx(p, self.selected_index) | |
| if want == have: | |
| continue | |
| if p in ctx.locked or p in ctx.touched: | |
| ok = False | |
| break | |
| if want < 0 or want >= len(self.history.get(p, [])): | |
| ok = False | |
| break | |
| if not self._is_snapshot_compatible_ctx(p, self.history[p][want], ctx): | |
| ok = False | |
| break | |
| if not ctx.set_idx(p, want, f"reconcile parent for {child}"): | |
| ok = False | |
| break | |
| moved_any = True | |
| ctx.reconciled.add(p) # freeze across passes | |
| if not ok: | |
| continue | |
| cur_child = ctx.get_idx(child, self.selected_index) | |
| if cur_child != cidx: | |
| if ctx.set_idx(child, cidx, "child snapshot (reconciled)"): | |
| for c, want in snap_c.controls_selected.items(): | |
| if c in ctx.ctrl_locked: | |
| continue | |
| ctx.ctrl_sel[c] = want | |
| ctx.reconciled.add(child) | |
| return True | |
| else: | |
| if moved_any: | |
| for c, want in snap_c.controls_selected.items(): | |
| if c in ctx.ctrl_locked: | |
| continue | |
| ctx.ctrl_sel[c] = want | |
| ctx.reconciled.add(child) | |
| return True | |
| return False | |
| def _automatch_downstream_ctx(self, ctx: RestoreContext, reconcile: bool = False) -> None: | |
| order = self.g.topo_downstream_from(ctx.root) | |
| if not order: | |
| return | |
| max_passes = max(1, len(order) * (3 if reconcile else 1)) | |
| for _ in range(max_passes): | |
| ctx.touched.clear() # per-pass | |
| changed = False | |
| for n in order: | |
| if n in ctx.reconciled: | |
| continue | |
| hist = self.history.get(n, []) | |
| best = None | |
| for i in range(len(hist) - 1, -1, -1): | |
| if self._is_snapshot_compatible_ctx(n, hist[i], ctx): | |
| best = i | |
| break | |
| cur = ctx.get_idx(n, self.selected_index) | |
| if best is not None: | |
| if cur != best: | |
| changed |= ctx.set_idx(n, best, "downstream match") | |
| continue | |
| if reconcile and len(self.g.parents_of(n)) >= 2: | |
| if self._try_reconcile_child_ctx(n, ctx): | |
| changed = True | |
| continue | |
| if not changed: | |
| break | |
| # ------------------------- | |
| # UI inspect helpers | |
| # ------------------------- | |
| def input_port_colors(self, node: str) -> List[Tuple[str, str, str]]: | |
| res: List[Tuple[str, str, str]] = [] | |
| for port in self.g.nodes[node].input_ports: | |
| key = (node, port) | |
| if key in self.g.rev: | |
| src_node, _src_port = self.g.rev[key] | |
| color = self.wire_color(src_node, node) | |
| res.append((port, src_node, color)) | |
| continue | |
| control_name = None | |
| for cname, (n, p) in self.control_ports.items(): | |
| if n == node and p == port: | |
| control_name = cname | |
| break | |
| if control_name is None: | |
| res.append((port, "unwired", "gray")) | |
| else: | |
| color = self.control_wire_color(control_name) | |
| res.append((port, f"Input[{control_name}]", color)) | |
| return res | |
| def output_edge_colors(self, node: str) -> List[Tuple[str, str, str]]: | |
| res: List[Tuple[str, str, str]] = [] | |
| for (dst_node, dst_port), (src_node, src_port) in self.g.rev.items(): | |
| if src_node != node: | |
| continue | |
| color = self.wire_color(node, dst_node) | |
| res.append((src_port, f"{dst_node}.{dst_port}", color)) | |
| res.sort(key=lambda t: (t[0], t[1])) | |
| return res | |
| def history_list(self, node: str) -> List[str]: | |
| hist = self.history.get(node, []) | |
| if not hist: | |
| return ["(empty)"] | |
| out: List[str] = [] | |
| for i, s in enumerate(hist): | |
| prefix = "❌" if s.status == "error" else "✓" | |
| rid = f"run={s.run_id}:{s.run_step}" if s.run_id is not None else "run=—" | |
| ph = f"phase={s.run_phase}" if s.run_phase else "phase=—" | |
| st = f"status={s.status}" | |
| out.append( | |
| f"{i} | {prefix} | {rid} | {ph} | {st} | sig={s.sig} | up={s.upstream_selected} | ctrl={s.controls_selected}" | |
| ) | |
| return out | |
| def current_output_text(self, node: str) -> str: | |
| idx = self.selected_index.get(node, -1) | |
| if idx < 0 or idx >= len(self.history[node]): | |
| return "" | |
| snap = self.history[node][idx] | |
| if snap.status == "error": | |
| return f"❌ ERROR: {snap.error}" | |
| if not snap.outputs: | |
| return "" | |
| return json.dumps(snap.outputs, ensure_ascii=False) | |
| def debug_state(self) -> str: | |
| obj = { | |
| "selected_index": self.selected_index, | |
| "controls_selected": self.controls.selected, | |
| "controls_values": self.controls.values, | |
| "reuse_identical_values": getattr(self.controls, "reuse_identical_values", False), | |
| "controls_history_sizes": {k: len(v) for k, v in self.controls.history.items()}, | |
| "history_sizes": {k: len(v) for k, v in self.history.items()}, | |
| } | |
| return json.dumps(obj, ensure_ascii=False, indent=2, default=str) | |
| # ============================================================================= | |
| # PIPELINE DSL | |
| # Declarative pipeline, ports, wiring (no strings) | |
| # ============================================================================= | |
| def _slug(s: str) -> str: | |
| s = s.strip().lower() | |
| s = re.sub(r"[^a-z0-9]+", "_", s) | |
| s = re.sub(r"_+", "_", s).strip("_") | |
| return s or "item" | |
| def _uniq(base: str, used: set[str]) -> str: | |
| if base not in used: | |
| used.add(base) | |
| return base | |
| i = 2 | |
| while f"{base}_{i}" in used: | |
| i += 1 | |
| out = f"{base}_{i}" | |
| used.add(out) | |
| return out | |
| class _InputSpec: | |
| name: str # internal stable id (short) | |
| label: str # human label | |
| default: Any = "" | |
| component: Any = gr.Textbox | |
| component_kwargs: Dict[str, Any] = field(default_factory=dict) | |
| badge_position: str = "below" | |
| feeds: Optional[Tuple[str, str]] = None # (node_name, port) | |
| class _NodeSpec: | |
| name: str # internal stable id (short) | |
| label: str # human label | |
| fn: Callable[..., Any] | |
| inputs: List[str] = field(default_factory=list) | |
| outputs: List[str] = field(default_factory=lambda: ["out"]) | |
| outputs_open: bool = False # outputs_open=True means outputs may be dynamically extended via tuple fallback (out1, out2, ...) | |
| wires: Dict[str, Tuple[str, str]] = field(default_factory=dict) # child_port -> (parent_node, parent_port) | |
| out_component: Any = gr.Textbox | |
| out_component_kwargs: Dict[str, Any] = field(default_factory=dict) | |
| # ---- Public DSL objects ---- | |
| # InputRef, NodeRef, PortRef | |
| class InputRef: | |
| def __init__(self, p: "Pipeline", key: str): | |
| self._p = p | |
| self.key = key | |
| def out(self) -> PortRef: | |
| return PortRef(self._p, kind="control", owner_key=self.key, port="out") | |
| def __rshift__(self, other: PortRef): | |
| self._p.connect(self.out, other) | |
| return other | |
| def feeds(self, node: "NodeRef", *, as_: str): | |
| self._p.connect( | |
| PortRef(self._p, "control", self.key, "out"), | |
| PortRef(self._p, "node_in", node.key, as_), | |
| ) | |
| return node | |
| class NodeRef: | |
| def __init__(self, p: "Pipeline", key: str): | |
| self._p = p | |
| self.key = key | |
| self.i = _InPorts(p, key) # ✅ inputs namespace | |
| self.o = _OutPorts(p, key) # ✅ outputs namespace | |
| def out(self) -> PortRef: | |
| # alias pratique: A.out == A.o.out | |
| return self.o.out | |
| def __getattr__(self, name: str) -> PortRef: | |
| if name.startswith("_"): | |
| raise AttributeError(name) | |
| # ✅ permettre A.d (input direct) == A.i.d | |
| return getattr(self.i, name) | |
| def produces(self, port: str): | |
| return Produced(self._p, self.key, port) | |
| def __repr__(self) -> str: | |
| return f"NodeRef({self.key})" | |
| # Ports are accessed dynamically: | |
| # D.x -> input port | |
| # D.out -> default output | |
| # D.o.a -> named output (multi-output) | |
| # | |
| # This allows wiring without strings: | |
| # D.out >> A.d | |
| class PortRef: | |
| p: "Pipeline" | |
| kind: str # "control" | "node_out" | "node_in" | |
| owner_key: str # input key or node key | |
| port: str # port name | |
| def __rshift__(self, other: "PortRef"): | |
| self.p.connect(self, other) | |
| return other | |
| class Produced: | |
| def __init__(self, p: "Pipeline", node_key: str, port: str): | |
| self._p = p | |
| self._node_key = node_key | |
| self._port = port | |
| def feeds(self, node: "NodeRef", *, as_: str | None = None): | |
| dst_port = as_ or self._port | |
| self._p.connect( | |
| PortRef(self._p, "node_out", self._node_key, self._port), | |
| PortRef(self._p, "node_in", node.key, dst_port), | |
| ) | |
| return node | |
| # ---- Internal wiring helpers ---- | |
| # _InPorts, _OutPorts | |
| class _InPorts: | |
| def __init__(self, p: "Pipeline", node_key: str): | |
| self._p = p | |
| self._node_key = node_key | |
| def __getattr__(self, name: str) -> PortRef: | |
| if name.startswith("_"): | |
| raise AttributeError(name) | |
| spec = self._p._nodes.get(self._node_key) | |
| if not spec: | |
| raise AttributeError(f"Unknown node '{self._node_key}'") | |
| if name not in spec.inputs: | |
| raise AttributeError( | |
| f"Unknown input port '{name}' on node '{self._node_key}'. " | |
| f"Known inputs: {spec.inputs}" | |
| ) | |
| return PortRef(self._p, kind="node_in", owner_key=self._node_key, port=name) | |
| class _OutPorts: | |
| def __init__(self, p: "Pipeline", node_key: str): | |
| self._p = p | |
| self._node_key = node_key | |
| def __getattr__(self, name: str) -> PortRef: | |
| if name.startswith("_"): | |
| raise AttributeError(name) | |
| spec = self._p._nodes.get(self._node_key) | |
| if not spec: | |
| raise AttributeError(f"Unknown node '{self._node_key}'") | |
| if name not in spec.outputs: | |
| if getattr(spec, "outputs_open", False) and re.fullmatch(r"out[1-9]\d*", name): | |
| # retire le placeholder implicite | |
| if spec.outputs == ["out"]: | |
| spec.outputs.clear() | |
| spec.outputs.append(name) | |
| spec.outputs.sort(key=lambda s: int(s[3:])) | |
| else: | |
| raise AttributeError( | |
| f"Unknown output port '{name}' on node '{self._node_key}'. " | |
| f"Known outputs: {spec.outputs}" | |
| ) | |
| return PortRef(self._p, kind="node_out", owner_key=self._node_key, port=name) | |
| # ---- Pipeline builder ---- | |
| # Pipeline | |
| class Pipeline: | |
| """ | |
| Declarative DSL used to describe: | |
| - Inputs (controls) | |
| - Nodes (functions) | |
| - Ports (inputs / outputs) | |
| - Wiring (using >> operators) | |
| - UI layout (columns) | |
| Design principles: | |
| - Human-readable labels | |
| - Stable internal identifiers | |
| - Explicit wiring | |
| - Minimal boilerplate for the developer | |
| """ | |
| def __init__(self): | |
| self._inputs: Dict[str, _InputSpec] = {} | |
| self._nodes: Dict[str, _NodeSpec] = {} | |
| self._layout_cols: List[List[Tuple[str, str]]] = [] # [ [("input", key), ("node", key)], ... ] | |
| self._used_names: set[str] = set() | |
| def _warn(self, msg: str): | |
| print(f"[Pipeline] {msg}") | |
| # ---- declaration API ---- | |
| def input( | |
| self, | |
| label: str, | |
| *, | |
| name: Optional[str] = None, | |
| default: Any = None, | |
| component=gr.Textbox, | |
| component_kwargs: Optional[Dict[str, Any]] = None, | |
| badge_position: str = "below", | |
| ) -> InputRef: | |
| key = name or _slug(label) | |
| key = _uniq(key, self._used_names) | |
| if default is None and component is gr.Textbox: | |
| default = "" | |
| if component in FILE_COMPONENTS and default in ("", "/app"): | |
| default = None | |
| self._inputs[key] = _InputSpec( | |
| name=key, | |
| label=label, | |
| default=default, | |
| component=component, | |
| component_kwargs=dict(component_kwargs or {}), | |
| badge_position=badge_position, | |
| ) | |
| return InputRef(self, key) | |
| def node( | |
| self, | |
| label: str, | |
| *, | |
| fn: Callable[..., Any], | |
| inputs: Optional[list[str]] = None, | |
| outputs: Optional[list[str]] = None, | |
| name: Optional[str] = None, | |
| out_component=gr.Textbox, | |
| out_component_kwargs=None, | |
| ) -> NodeRef: | |
| key = name or _slug(label) | |
| key = _uniq(key, self._used_names) | |
| # ---- inputs ---- | |
| if inputs is None: | |
| sig = inspect.signature(fn) | |
| inferred_inputs = [ | |
| p.name | |
| for p in sig.parameters.values() | |
| if p.kind in ( | |
| inspect.Parameter.POSITIONAL_ONLY, | |
| inspect.Parameter.POSITIONAL_OR_KEYWORD, | |
| ) | |
| ] | |
| inputs = inferred_inputs | |
| self._warn( | |
| f'Node "{label}": inputs inferred from function signature {inputs}. ' | |
| f'Declare inputs=[...] for clarity.' | |
| ) | |
| # ---- outputs ---- | |
| outputs_open = outputs is None | |
| if outputs is None: | |
| outputs = ["out"] # default, refined at runtime if tuple | |
| self._warn( | |
| f'Node "{label}": outputs not declared. ' | |
| f'Will infer from function return at runtime.' | |
| ) | |
| self._nodes[key] = _NodeSpec( | |
| name=key, | |
| label=label, | |
| fn=fn, | |
| inputs=list(inputs), | |
| outputs=list(outputs), | |
| outputs_open=outputs_open, | |
| out_component=out_component, | |
| out_component_kwargs=dict(out_component_kwargs or {}), | |
| ) | |
| return NodeRef(self, key) | |
| def connect(self, src: PortRef, dst: PortRef) -> None: | |
| if dst.kind != "node_in": | |
| raise TypeError(f"Destination must be a node input port, got {dst.kind}") | |
| child_key = dst.owner_key | |
| child_port = dst.port | |
| child_spec = self._nodes.get(child_key) | |
| if not child_spec: | |
| raise ValueError(f"Unknown child node '{child_key}'") | |
| if child_port not in child_spec.inputs: | |
| raise ValueError( | |
| f"Unknown input port '{child_port}' on node '{child_key}'. " | |
| f"Known inputs: {child_spec.inputs}" | |
| ) | |
| # prevent double-source on same dst port (wire) | |
| if child_port in child_spec.wires: | |
| raise ValueError(f"Port {child_key}.{child_port} already wired from a node output") | |
| # prevent double-source on same dst port (control) | |
| for inp in self._inputs.values(): | |
| if inp.feeds == (child_key, child_port): | |
| raise ValueError(f"Port {child_key}.{child_port} already fed by an Input[...] control") | |
| if src.kind == "control": | |
| inp = self._inputs.get(src.owner_key) | |
| if not inp: | |
| raise ValueError(f"Unknown input '{src.owner_key}'") | |
| # optional but recommended: forbid one input feeding multiple ports | |
| if inp.feeds is not None and inp.feeds != (child_key, child_port): | |
| raise ValueError(f"Input[{src.owner_key}] already feeds {inp.feeds}") | |
| inp.feeds = (child_key, child_port) | |
| return | |
| if src.kind == "node_out": | |
| parent_key = src.owner_key | |
| parent_port = src.port | |
| parent_spec = self._nodes.get(parent_key) | |
| if not parent_spec: | |
| raise ValueError(f"Unknown parent node '{parent_key}'") | |
| if parent_port not in parent_spec.outputs: | |
| # ✅ support outputs_open fallback out1/out2... | |
| if getattr(parent_spec, "outputs_open", False) and re.fullmatch(r"out[1-9]\d*", parent_port): | |
| if parent_spec.outputs == ["out"]: | |
| parent_spec.outputs.clear() | |
| parent_spec.outputs.append(parent_port) | |
| parent_spec.outputs.sort(key=lambda s: int(s[3:])) | |
| else: | |
| raise ValueError( | |
| f"Unknown output port '{parent_port}' on node '{parent_key}'. " | |
| f"Known outputs: {parent_spec.outputs}" | |
| ) | |
| child_spec.wires[child_port] = (parent_key, parent_port) | |
| return | |
| raise TypeError(f"Unsupported src kind: {src.kind}") | |
| def columns(self, *cols: List[Any]) -> "Pipeline": | |
| """ | |
| Optional but recommended: | |
| p.columns([audio, lang], [transcribe], [cleanup], [summarize]) | |
| If you never call columns(), layout defaults to: | |
| - all inputs in col0 (in declaration order) | |
| - all nodes in following columns (in declaration order) | |
| """ | |
| out: List[List[Tuple[str, str]]] = [] | |
| for col in cols: | |
| c: List[Tuple[str, str]] = [] | |
| for item in col: | |
| if isinstance(item, InputRef): | |
| c.append(("input", item.key)) | |
| elif isinstance(item, NodeRef): | |
| c.append(("node", item.key)) | |
| else: | |
| raise TypeError(f"columns() expects InputRef/NodeRef, got {type(item)}") | |
| out.append(c) | |
| self._layout_cols = out | |
| return self | |
| # ---- spec generation ---- | |
| def _infer_node_inputs_from_wires(self) -> None: | |
| """ | |
| Fill node.inputs if empty: | |
| - any port fed by an input or a parent wire becomes a declared input port | |
| This keeps DSL minimal: you don't have to list inputs=["audio","language"] manually. | |
| """ | |
| for nk, ns in self._nodes.items(): | |
| if ns.inputs: | |
| continue | |
| ports = set() | |
| # from Input feeds | |
| for inp in self._inputs.values(): | |
| if inp.feeds and inp.feeds[0] == nk: | |
| ports.add(inp.feeds[1]) | |
| # from parent wires into this node | |
| for child_port in ns.wires.keys(): | |
| ports.add(child_port) | |
| ns.inputs = sorted(ports) | |
| def _default_layout(self) -> List[List[Tuple[str, str]]]: | |
| # col0: inputs, col1..: nodes | |
| col0 = [("input", k) for k in self._inputs.keys()] | |
| cols = [col0] | |
| for nk in self._nodes.keys(): | |
| cols.append([("node", nk)]) | |
| return cols | |
| def to_spec(self) -> Dict[str, Any]: | |
| # sanity: every input must have feeds | |
| for k, inp in self._inputs.items(): | |
| if not inp.feeds: | |
| raise ValueError(f"Input '{inp.label}' ({k}) missing .feeds(node, as_=...) wiring.") | |
| # infer node inputs if not given | |
| self._infer_node_inputs_from_wires() | |
| # sanity: forbid duplicate feeds to same node port (double-source via inputs) | |
| seen_target_ports = set() | |
| for inp in self._inputs.values(): | |
| tgt = inp.feeds | |
| if tgt in seen_target_ports: | |
| raise ValueError(f"Two inputs feed the same port {tgt}.") | |
| seen_target_ports.add(tgt) | |
| layout_cols = self._layout_cols or self._default_layout() | |
| # build Option-A compatible PIPELINE_SPEC["layout"] | |
| layout: List[List[Dict[str, Any]]] = [] | |
| for col in layout_cols: | |
| out_col: List[Dict[str, Any]] = [] | |
| for kind, key in col: | |
| if kind == "input": | |
| inp = self._inputs[key] | |
| out_col.append( | |
| { | |
| "kind": "input", | |
| "name": inp.name, # stable short id | |
| "default": inp.default, | |
| "badge_position": inp.badge_position, | |
| "component": inp.component, | |
| "component_kwargs": inp.component_kwargs, | |
| "feeds": inp.feeds, # (node, port) | |
| # if you want to keep label in UI later, keep it: | |
| "label": inp.label, | |
| } | |
| ) | |
| else: | |
| nd = self._nodes[key] | |
| out_col.append( | |
| { | |
| "kind": "node", | |
| "name": nd.name, # stable short id | |
| "fn": nd.fn, | |
| "inputs": nd.inputs, | |
| "outputs": nd.outputs, | |
| "wires": nd.wires, # child_port -> (parent, out_port) | |
| "out_component": nd.out_component, | |
| "out_component_kwargs": nd.out_component_kwargs, | |
| "label": nd.label, | |
| } | |
| ) | |
| layout.append(out_col) | |
| for nd in self._nodes.values(): | |
| if nd.outputs_open and nd.outputs == ["out"]: | |
| self._warn( | |
| f'Node "{nd.label}" has open outputs but none were wired ' | |
| f"(only implicit 'out'). Snapshot may contain out1/out2 " | |
| f"but graph exposes no outputs." | |
| ) | |
| return {"layout": layout} | |
| # ============================================================================= | |
| # PIPELINE PRESETS — EDIT HERE | |
| # ============================================================================= | |
| # | |
| # Define one or more pipeline factories. | |
| # Each factory returns a PIPELINE_SPEC (dict) compatible with build_from_spec(). | |
| # | |
| # Switching pipelines will rebuild Graph/Controls/FrontendState and re-render UI | |
| # without restarting the server. | |
| # ============================================================================= | |
| def make_pipeline_base_deabc() -> dict: | |
| # ---- Functions (pure, testable) ---- | |
| def fn_D(d): | |
| time.sleep(0.05) | |
| return d | |
| def fn_E(e): | |
| time.sleep(0.05) | |
| return e | |
| def fn_A(d, e): | |
| time.sleep(0.05) | |
| # combine in a deterministic way for debugging | |
| return {"d": d, "e": e} | |
| def fn_B(a, b): | |
| time.sleep(0.05) | |
| return {"a": a, "b": b, "mix": f"{a} + {b}"} | |
| def fn_C(x): | |
| time.sleep(0.05) | |
| # final “visible” result | |
| return x | |
| # ---- Pipeline DSL ---- | |
| p = Pipeline() | |
| # Inputs (versioned controls) | |
| in_d = p.input("Input d", name="d", default="d0", component=gr.Textbox) | |
| in_e = p.input("Input e", name="e", default="e0", component=gr.Textbox) | |
| in_b = p.input("Input b", name="b", default="b0", component=gr.Textbox) | |
| # Nodes | |
| D = p.node("D", name="D", fn=fn_D, inputs=["d"], outputs=["out"], out_component=gr.Textbox) | |
| E = p.node("E", name="E", fn=fn_E, inputs=["e"], outputs=["out"], out_component=gr.Textbox) | |
| A = p.node( | |
| "A", | |
| name="A", | |
| fn=fn_A, | |
| inputs=["d", "e"], | |
| outputs=["out"], | |
| out_component=gr.Textbox, # nice for dict output | |
| ) | |
| B = p.node( | |
| "B", | |
| name="B", | |
| fn=fn_B, | |
| inputs=["a", "b"], | |
| outputs=["out"], | |
| out_component=gr.Textbox, | |
| ) | |
| C = p.node( | |
| "C", | |
| name="C", | |
| fn=fn_C, | |
| inputs=["x"], | |
| outputs=["out"], | |
| out_component=gr.Textbox, | |
| ) | |
| # Wiring (matches the classic map) | |
| in_d.feeds(D, as_="d") | |
| in_e.feeds(E, as_="e") | |
| in_b.feeds(B, as_="b") | |
| D.produces("out").feeds(A, as_="d") | |
| E.produces("out").feeds(A, as_="e") | |
| A.produces("out").feeds(B, as_="a") | |
| B.produces("out").feeds(C, as_="x") | |
| # Layout (inputs column + then D/E + then A + then B + then C) | |
| p.columns( | |
| [in_d, in_e], | |
| [D, E], | |
| [A, in_b], | |
| [B], | |
| [C], | |
| ) | |
| return p.to_spec() | |
| def make_pipeline_audio_simple() -> dict: | |
| # ---- Functions (pure, testable) ---- | |
| def audio_passthrough(audio): | |
| # audio = (sample_rate, np.array) ou chemin selon gradio config | |
| time.sleep(0.2) | |
| return audio | |
| # ---- Pipeline DSL ---- | |
| p = Pipeline() | |
| # ------------------------- | |
| # Input audio | |
| # ------------------------- | |
| audio_in = p.input( | |
| "Audio source", | |
| component=gr.Audio, | |
| component_kwargs={ | |
| "sources": ["upload", "microphone"], | |
| "type": "filepath", # important pour les tests | |
| }, | |
| ) | |
| # ------------------------- | |
| # Nodes | |
| # ------------------------- | |
| A = p.node( | |
| "Audio A (identity)", | |
| fn=audio_passthrough, | |
| inputs=["audio"], | |
| outputs=["out"], | |
| out_component=gr.Audio, | |
| out_component_kwargs={ | |
| "type": "filepath", | |
| "label": "Audio A output", | |
| }, | |
| ) | |
| B = p.node( | |
| "Audio B (identity)", | |
| fn=audio_passthrough, | |
| inputs=["audio"], | |
| outputs=["out"], | |
| out_component=gr.Audio, | |
| out_component_kwargs={ | |
| "type": "filepath", | |
| "label": "Audio B output", | |
| }, | |
| ) | |
| # ------------------------- | |
| # Wiring | |
| # ------------------------- | |
| audio_in.feeds(A, as_="audio") | |
| A.produces("out").feeds(B, as_="audio") | |
| # ------------------------- | |
| # Layout | |
| # ------------------------- | |
| p.columns( | |
| [audio_in], | |
| [A], | |
| [B], | |
| ) | |
| return p.to_spec() | |
| PIPELINE_PRESETS: dict[str, callable] = { | |
| "Audio simple": make_pipeline_audio_simple, | |
| "Base D→E→A→B→C": make_pipeline_base_deabc, | |
| } | |
| DEFAULT_PIPELINE_PRESET = "Base D→E→A→B→C" | |
| CUSTOM_TEMPLATE = """# Allowed imports: datetime, hashlib, json, math, random, re, time | |
| import time | |
| # ============================================================ | |
| # 1) FUNCTIONS (pure, testable) | |
| # - define your compute functions here | |
| # ============================================================ | |
| def my_fn_example(x): | |
| # TODO: implement | |
| return x | |
| # ============================================================ | |
| # 2) PIPELINE DSL (declare inputs + nodes) | |
| # - you MUST create: p = Pipeline() | |
| # ============================================================ | |
| def make_pipeline() -> dict: | |
| p = Pipeline() | |
| # ------------------------- | |
| # Inputs (controls) | |
| # ------------------------- | |
| # TODO: declare at least one input | |
| in_x = p.input( | |
| "Input x", | |
| name="x", | |
| default="x0", | |
| component=gr.Textbox, | |
| ) | |
| # ------------------------- | |
| # Nodes | |
| # ------------------------- | |
| # TODO: declare at least one node | |
| A = p.node( | |
| "A", | |
| name="A", | |
| fn=my_fn_example, | |
| inputs=["x"], | |
| outputs=["out"], | |
| out_component=gr.Textbox, | |
| ) | |
| B = p.node( | |
| "B", | |
| name="B", | |
| fn=my_fn_example, | |
| inputs=["x"], | |
| outputs=["out"], | |
| out_component=gr.Textbox, | |
| ) | |
| # ============================================================ | |
| # 3) WIRING (REQUIRED) | |
| # - at least one input -> node feed | |
| # - show example node -> node wiring | |
| # ============================================================ | |
| # Input -> Node (required example) | |
| in_x.feeds(A, as_="x") | |
| # Node -> Node (required example) | |
| A.produces("out").feeds(B, as_="x") | |
| # ============================================================ | |
| # 4) LAYOUT (RECOMMENDED) | |
| # - define UI columns order | |
| # ============================================================ | |
| p.columns( | |
| [in_x], | |
| [A], | |
| [B], | |
| ) | |
| return p.to_spec() | |
| """ | |
| # ============================================================================= | |
| # UI — Rendering & event wiring (Gradio) | |
| # ============================================================================= | |
| def normalize_pipeline_layout(layout): | |
| out = [] | |
| for col in layout: | |
| new_col = [] | |
| for item in col: | |
| item = dict(item) | |
| kind = item.get("kind") | |
| if kind == "input": | |
| item["component"] = item.get("component") or gr.Textbox | |
| item["component_kwargs"] = item.get("component_kwargs") or {} | |
| item["default"] = item.get("default", None) | |
| comp = item["component"] | |
| if comp is gr.Audio and item["default"] in ("", "/app"): | |
| item["default"] = None | |
| elif item["default"] is None and comp is gr.Textbox: | |
| item["default"] = "" | |
| item["badge_position"] = item.get("badge_position") or "below" | |
| # Option A requires explicit feeds | |
| if "feeds" not in item: | |
| raise ValueError(f"Input '{item.get('name')}' missing 'feeds': ('Node','port')") | |
| elif kind == "node": | |
| item["out_component"] = item.get("out_component") or gr.Textbox | |
| item["out_component_kwargs"] = item.get("out_component_kwargs") or {} | |
| # Option A requires explicit node signature (fn + ports) | |
| if "fn" not in item: | |
| raise ValueError(f"Node '{item.get('name')}' missing 'fn'") | |
| item["inputs"] = list(item.get("inputs") or []) | |
| item["outputs"] = list(item.get("outputs") or ["out"]) | |
| # wires optional (may be pure-control nodes) | |
| item["wires"] = dict(item.get("wires") or {}) | |
| else: | |
| raise ValueError(f"Unknown kind in pipeline layout: {kind}") | |
| new_col.append(item) | |
| out.append(new_col) | |
| return out | |
| def normalize_input_value_for_component(comp, value): | |
| if comp in FILE_COMPONENTS: | |
| if value in ("", "/app"): | |
| return None | |
| return value | |
| def build_from_spec(spec: dict) -> tuple[Graph, Controls, FrontendState, list[list[dict]]]: | |
| layout = normalize_pipeline_layout(spec["layout"]) | |
| # 1) collect nodes + inputs | |
| node_items = [it for col in layout for it in col if it["kind"] == "node"] | |
| if DISABLE_MULTI_OUTPUT: | |
| for it in node_items: | |
| outs = list(it.get("outputs") or ["out"]) | |
| if len(outs) > 1: | |
| raise ValueError( | |
| f"Multi-output disabled: node '{it['name']}' declares outputs {outs}" | |
| ) | |
| input_items = [it for col in layout for it in col if it["kind"] == "input"] | |
| node_names = [it["name"] for it in node_items] | |
| input_names = [it["name"] for it in input_items] | |
| # 2) build Graph nodes | |
| g = Graph() | |
| for it in node_items: | |
| g.add_node( | |
| FnNode( | |
| it["name"], | |
| fn=it["fn"], | |
| input_ports=list(it.get("inputs") or []), | |
| output_ports=list(it.get("outputs") or ["out"]), | |
| ) | |
| ) | |
| # 3) build explicit edges from node.wires | |
| # wires: child_port -> (parent_node, parent_port) | |
| fanout_seen: set[tuple[str, str]] = set() # (parent_node, parent_port) | |
| for it in node_items: | |
| child = it["name"] | |
| for child_port, (parent, parent_port) in it.get("wires", {}).items(): | |
| if DISABLE_FANOUT: | |
| key = (parent, parent_port) | |
| if key in fanout_seen: | |
| raise ValueError( | |
| f"Fan-out disabled: '{parent}.{parent_port}' would feed multiple children " | |
| f"(already used; now also feeding '{child}.{child_port}')." | |
| ) | |
| fanout_seen.add(key) | |
| if parent not in g.nodes: | |
| raise ValueError(f"Wire error: {child}.{child_port} references unknown parent node '{parent}'") | |
| if child_port not in g.nodes[child].input_ports: | |
| raise ValueError(f"Wire error: {child}.{child_port} not in declared inputs {g.nodes[child].input_ports}") | |
| if parent_port not in g.nodes[parent].output_ports: | |
| raise ValueError(f"Wire error: {parent}.{parent_port} not in declared outputs {g.nodes[parent].output_ports}") | |
| g.connect(parent, parent_port, child, child_port) | |
| # 4) build controls + control_ports from input.feeds | |
| # feeds: ("Node","port") | |
| control_ports: dict[str, tuple[str, str]] = {} | |
| for it in input_items: | |
| cname = it["name"] | |
| node, port = it["feeds"] | |
| if node not in g.nodes: | |
| raise ValueError(f"feeds error: Input[{cname}] targets unknown node '{node}'") | |
| if port not in g.nodes[node].input_ports: | |
| raise ValueError(f"feeds error: Input[{cname}] targets {node}.{port} but node inputs are {g.nodes[node].input_ports}") | |
| control_ports[cname] = (node, port) | |
| controls = Controls(names=input_names) | |
| s = FrontendState(g, controls, control_ports) | |
| # ✅ Commit defaults immediately so the lab is "ready" on first paint | |
| # This avoids needing a manual Reset to create the first control versions. | |
| for it in input_items: | |
| cname = it["name"] | |
| comp = it.get("component") or gr.Textbox | |
| default = it.get("default", None) | |
| default = normalize_input_value_for_component(comp, default) | |
| controls.set_ui_value(cname, default) | |
| # 5) sanity: every node input port must be satisfied by either edge or control | |
| for n, node in g.nodes.items(): | |
| for p in node.input_ports: | |
| is_wired = (n, p) in g.rev | |
| is_controlled = any((nn == n and pp == p) for (nn, pp) in control_ports.values()) | |
| if not (is_wired or is_controlled): | |
| raise ValueError(f"Unresolved input port: {n}.{p} has no wire and no control feed") | |
| # forbid double-source (wire + control) on the same port | |
| if is_wired and is_controlled: | |
| raise ValueError(f"Double-source: {n}.{p} has BOTH a wire and a control feed") | |
| return g, controls, s, layout | |
| # ============================================================================= | |
| # ACTIVE PIPELINE (runtime-rebuildable) | |
| # ============================================================================= | |
| # These globals are assigned by set_pipeline_from_spec() when a preset is chosen. | |
| G: Graph | |
| CTRL: Controls | |
| S: FrontendState | |
| PIPELINE_LAYOUT: list[list[dict]] | |
| INPUT_COMPONENTS: dict[str, Any] = {} | |
| INPUT_LABELS: dict[str, str] = {} | |
| NODE_LABELS: dict[str, str] = {} | |
| INPUT_ORDER: list[str] = [] | |
| NODE_ORDER_UI: list[str] = [] | |
| CONTROL_WIRES: list[str] = [] | |
| EDGE_ORDER: list[tuple[str, str]] = [] | |
| NODE_ORDER: list[str] = [] | |
| def ui_input(name: str) -> str: | |
| return INPUT_LABELS.get(name, name) | |
| def ui_node(name: str) -> str: | |
| return NODE_LABELS.get(name, name) | |
| def set_pipeline_from_spec(spec: dict) -> None: | |
| """ | |
| Build a new Graph/Controls/State/Layout from spec, and recompute all derived UI orders. | |
| Overwrites the module-level globals used by the UI and handlers. | |
| """ | |
| global G, CTRL, S, PIPELINE_LAYOUT | |
| global INPUT_COMPONENTS, INPUT_LABELS, NODE_LABELS | |
| global INPUT_ORDER, NODE_ORDER_UI, CONTROL_WIRES, EDGE_ORDER, NODE_ORDER | |
| G, CTRL, S, PIPELINE_LAYOUT = build_from_spec(spec) | |
| # Map: input_name -> component class (gr.Textbox, gr.Audio, ...) | |
| INPUT_COMPONENTS = {} | |
| for col in PIPELINE_LAYOUT: | |
| for it in col: | |
| if it["kind"] == "input": | |
| INPUT_COMPONENTS[it["name"]] = it.get("component") or gr.Textbox | |
| # Human labels (fallback to internal name) | |
| INPUT_LABELS = {} | |
| NODE_LABELS = {} | |
| for col in PIPELINE_LAYOUT: | |
| for it in col: | |
| if it["kind"] == "input": | |
| INPUT_LABELS[it["name"]] = it.get("label") or it["name"] | |
| elif it["kind"] == "node": | |
| NODE_LABELS[it["name"]] = it.get("label") or it["name"] | |
| # Derived graph/UI orders (single source of truth) | |
| INPUT_ORDER = [it["name"] for col in PIPELINE_LAYOUT for it in col if it["kind"] == "input"] | |
| NODE_ORDER_UI = [it["name"] for col in PIPELINE_LAYOUT for it in col if it["kind"] == "node"] | |
| def derive_control_wires(input_order: list[str], control_ports: dict[str, tuple[str, str]]) -> list[str]: | |
| ui = [c for c in input_order if c in control_ports] | |
| extra = [c for c in control_ports.keys() if c not in ui] | |
| return ui + extra | |
| def derive_edge_order() -> list[tuple[str, str]]: | |
| """ | |
| Unique parent->child edges derived from graph wiring. | |
| Ordered to be stable and UI-friendly: | |
| - primary key: index of parent in NODE_ORDER_UI (fallback large) | |
| - secondary: index of child in NODE_ORDER_UI (fallback large) | |
| - then lexical as final tie-break | |
| """ | |
| edges_set = set() | |
| for (dst_node, _dst_port), (src_node, _src_port) in S.g.rev.items(): | |
| edges_set.add((src_node, dst_node)) | |
| edges = list(edges_set) | |
| pos = {n: i for i, n in enumerate(NODE_ORDER_UI)} | |
| big = 10**9 | |
| edges.sort(key=lambda e: (pos.get(e[0], big), pos.get(e[1], big), e[0], e[1])) | |
| return edges | |
| def derive_node_order_for_summary() -> list[str]: | |
| """ | |
| Order used by render_global() for summary. Prefer UI order, | |
| then add any graph nodes not present in the UI layout (stable). | |
| """ | |
| ui = [n for n in NODE_ORDER_UI if n in S.g.nodes] | |
| extra = [n for n in S.g.nodes.keys() if n not in ui] | |
| return ui + extra | |
| CONTROL_WIRES = derive_control_wires(INPUT_ORDER, S.control_ports) | |
| EDGE_ORDER = derive_edge_order() | |
| NODE_ORDER = derive_node_order_for_summary() | |
| # ============================================================================= | |
| # UI rendering helpers (compute vs render) | |
| # ============================================================================= | |
| def node_out(node: str, idx: int) -> Any: | |
| if idx is None or idx < 0: | |
| return None | |
| if node not in S.history: | |
| return None | |
| if idx >= len(S.history[node]): | |
| return None | |
| snap = S.history[node][idx] | |
| if snap.status == "error" or not snap.outputs: | |
| return None | |
| outs = snap.outputs | |
| if len(outs) == 1 and "out" in outs: | |
| return outs["out"] | |
| return json.dumps(outs, ensure_ascii=False, indent=2) | |
| def node_out_rendered(node: str) -> Any: | |
| """ | |
| Value meant for the "rendered output" component. | |
| Rules: | |
| - if selected snapshot is error: | |
| - return error message (string) → only meaningful for Textbox | |
| - if ok and has outputs["out"]: | |
| - return outputs["out"] | |
| - else: | |
| - None | |
| """ | |
| idx = S.selected_index.get(node, -1) | |
| if idx is None or idx < 0: | |
| return None | |
| if node not in S.history: | |
| return None | |
| if idx >= len(S.history[node]): | |
| return None | |
| snap = S.history[node][idx] | |
| if snap.status == "error": | |
| return None # au lieu d'une string | |
| if not snap.outputs: | |
| return None | |
| if "out" in snap.outputs: | |
| return snap.outputs["out"] | |
| return None | |
| def control_val(name: str, idx: int) -> Any: | |
| if idx is None or idx < 0: | |
| return None | |
| if idx >= len(CTRL.history.get(name, [])): | |
| return None | |
| return CTRL.history[name][idx] | |
| def input_wire_badge(control_name: str) -> str: | |
| color = S.control_wire_color(control_name) | |
| node, _port = S.control_ports[control_name] | |
| return ( | |
| "<div style='display:flex;align-items:center;justify-content:flex-end;gap:6px;'>" | |
| f"<span style='font-size:12px;color:#6b7280;'>feeds {ui_node(node)}</span>" | |
| f"{dot_html(color)}" | |
| "</div>" | |
| ) | |
| def node_io_badges(node: str) -> str: | |
| badge_mode = BADGE_LABEL_MODE | |
| # ---- Inputs: show either port names or node/input labels ---- | |
| in_lines: List[str] = [] | |
| for port, src, color in S.input_port_colors(node): | |
| if badge_mode == "ports": | |
| src_txt = port # ✅ port métier côté entrée du node courant | |
| else: | |
| if src.startswith("Input["): | |
| cname = src[6:-1] | |
| src_txt = ui_input(cname) | |
| elif src == "unwired": | |
| src_txt = "unwired" | |
| else: | |
| src_txt = ui_node(src) | |
| in_lines.append( | |
| "<div style='display:block;line-height:1.35;margin:2px 0;'>" | |
| f"{dot_html(color)}" | |
| f"<span style='color:#6b7280;font-size:12px;margin-left:6px;'>{src_txt}</span>" | |
| "</div>" | |
| ) | |
| if not in_lines: | |
| in_lines = [ | |
| "<div style='display:block;line-height:1.35;margin:2px 0;'>" | |
| f"{dot_html('gray')}" | |
| "<span style='color:#6b7280;font-size:12px;margin-left:6px;'>no inputs</span>" | |
| "</div>" | |
| ] | |
| # ---- Outputs: show either output port names or destination node label ---- | |
| out_lines: List[str] = [] | |
| for src_port, dst_label, color in S.output_edge_colors(node): | |
| if badge_mode == "ports": | |
| dst_txt = src_port # ✅ port métier côté sortie du node courant | |
| else: | |
| dst_node = dst_label.split(".", 1)[0] | |
| dst_txt = ui_node(dst_node) | |
| out_lines.append( | |
| "<div style='display:block;line-height:1.35;margin:2px 0;text-align:right;'>" | |
| f"<span style='color:#6b7280;font-size:12px;margin-right:6px;'>{dst_txt}</span>" | |
| f"{dot_html(color)}" | |
| "</div>" | |
| ) | |
| if not out_lines: | |
| out_lines = [ | |
| "<div style='display:block;line-height:1.35;margin:2px 0;text-align:right;'>" | |
| "<span style='color:#6b7280;font-size:12px;margin-right:6px;'>no outputs</span>" | |
| f"{dot_html('gray')}" | |
| "</div>" | |
| ] | |
| return ( | |
| "<div style='display:flex;justify-content:space-between;gap:14px;'>" | |
| f"<div style='min-width:0;'>{''.join(in_lines)}</div>" | |
| f"<div style='min-width:0;'>{''.join(out_lines)}</div>" | |
| "</div>" | |
| ) | |
| def choices_for(node: str) -> List[str]: | |
| return S.history_list(node) or ["(empty)"] | |
| def selected_choice_value(node: str) -> str: | |
| choices = choices_for(node) | |
| if choices == ["(empty)"]: | |
| return "(empty)" | |
| idx = S.selected_index.get(node, -1) | |
| if idx < 0 or idx >= len(choices): | |
| return choices[-1] | |
| return choices[idx] | |
| def nav_button_updates(node: str) -> Tuple[dict, dict]: | |
| """ | |
| Returns (left_update, right_update) for ◀︎ / ▶︎ buttons. | |
| - Disable when no selection, empty history, or at boundaries. | |
| """ | |
| n = len(S.history.get(node, [])) | |
| cur = S.selected_index.get(node, -1) | |
| if n <= 1 or cur < 0: | |
| return gr.update(interactive=False), gr.update(interactive=False) | |
| left_ok = cur > 0 | |
| right_ok = cur < (n - 1) | |
| return gr.update(interactive=left_ok), gr.update(interactive=right_ok) | |
| def run_need_state(node: str) -> str: | |
| """ | |
| Compute "CTA state" for run buttons: | |
| - force: needs a strong "▶︎ Run" action | |
| - stale: suggest rerun (↻) | |
| - fresh: normal run (✓) | |
| """ | |
| if S.node_is_in_error_ui(node): | |
| return "force" | |
| if S.never_ran(node): | |
| return "force" | |
| # structural mismatch or missing => force | |
| for parent in S.g.parents_of(node): | |
| col = S.wire_color(parent, node) | |
| if col in ("gray", "blue"): | |
| return "force" | |
| # direct input changed => force | |
| for cname, (target_node, _port) in S.control_ports.items(): | |
| if target_node == node and S.control_wire_color(cname) == "orange": | |
| return "force" | |
| # upstream stale => stale | |
| for parent in S.g.parents_of(node): | |
| if S.wire_color(parent, node) == "orange": | |
| return "stale" | |
| return "fresh" | |
| def run_button_updates(node: str) -> Tuple[dict, dict, dict]: | |
| state = run_need_state(node) | |
| return ( | |
| gr.update(visible=(state == "fresh")), # ✓ Run | |
| gr.update(visible=(state == "stale")), # ↻ Run | |
| gr.update(visible=(state == "force")), # ▶︎ Run | |
| ) | |
| def compute_node_status_line(node: str) -> str: | |
| sel = S.selected_index.get(node, None) | |
| gf = S.globally_fresh(node) | |
| never = S.never_ran(node) | |
| extra = "" | |
| if node in S.last_run_meta: | |
| extra = f" | ⚠ computed via {S.last_run_meta[node]}" | |
| status = ( | |
| f"{node} | selected={sel} | " | |
| f"never_ran={'✅' if never else '❌'} | " | |
| f"globally_fresh={'✅' if gf else '❌'}" | |
| + extra | |
| ) | |
| if S.node_is_in_error_ui(node): | |
| status += " | ❌ NODE IN ERROR" | |
| else: | |
| snap_status = S.selected_snapshot_status(node) | |
| if snap_status == "error": | |
| status += " | ❌ selected snapshot = error" | |
| if S.last_run_error_relevant(node): | |
| status += " | ❌ FAILED IN LAST RUN" | |
| blocked = any(S.edge_is_blocked_ui(p, node) for p in S.g.parents_of(node)) | |
| if blocked: | |
| status += " | ⛔ BLOCKED (parent failed in last run)" | |
| return status | |
| def render_node_block(node: str): | |
| """ | |
| Produces all UI pieces for a node card. | |
| Returns: | |
| status_line, io_html, out_view_val, out_debug_txt, hist_update, left_update, right_update, | |
| run_fresh_upd, run_stale_upd, run_fix_upd | |
| """ | |
| status = compute_node_status_line(node) | |
| io_md = node_io_badges(node) | |
| out_view_val = node_out_rendered(node) # ✅ NEW | |
| out_debug_txt = S.current_output_text(node) # ✅ debug JSON, inchangé | |
| hist = gr.update(choices=choices_for(node), value=selected_choice_value(node)) | |
| left_upd, right_upd = nav_button_updates(node) | |
| rf, rs, rx = run_button_updates(node) | |
| return status, io_md, out_view_val, out_debug_txt, hist, left_upd, right_upd, rf, rs, rx | |
| def render_edges_text() -> str: | |
| lines: List[str] = [] | |
| if WIRE_COLOR_MODE == "daggr": | |
| lines.append("Daggr-like") | |
| lines.append("Legend: 🟠 ok | ⚪ not ok (details hidden)") | |
| else: | |
| lines.append("Rich-mode") | |
| lines.append( | |
| "Legend: 🔵 blue=never ran | 🔴 red=failed | 🟣 purple=blocked | ⚪ gray=mismatch | 🟠 stale | 🟢 fresh" | |
| ) | |
| lines.append("") | |
| # Input wires | |
| lines.append("== Input wires (Input[x] → target node) ==") | |
| for c in CONTROL_WIRES: | |
| node, _port = S.control_ports[c] | |
| color = S.control_wire_color(c) | |
| cur_idx = CTRL.selected.get(c, -1) | |
| cur_val = CTRL.values.get(c) | |
| used_idx = None | |
| used_val = None | |
| nidx = S.selected_index.get(node, -1) | |
| if 0 <= nidx < len(S.history[node]): | |
| snap = S.history[node][nidx] | |
| used_idx = snap.controls_selected.get(c, None) | |
| if used_idx is not None: | |
| used_val = control_val(c, used_idx) | |
| if color == "blue": | |
| extra = " | why=target node not run / no snapshot selected" | |
| elif color == "red": | |
| extra = " | why=target node failed (selected snapshot error OR last run failed on this node)" | |
| elif color == "gray": | |
| if used_idx is None: | |
| extra = " | why=target snapshot doesn't reference this Input yet" | |
| else: | |
| extra = ( | |
| f" | why=unknown gray state: used#{used_idx}={pretty_val(used_val)}" | |
| f" / now#{cur_idx}={pretty_val(cur_val)}" | |
| ) | |
| elif color == "orange": | |
| extra = ( | |
| f" | why=stale: used#{used_idx}={pretty_val(used_val)}" | |
| f" → now#{cur_idx}={pretty_val(cur_val)}" | |
| ) | |
| else: | |
| extra = f" | ok=used#{used_idx}={pretty_val(used_val)} == now#{cur_idx}={pretty_val(cur_val)}" | |
| lines.append(f"{emoji_wire(color)} Input[{c}] → {node} | wire={color}{extra}") | |
| lines.append("") | |
| lines.append("== Graph edges (parent → child) ==") | |
| for parent, child in EDGE_ORDER: | |
| color = S.wire_color(parent, child) | |
| aligned = S.aligned_edge(parent, child) | |
| cidx = S.selected_index.get(child, -1) | |
| pidx = S.selected_index.get(parent, -1) | |
| # If child has no selected snapshot, explain (but keep blocked purple) | |
| if not (0 <= cidx < len(S.history[child])): | |
| if S.edge_is_blocked_ui(parent, child): | |
| color = "purple" | |
| lines.append( | |
| f"{emoji_wire(color)} {parent} → {child} | aligned=—" | |
| f" | wire={color} | why=blocked: parent failed in last run; child not executed" | |
| ) | |
| else: | |
| color = "blue" | |
| lines.append( | |
| f"{emoji_wire(color)} {parent} → {child} | aligned=—" | |
| f" | wire={color} | why=child has no selected snapshot" | |
| ) | |
| continue | |
| snap_c = S.history[child][cidx] | |
| want_p = snap_c.upstream_selected.get(parent, None) | |
| if color == "red": | |
| extra = " | why=child node failed (incoming wires red)" | |
| elif color == "gray": | |
| want_val = node_out(parent, want_p) if want_p is not None else None | |
| have_val = node_out(parent, pidx) | |
| extra = ( | |
| f" | why=parent mismatch: {parent} want#{want_p}={pretty_val(want_val)}" | |
| f" → have#{pidx}={pretty_val(have_val)}" | |
| ) | |
| elif color == "orange": | |
| diffs = [] | |
| for cc, used_idx in snap_c.controls_selected.items(): | |
| cur_cc = CTRL.selected.get(cc, -1) | |
| if cur_cc != used_idx: | |
| uval = control_val(cc, used_idx) | |
| cval = control_val(cc, cur_cc) | |
| diffs.append( | |
| f"Input[{cc}] used#{used_idx}={pretty_val(uval)} → now#{cur_cc}={pretty_val(cval)}" | |
| ) | |
| if diffs: | |
| extra = " | why=stale controls: " + "; ".join(diffs[:2]) + ("; …" if len(diffs) > 2 else "") | |
| else: | |
| extra = " | why=stale controls (some in-scope Input[x] changed)" | |
| elif color == "blue": | |
| extra = " | why=child not run yet" | |
| elif color == "purple": | |
| extra = " | why=blocked: parent failed in last run; child not executed; no further propagation" | |
| else: | |
| extra = f" | ok=child uses parent#{want_p} (current parent#{pidx})" | |
| lines.append( | |
| f"{emoji_wire(color)} {parent} → {child} | aligned={'✅' if aligned else '❌'}" | |
| f" | wire={color}{extra}" | |
| ) | |
| return "\n".join(lines) | |
| def render_global(msg: str) -> Tuple[str, str, str, str]: | |
| lines: List[str] = [] | |
| if WIRE_COLOR_MODE == "daggr": | |
| lines.append("Daggr-like") | |
| lines.append("Legend: 🟠 ok | ⚪ not ok (details hidden)") | |
| else: | |
| lines.append("Rich-mode") | |
| lines.append( | |
| "Legend: 🔵 blue=never ran | 🔴 red=failed | 🟣 purple=blocked | ⚪ gray=mismatch | 🟠 stale | 🟢 fresh" | |
| ) | |
| lines.append("") | |
| if getattr(S, "last_run_note", None): | |
| lines.append("") | |
| lines.append(S.last_run_note) | |
| if getattr(S, "last_run_failed", False) and getattr(S, "last_run_error", None): | |
| err = S.last_run_error | |
| lines.append("") | |
| lines.append(f"❌ LAST RUN FAILED: node={err['node']} | phase={err['phase']} | error={err['error']}") | |
| for n in NODE_ORDER: | |
| sel = S.selected_index.get(n, None) | |
| gf = S.globally_fresh(n) | |
| out = S.current_output_text(n) | |
| lines.append(f"- {n}: selected={sel} | globally_fresh={'✅' if gf else '❌'} | out={out}") | |
| return msg, "\n".join(lines), render_edges_text(), S.debug_state() | |
| # Rendering is intentionally centralized: | |
| # render_all() recomputes the full UI from state | |
| # This avoids partial updates and UI desync bugs. | |
| def render_all(msg: str = "", input_overrides: dict[str, Any] | None = None) -> Tuple[Any, ...]: | |
| input_overrides = input_overrides or {} | |
| glob = render_global(msg) | |
| out: List[Any] = [] | |
| for cname in INPUT_ORDER: | |
| out.append(input_wire_badge(cname)) | |
| if cname in input_overrides: | |
| out.append(gr.update(value=input_overrides[cname])) | |
| else: | |
| out.append(gr.update()) # default: don't touch input values | |
| out.extend([glob[0], glob[1], glob[2], glob[3]]) | |
| for node in NODE_ORDER_UI: | |
| status, io_md, out_view_val, out_debug_txt, hist, left_upd, right_upd, rf, rs, rx = render_node_block(node) | |
| out.extend([status, io_md, out_view_val, out_debug_txt, hist, left_upd, right_upd, rf, rs, rx]) | |
| return tuple(out) | |
| # ============================================================================= | |
| # UI event handlers (DRY) | |
| # ============================================================================= | |
| def ui_reset(*vals: Any): | |
| CTRL.reset() | |
| S.reset() | |
| for cname, v in zip(INPUT_ORDER, vals): | |
| comp = INPUT_COMPONENTS.get(cname, gr.Textbox) | |
| v = normalize_input_value_for_component(comp, v) | |
| CTRL.set_ui_value(cname, v) | |
| return render_all("Reset state") | |
| def _commit_all_textboxes(*vals: Any) -> None: | |
| for cname, v in zip(INPUT_ORDER, vals): | |
| comp = INPUT_COMPONENTS.get(cname, gr.Textbox) | |
| v = normalize_input_value_for_component(comp, v) | |
| CTRL.set_ui_value(cname, v) | |
| def ui_set_input(control_name: str, value: Any): | |
| comp = INPUT_COMPONENTS.get(control_name, gr.Textbox) | |
| value = normalize_input_value_for_component(comp, value) | |
| CTRL.set_ui_value(control_name, value) | |
| S.clear_transient_run_state() | |
| return render_all(f"Applied input {control_name}") | |
| def ui_set_reuse_identical(flag: bool): | |
| CTRL.reuse_identical_values = bool(flag) | |
| return render_all(f"Reuse identical input values = {CTRL.reuse_identical_values}") | |
| def ui_set_badge_label_mode(mode: str): | |
| global BADGE_LABEL_MODE | |
| BADGE_LABEL_MODE = mode or "nodes" | |
| # pas besoin de clear run state : c'est purement de l'affichage | |
| return render_all(f"Badge labels = {BADGE_LABEL_MODE}") | |
| def ui_run( | |
| node: str, | |
| auto_upstream: bool, | |
| force_upstream: bool, | |
| force_downstream: bool, | |
| force_topo: bool, | |
| on_error: str, | |
| skip_if_unchanged: bool, | |
| *vals: str, | |
| ): | |
| try: | |
| _commit_all_textboxes(*vals) | |
| # Reset transient state for a new run | |
| S.clear_transient_run_state() | |
| # ------------------------------------------------- | |
| # 🔒 HARD NO-OP MODE (before starting run) | |
| # ------------------------------------------------- | |
| if skip_if_unchanged and not (force_upstream or force_downstream): | |
| if ( | |
| not S.node_is_in_error_ui(node) | |
| and S.would_run_be_noop(node) | |
| ): | |
| yield render_all( | |
| f"⏭️ Skipped {node}: no upstream/control change (true no-op)" | |
| ) | |
| return | |
| # ------------------------------------------------- | |
| run_id = S.start_run() | |
| meta = [] | |
| if force_upstream: | |
| meta.append("force-upstream") | |
| if force_downstream: | |
| meta.append("force-downstream") | |
| tag = ", ".join(meta) if meta else "" | |
| def _yield_step(txt: str): | |
| return render_all(txt) | |
| # First paint | |
| yield _yield_step(f"Starting run {node}… (run_id={run_id})") | |
| def _run_and_yield(n: str, phase: str): | |
| S._ensure_required_controls_committed(n) | |
| # ✅ no-op run (ne skip pas si force_upstream/force_downstream) | |
| if skip_if_unchanged and not (force_upstream or force_downstream): | |
| if not S.node_is_in_error_ui(n) and S.would_run_be_noop(n): | |
| yield _yield_step(f"⏭️ Skipped {n}: no upstream/control change (run_id={run_id})") | |
| return | |
| if tag: | |
| S.last_run_meta[n] = tag | |
| S.last_run_executed.add(n) | |
| S.run_one(n, on_error=on_error, run_phase=phase) | |
| # ✅ if we kept an error snapshot, stop the pipeline here (but snapshot is preserved) | |
| if on_error == "snapshot" and S.is_error_selected(n): | |
| raise RuntimeError(f"{n} failed (snapshot): {S.selected_snapshot(n).error}") | |
| yield _yield_step(f"Ran {n} ({phase}) (run_id={run_id})") | |
| def _safe_run_and_yield(n: str, phase: str) -> bool: | |
| """ | |
| Run node and stream UI. If node fails: | |
| - set last_run_failed, capture fingerprint | |
| - mark direct children as blocked | |
| - stream stop message | |
| """ | |
| try: | |
| yield from _run_and_yield(n, phase) | |
| return True | |
| except Exception as e: | |
| S.last_run_failed = True | |
| parents = S.g.parents_of(n) | |
| S.last_run_error = { | |
| "run_id": run_id, | |
| "node": n, | |
| "phase": phase, | |
| "error": repr(e), | |
| "on_error": on_error, | |
| "root": node, | |
| "force_upstream": bool(force_upstream), | |
| "force_downstream": bool(force_downstream), | |
| "auto_upstream": bool(auto_upstream), | |
| "force_topo": bool(force_topo), | |
| # fingerprint | |
| "parents_sel": {p: S.selected_index.get(p, -1) for p in parents}, | |
| "ctrl_sel": {c: CTRL.selected.get(c, -1) for c in S._controls_in_scope(n)}, | |
| "root_sel": dict(S.selected_index), # debug | |
| } | |
| # all downstream nodes are blocked by the stop (not executed in this run) | |
| S.last_run_blocked_nodes = set(S.g.downstream_of(n)) | |
| yield _yield_step(f"❌ Run stopped: {n} failed ({phase}) → {e} (run_id={run_id})") | |
| return False | |
| # ------------------------- | |
| # UPSTREAM | |
| # ------------------------- | |
| if force_upstream: | |
| order = S.g.topo_upstream_to(node) if force_topo else list(S.g.deps_of(node) | {node}) | |
| for n in order: | |
| for p in S.g.parents_of(n): | |
| if S.selected_index.get(p, -1) < 0: | |
| S.run_with_upstream(p, on_error=on_error) | |
| ok = yield from _safe_run_and_yield(n, "upstream(force)") | |
| if not ok: | |
| S.end_run() | |
| return | |
| else: | |
| if auto_upstream: | |
| order = S.g.topo_upstream_to(node) | |
| for n in order: | |
| if n == node: | |
| continue | |
| if S.selected_index.get(n, -1) < 0: | |
| for p in S.g.parents_of(n): | |
| if S.selected_index.get(p, -1) < 0: | |
| S.run_with_upstream(p, on_error=on_error) | |
| ok = yield from _safe_run_and_yield(n, "upstream(auto)") | |
| if not ok: | |
| S.end_run() | |
| return | |
| ok = yield from _safe_run_and_yield(node, "root") | |
| if not ok: | |
| S.end_run() | |
| return | |
| # ------------------------- | |
| # DOWNSTREAM (force) | |
| # ------------------------- | |
| if force_downstream: | |
| for n in S.g.topo_downstream_from(node): | |
| for p in S.g.parents_of(n): | |
| if S.selected_index.get(p, -1) < 0: | |
| S.run_with_upstream(p, on_error=on_error) | |
| ok = yield from _safe_run_and_yield(n, "downstream(force)") | |
| if not ok: | |
| S.end_run() | |
| return | |
| if tag: | |
| S.last_run_note = "🔁 Pipeline recomputed (new version)" | |
| yield _yield_step(f"Run complete for {node} ⚠ {tag} (run_id={run_id})") | |
| else: | |
| yield _yield_step(f"Run complete for {node} (run_id={run_id})") | |
| S.end_run() | |
| except Exception as e: | |
| # hard failure outside node runs | |
| S.end_run() | |
| yield render_all(f"ERROR running {node}: {e}") | |
| def ui_run_node(node_name: str): | |
| def _fn(auto_up, force_up, force_down, force_topo, on_error, skip_if_unchanged, *vals): | |
| yield from ui_run(node_name, auto_up, force_up, force_down, force_topo, on_error, skip_if_unchanged, *vals) | |
| return _fn | |
| def compute_input_overrides_from_ctrl() -> dict[str, Any]: | |
| overrides = {} | |
| for c in INPUT_ORDER: | |
| comp = INPUT_COMPONENTS.get(c, gr.Textbox) | |
| v = CTRL.get_selected_value(c) | |
| v = normalize_input_value_for_component(comp, v) | |
| overrides[c] = v | |
| return overrides | |
| def ui_restore(node: str, idx_str: str, mode: str, reconcile_flag: bool): | |
| try: | |
| # Changing context => clear transient run state | |
| S.clear_transient_run_state() | |
| if not idx_str or idx_str == "(empty)": | |
| return render_all(f"No history for {node}") | |
| idx = parse_hist_choice_idx(idx_str) | |
| if idx is None: | |
| return render_all(f"Bad selection for {node}") | |
| use_ctx = ("downstream_match" in mode) | |
| if not use_ctx: | |
| S.restore(node, idx, mode) | |
| overrides = compute_input_overrides_from_ctrl() | |
| return render_all(f"Restored {node} -> {idx} | mode={mode} (simple)", input_overrides=overrides) | |
| ctx = S.restore_with_context(node, idx, mode, reconcile=bool(reconcile_flag)) | |
| S.commit_restore_context(ctx) | |
| overrides = compute_input_overrides_from_ctrl() | |
| return render_all( | |
| f"Restored {node} -> {idx} | mode={mode} | reconcile={reconcile_flag} (ctx)", | |
| input_overrides=overrides, | |
| ) | |
| except Exception as e: | |
| return render_all(f"ERROR restoring {node}: {e}") | |
| def ui_nav_hist(node: str, direction: int, mode: str, reconcile_flag: bool): | |
| """ | |
| UI-only: navigate BASE history (full dropdown) by changing selected snapshot index. | |
| direction: -1 (prev) or +1 (next) | |
| """ | |
| try: | |
| S.clear_transient_run_state() | |
| cur = S.selected_index.get(node, -1) | |
| if cur < 0: | |
| return render_all(f"{node}: no selected snapshot yet") | |
| n = len(S.history.get(node, [])) | |
| if n <= 1: | |
| return render_all(f"{node}: no navigation (history size={n})") | |
| target = max(0, min(cur + direction, n - 1)) | |
| if target == cur: | |
| return render_all(f"{node}: boundary reached ({cur})") | |
| use_ctx = ("downstream_match" in mode) | |
| if not use_ctx: | |
| S.restore(node, target, mode) | |
| else: | |
| ctx = S.restore_with_context(node, target, mode, reconcile=bool(reconcile_flag)) | |
| S.commit_restore_context(ctx) | |
| arrow = "→" if direction > 0 else "←" | |
| overrides = compute_input_overrides_from_ctrl() | |
| return render_all(f"{node}: nav {arrow} -> {target} | mode={mode}", input_overrides=overrides) | |
| except Exception as e: | |
| return render_all(f"ERROR nav {node}: {e}") | |
| def values_equal(a: Any, b: Any) -> bool: | |
| # identical object / None | |
| if a is b: | |
| return True | |
| # try numpy-aware equality | |
| try: | |
| import numpy as np # type: ignore | |
| if isinstance(a, np.ndarray) and isinstance(b, np.ndarray): | |
| return np.array_equal(a, b) | |
| except Exception: | |
| pass | |
| # handle tuples/lists that may contain ndarrays (e.g. (sr, np.array)) | |
| if isinstance(a, (tuple, list)) and isinstance(b, (tuple, list)) and len(a) == len(b): | |
| return all(values_equal(x, y) for x, y in zip(a, b)) | |
| if isinstance(a, dict) and isinstance(b, dict) and a.keys() == b.keys(): | |
| return all(values_equal(a[k], b[k]) for k in a.keys()) | |
| # fallback | |
| try: | |
| return a == b | |
| except Exception: | |
| return False | |
| def ui_set_wire_color_mode(mode: str): | |
| global WIRE_COLOR_MODE | |
| WIRE_COLOR_MODE = mode | |
| return render_all(f"Wire color mode = {mode}") | |
| # ============================================================================= | |
| # Gradio App UI | |
| # ============================================================================= | |
| CSS = """ | |
| div#pipeline { | |
| align-items: center; | |
| column-gap: 32px; | |
| padding: 32px; | |
| } | |
| div#pipeline .node-card-group { | |
| margin: 40px 0; | |
| align-items: end; | |
| row-gap: 2px; | |
| } | |
| div#pipeline .card-port-status { | |
| padding: 4px; | |
| border-radius: 0; | |
| width: 95%; | |
| margin: auto; | |
| background-color: #27272a; | |
| } | |
| .card-port-status div { | |
| align-items: center; | |
| } | |
| .input-port-status{ | |
| text-align: right; | |
| padding: 8px 20px; | |
| } | |
| .input-card-node{ | |
| row-gap: 2px; | |
| margin: 40px 0; | |
| } | |
| /* placeholders if you want to vary badge styling */ | |
| .badge-pos-below {} | |
| .badge-pos-above {} | |
| .badge-pos-right { text-align: right; } | |
| /* | |
| Nav buttons sizing | |
| */ | |
| button.hist-nav-btn { | |
| padding: 10px; | |
| width: 40px !important; | |
| min-width: 40px !important; | |
| max-width: 40px !important; | |
| padding: 22px !important; | |
| font-size: 16px !important; | |
| line-height: 1 !important; | |
| border-radius: 0 !important; | |
| } | |
| .history-dropdown { | |
| min-width: max-content!important; | |
| } | |
| .history-dropdown > .container > span { | |
| display: none; | |
| } | |
| .node-status-content > .container > span { | |
| display: none; | |
| } | |
| .history-group-row{ | |
| column-gap: 0; | |
| } | |
| .restore-selected-history-btn{ | |
| } | |
| button.apply-input-btn{ | |
| width: auto; | |
| max-width: 194px; | |
| border-radius: 14px; | |
| border-top-left-radius: 0; | |
| border-top-right-radius: 0; | |
| margin-left: 15px; | |
| } | |
| button.run-fresh{ | |
| border-radius: 0; | |
| border-top-left-radius: 14px; | |
| border-top-right-radius: 14px; | |
| margin-right: 14px; | |
| width: auto; | |
| max-width: 304px; | |
| background-color: #17a34a; | |
| } | |
| button.run-stale{ | |
| border-radius: 0; | |
| border-top-left-radius: 14px; | |
| border-top-right-radius: 14px; | |
| margin-right: 14px; | |
| width: auto; | |
| max-width: 304px; | |
| background-color: orange; | |
| } | |
| button.run-fix{ | |
| border-radius: 0; | |
| border-top-left-radius: 14px; | |
| border-top-right-radius: 14px; | |
| margin-right: 14px; | |
| width: auto; | |
| max-width: 304px; | |
| } | |
| /* Optionnel: évite que le container s'étire */ | |
| .codebox { | |
| height: 600px !important; | |
| max-height: 600px !important; | |
| } | |
| div#pre-pipe-panel{ | |
| margin: 40px 0; | |
| } | |
| """ | |
| # ----------------------------------------------------------------------------- | |
| # UI component factories (Gradio widgets) | |
| # ----------------------------------------------------------------------------- | |
| def node_card( | |
| key: str, | |
| *, | |
| out_component=None, | |
| out_component_kwargs=None, | |
| ): | |
| label = ui_node(key) | |
| dom_io = f"io_{key}" | |
| # composant pour le "rendered" | |
| out_component_kwargs = dict(out_component_kwargs or {}) | |
| out_component_kwargs.setdefault("interactive", False) | |
| # 🚫 le label de la node est souverain | |
| out_component_kwargs.pop("label", None) | |
| with gr.Column(elem_classes="node-card-group"): | |
| run_fresh = gr.Button( | |
| f"✓ Run {label}", | |
| variant="primary", | |
| elem_classes="run-fresh", | |
| visible=True, | |
| ) | |
| run_stale = gr.Button( | |
| f"↻ Run {label}", | |
| variant="primary", | |
| elem_classes="run-stale", | |
| visible=False, | |
| ) | |
| run_fix = gr.Button( | |
| f"▶︎ Run {label}", | |
| variant="primary", | |
| elem_classes="run-fix", | |
| visible=False, | |
| ) | |
| # ✅ NEW: output "rendered" (composant choisi) | |
| out_view = out_component( | |
| label=f"{label} output (rendered)", | |
| min_width=100, | |
| **out_component_kwargs, | |
| ) | |
| # ✅ debug output (toujours textbox, JSON snapshot) | |
| out = gr.Textbox( | |
| label=f"{label} output (debug snapshot JSON)", | |
| interactive=False, | |
| min_width=100, | |
| ) | |
| io = gr.Markdown( | |
| value="", | |
| elem_id=dom_io, | |
| elem_classes="card-port-status", | |
| ) | |
| with gr.Group(): | |
| with gr.Accordion(f"{label} Status", open=False): | |
| status = gr.Textbox( | |
| label=f"{label} status", | |
| interactive=False, | |
| min_width=100, | |
| elem_classes="node-status-content", | |
| ) | |
| with gr.Group(): | |
| with gr.Row(elem_classes="history-group-row"): | |
| hist = gr.Dropdown( | |
| label=None, | |
| choices=["(empty)"], | |
| value="(empty)", | |
| min_width=100, | |
| elem_classes="history-dropdown", | |
| ) | |
| left_btn = gr.Button("←", elem_classes="hist-nav-btn") | |
| right_btn = gr.Button("→", elem_classes="hist-nav-btn") | |
| restore_btn = gr.Button( | |
| f"Restore {label}", | |
| elem_classes="restore-selected-history-btn", | |
| visible=False, | |
| ) | |
| return { | |
| "status": status, | |
| "io": io, | |
| "out_view": out_view, # ✅ NEW | |
| "out": out, # debug JSON | |
| "hist": hist, | |
| "run_fresh": run_fresh, | |
| "run_stale": run_stale, | |
| "run_fix": run_fix, | |
| "restore": restore_btn, | |
| "left": left_btn, | |
| "right": right_btn, | |
| } | |
| def input_card( | |
| key: str, | |
| default_value, | |
| *, | |
| badge_position: str = "below", | |
| min_width: int = 100, | |
| component=None, | |
| component_kwargs=None, | |
| ): | |
| label = ui_input(key) | |
| dom_id = f"dot_{key}" | |
| component_kwargs = dict(component_kwargs or {}) | |
| component_kwargs.setdefault("interactive", True) | |
| with gr.Column(elem_classes="input-card-node"): | |
| if badge_position != "below": | |
| badge = gr.Markdown( | |
| value="", | |
| elem_id=dom_id, | |
| elem_classes=["card-port-status", "input-port-status", f"badge-pos-{badge_position}"], | |
| ) | |
| # ✅ IMPORTANT: Audio has no default value | |
| if component is gr.Audio: | |
| tb = component( | |
| label=label, | |
| min_width=min_width, | |
| **component_kwargs, | |
| ) | |
| else: | |
| tb = component( | |
| label=label, | |
| value=default_value, | |
| min_width=min_width, | |
| **component_kwargs, | |
| ) | |
| if badge_position == "below": | |
| badge = gr.Markdown( | |
| value="", | |
| elem_id=dom_id, | |
| elem_classes=["card-port-status", "input-port-status", f"badge-pos-{badge_position}"], | |
| ) | |
| apply = gr.Button(f"Apply {label}", elem_classes="apply-input-btn", visible=True) | |
| return { | |
| "tb": tb, | |
| "dot": badge, | |
| "apply": apply, | |
| } | |
| # ----------------------------------------------------------------------------- | |
| # Construction Gradio (layout + wiring) | |
| # ----------------------------------------------------------------------------- | |
| CUSTOM_SPEC: dict | None = None | |
| CUSTOM_SPEC_PREVIEW: dict | None = None | |
| import importlib | |
| ALLOWED_IMPORTS = {"time", "re", "math", "json", "hashlib", "random", "datetime"} | |
| def _needed_imports_from_source(src: str, allowed: set[str]) -> list[str]: | |
| """ | |
| Heuristic: | |
| - Collect top-level module names used in the function body (Name nodes in Load context) | |
| - Keep only those in `allowed` | |
| - Return sorted list | |
| """ | |
| src = textwrap.dedent(src) | |
| try: | |
| tree = ast.parse(src) | |
| except SyntaxError: | |
| return [] | |
| used: set[str] = set() | |
| class V(ast.NodeVisitor): | |
| def visit_Name(self, node: ast.Name): | |
| if isinstance(node.ctx, ast.Load): | |
| used.add(node.id) | |
| self.generic_visit(node) | |
| def visit_Attribute(self, node: ast.Attribute): | |
| # time.sleep -> base "time" is a Name, captured above anyway | |
| self.generic_visit(node) | |
| V().visit(tree) | |
| # Only keep allowed module names | |
| mods = sorted(m for m in used if m in allowed) | |
| return mods | |
| def preset_to_python_code(preset_name: str) -> str: | |
| fn = PIPELINE_PRESETS.get(preset_name) | |
| if not fn: | |
| return f"# Unknown preset: {preset_name}\n" | |
| try: | |
| # Build spec for graph map (optional) | |
| spec = fn() | |
| graph_md = graph_map_md_simple_from_spec(spec) | |
| graph_comment = "\n".join( | |
| f"# {line}" if line.strip() else "#" | |
| for line in graph_md.strip().splitlines() | |
| ) | |
| src = inspect.getsource(fn) | |
| fn_name = fn.__name__ | |
| # ✅ minimal imports inferred from the preset function source | |
| needed = _needed_imports_from_source(src, ALLOWED_IMPORTS) | |
| import_block = "" | |
| if needed: | |
| import_block = "\n".join(f"import {m}" for m in needed) + "\n" | |
| # ✅ one-line reminder | |
| allowed_line = "# Allowed imports: " + ", ".join(sorted(ALLOWED_IMPORTS)) | |
| return ( | |
| "# Preset pipeline (read-only)\n" | |
| f"# name: {preset_name}\n\n" | |
| "# Graph map\n" | |
| f"{graph_comment}\n\n" | |
| f"{allowed_line}\n" | |
| f"{import_block}\n" | |
| f"{src}\n\n" | |
| "# Entry point expected by the UI\n" | |
| "def make_pipeline():\n" | |
| f" return {fn_name}()\n" | |
| ) | |
| except Exception as e: | |
| return ( | |
| f"# Could not retrieve source for preset: {preset_name}\n" | |
| f"# {e}\n" | |
| ) | |
| def make_limited_import(allowed: set[str]): | |
| def _limited_import(name, globals=None, locals=None, fromlist=(), level=0): | |
| # pas d'imports relatifs | |
| if level and level != 0: | |
| raise ImportError("Relative imports are not allowed") | |
| root = name.split(".", 1)[0] | |
| if root not in allowed: | |
| raise ImportError(f"Import '{root}' is not allowed") | |
| return importlib.import_module(name) | |
| return _limited_import | |
| def ui_load_custom_python(code: str, key: int): | |
| global CUSTOM_SPEC | |
| safe_builtins = { | |
| "str": str, "int": int, "float": float, "bool": bool, | |
| "dict": dict, "list": list, "tuple": tuple, "set": set, | |
| "len": len, "range": range, "min": min, "max": max, "sum": sum, | |
| "enumerate": enumerate, "zip": zip, "sorted": sorted, | |
| "repr": repr, "print": print, | |
| "Exception": Exception, | |
| "ValueError": ValueError, "TypeError": TypeError, "RuntimeError": RuntimeError, | |
| "__import__": make_limited_import(ALLOWED_IMPORTS), | |
| } | |
| env = { | |
| "__builtins__": safe_builtins, | |
| "Pipeline": Pipeline, | |
| "gr": gr, | |
| } | |
| try: | |
| exec(code, env, env) # ✅ IMPORTANT | |
| make = env.get("make_pipeline") | |
| if not callable(make): | |
| raise ValueError("Missing function: make_pipeline()") | |
| spec = make() | |
| if not isinstance(spec, dict) or "layout" not in spec: | |
| raise ValueError("make_pipeline() must return a spec dict with key 'layout'") | |
| CUSTOM_SPEC = spec | |
| return gr.update(value="✅ Loaded custom Python pipeline"), key + 1 | |
| except Exception as e: | |
| return gr.update(value=f"❌ Load failed: {e}"), key | |
| def ui_load_custom_json(code: str, key: int): | |
| global CUSTOM_SPEC | |
| try: | |
| spec = json.loads(code) | |
| if not isinstance(spec, dict) or "layout" not in spec: | |
| raise ValueError("Spec must be a dict with key 'layout'") | |
| CUSTOM_SPEC = spec | |
| return gr.update(value="✅ Loaded custom JSON pipeline"), key + 1 | |
| except Exception as e: | |
| return gr.update(value=f"❌ Load failed: {e}"), key | |
| def ui_load_dispatch(src: str, py_code: str, key: int, preset_name: str): | |
| if src != "custom_python": | |
| # en preset : tu peux soit no-op, soit forcer reload preset | |
| return gr.update(value="(preset mode: nothing to load)"), key | |
| # custom_python : compile/exec puis set CUSTOM_SPEC | |
| msg_upd, new_key = ui_load_custom_python(py_code, key) | |
| # option UX : si load OK, tu veux que le render se fasse sur CUSTOM_SPEC | |
| return msg_upd, new_key | |
| # --- BOOTSTRAP: always have a valid pipeline in globals before first render --- | |
| set_pipeline_from_spec(PIPELINE_PRESETS[DEFAULT_PIPELINE_PRESET]()) | |
| DEFAULT_CODE = preset_to_python_code(DEFAULT_PIPELINE_PRESET) | |
| with gr.Blocks(title="Mini Daggr UI Lab") as demo: | |
| gr.Markdown("# Mini Daggr UI Lab (Clean)\n") | |
| # ============================================================ | |
| # 1) STABLE TOP BAR (safe outside render) | |
| # ============================================================ | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| gr.Markdown("# Pipeline creation") | |
| pipeline_source = gr.Radio( | |
| choices=["preset", "custom_python"], | |
| value="preset", | |
| label="Pipeline source", | |
| ) | |
| pipeline_choice = gr.Dropdown( | |
| choices=list(PIPELINE_PRESETS.keys()), | |
| value=DEFAULT_PIPELINE_PRESET, | |
| label="Pipeline preset", | |
| interactive=True, | |
| ) | |
| # Graph map preview (top) - updates when user explores presets/custom, | |
| # but does NOT control the rendered pipeline subtree. | |
| _default_spec_for_map = PIPELINE_PRESETS[DEFAULT_PIPELINE_PRESET]() | |
| graph_map_top = gr.Markdown(graph_map_md_simple_from_spec(_default_spec_for_map)) | |
| # Buttons: user decides when to actually load into the app | |
| load_preset_btn = gr.Button("Load preset", visible=True) | |
| insert_template_btn = gr.Button("Insert custom template", visible=False) | |
| preview_custom_btn = gr.Button("Preview graph", visible=False) | |
| preview_valid = gr.State(False) | |
| load_custom_btn = gr.Button("Load custom pipeline", visible=False, interactive=False) | |
| load_msg = gr.Textbox(label="Load status", interactive=False) | |
| # States used to drive the dynamic subtree. | |
| # IMPORTANT: these change ONLY on button click (and on demo.load auto-commit). | |
| render_key = gr.State(0) | |
| active_preset = gr.State(DEFAULT_PIPELINE_PRESET) | |
| active_src = gr.State("preset") | |
| startup_timer = gr.Timer(value=0.05, active=True) | |
| with gr.Column(scale=2): | |
| code_tabs = gr.Tabs() | |
| with code_tabs: | |
| with gr.TabItem("Preset Pipeline", id="preset_tab"): | |
| preset_py = gr.Code( | |
| language="python", | |
| label="Preset pipeline (read-only)", | |
| interactive=False, | |
| value=DEFAULT_CODE, | |
| lines=30, | |
| max_lines=30, | |
| elem_classes=["codebox"], | |
| ) | |
| with gr.TabItem("Custom Pipeline", id="custom_tab"): | |
| custom_py = gr.Code( | |
| language="python", | |
| label="Custom pipeline (editable)", | |
| interactive=True, | |
| value=DEFAULT_CODE, | |
| lines=30, | |
| max_lines=30, | |
| elem_classes=["codebox"], | |
| ) | |
| # ============================================================ | |
| # Source toggle (stable): tabs + button visibility + preview | |
| # (does NOT trigger render) | |
| # ============================================================ | |
| def ui_toggle_source(src: str, preset_name: str): | |
| global CUSTOM_SPEC_PREVIEW | |
| if src == "preset": | |
| spec = PIPELINE_PRESETS[preset_name]() | |
| # reset preview cache + gating | |
| CUSTOM_SPEC_PREVIEW = None | |
| return ( | |
| gr.update(selected="preset_tab"), | |
| gr.update(interactive=True), # pipeline_choice | |
| gr.update(visible=True), # load_preset_btn | |
| gr.update(visible=False), # load_custom_btn (hidden) | |
| gr.update(visible=False), # insert_template_btn (hidden) | |
| gr.update(visible=False), # preview_custom_btn (hidden) | |
| gr.update(value=""), # load_msg | |
| gr.update(value=graph_map_md_simple_from_spec(spec)), | |
| False, # preview_valid reset | |
| gr.update(interactive=False), # load_custom_btn interactive gate | |
| ) | |
| # --- custom_python mode --- | |
| spec = CUSTOM_SPEC or PIPELINE_PRESETS[preset_name]() | |
| return ( | |
| gr.update(selected="custom_tab"), | |
| gr.update(interactive=False), # pipeline_choice locked | |
| gr.update(visible=False), # load_preset_btn hidden | |
| gr.update(visible=True), # load_custom_btn shown | |
| gr.update(visible=True), # insert_template_btn shown | |
| gr.update(visible=True), # preview_custom_btn shown | |
| gr.update(value="Custom mode: edit code, then blur or click 'Preview graph'."), # load_msg | |
| gr.update(value=graph_map_md_simple_from_spec(spec)), | |
| False, # preview_valid reset on entry | |
| gr.update(interactive=False), # load_custom_btn disabled until preview OK | |
| ) | |
| pipeline_source.change( | |
| ui_toggle_source, | |
| inputs=[pipeline_source, pipeline_choice], | |
| outputs=[ | |
| code_tabs, | |
| pipeline_choice, | |
| load_preset_btn, | |
| load_custom_btn, | |
| insert_template_btn, | |
| preview_custom_btn, | |
| load_msg, | |
| graph_map_top, | |
| preview_valid, | |
| load_custom_btn, | |
| ], | |
| show_progress="hidden", | |
| queue=False, | |
| ) | |
| # ============================================================ | |
| # Preset selection: update code + preview only (NO RENDER) | |
| # ============================================================ | |
| def ui_on_preset_change(src: str, preset_name: str): | |
| code = preset_to_python_code(preset_name) | |
| spec = PIPELINE_PRESETS[preset_name]() | |
| preset_upd = gr.update(value=code) | |
| if src == "preset": | |
| custom_upd = gr.update(value=code) # keep custom in sync ONLY in preset mode | |
| graph_upd = gr.update(value=graph_map_md_simple_from_spec(spec)) | |
| tabs_upd = gr.update(selected="preset_tab") | |
| else: | |
| custom_upd = gr.update() # don't overwrite user edits | |
| spec2 = CUSTOM_SPEC or spec | |
| graph_upd = gr.update(value=graph_map_md_simple_from_spec(spec2)) | |
| tabs_upd = gr.update(selected="custom_tab") | |
| return tabs_upd, preset_upd, custom_upd, graph_upd | |
| pipeline_choice.change( | |
| ui_on_preset_change, | |
| inputs=[pipeline_source, pipeline_choice], | |
| outputs=[code_tabs, preset_py, custom_py, graph_map_top], | |
| show_progress="hidden", | |
| queue=False, | |
| ) | |
| def compile_custom_pipeline_to_spec(code: str) -> dict: | |
| safe_builtins = { | |
| "str": str, "int": int, "float": float, "bool": bool, | |
| "dict": dict, "list": list, "tuple": tuple, "set": set, | |
| "len": len, "range": range, "min": min, "max": max, "sum": sum, | |
| "enumerate": enumerate, "zip": zip, "sorted": sorted, | |
| "repr": repr, "print": print, | |
| "Exception": Exception, | |
| "ValueError": ValueError, "TypeError": TypeError, "RuntimeError": RuntimeError, | |
| "__import__": make_limited_import(ALLOWED_IMPORTS), | |
| } | |
| env = { | |
| "__builtins__": safe_builtins, | |
| "Pipeline": Pipeline, | |
| "gr": gr, | |
| } | |
| exec(code, env, env) | |
| make = env.get("make_pipeline") | |
| if not callable(make): | |
| raise ValueError("Missing function: make_pipeline()") | |
| spec = make() | |
| if not isinstance(spec, dict) or "layout" not in spec: | |
| raise ValueError("make_pipeline() must return a spec dict with key 'layout'") | |
| return spec | |
| def ui_preview_custom(py_code: str): | |
| global CUSTOM_SPEC_PREVIEW | |
| try: | |
| spec = compile_custom_pipeline_to_spec(py_code) | |
| md = graph_map_md_simple_from_spec(spec) | |
| CUSTOM_SPEC_PREVIEW = spec | |
| return ( | |
| gr.update(value="✅ Preview OK"), | |
| gr.update(value=md), | |
| True, | |
| gr.update(interactive=True), # load_custom_btn | |
| ) | |
| except Exception as e: | |
| CUSTOM_SPEC_PREVIEW = None | |
| return ( | |
| gr.update(value=f"❌ Preview failed: {e}"), | |
| gr.update(value=f"## Graph map\n\n❌ `{e}`"), | |
| False, | |
| gr.update(interactive=False), # disable load button | |
| ) | |
| preview_custom_btn.click( | |
| ui_preview_custom, | |
| inputs=[custom_py], | |
| outputs=[load_msg, graph_map_top, preview_valid, load_custom_btn], | |
| show_progress="hidden", | |
| queue=False, | |
| ) | |
| custom_py.blur( | |
| ui_preview_custom, | |
| inputs=[custom_py], | |
| outputs=[load_msg, graph_map_top, preview_valid, load_custom_btn], | |
| show_progress="hidden", | |
| queue=False, | |
| ) | |
| # ============================================================ | |
| # LOAD ACTIONS (ONLY these trigger render) | |
| # ============================================================ | |
| def ui_load_preset(preset_name: str, key: int): | |
| return ( | |
| gr.update(value=f"✅ Loaded preset: {preset_name}"), | |
| preset_name, # active_preset | |
| "preset", # active_src | |
| key + 1, # render_key bump | |
| ) | |
| load_preset_btn.click( | |
| ui_load_preset, | |
| inputs=[pipeline_choice, render_key], | |
| outputs=[load_msg, active_preset, active_src, render_key], | |
| show_progress="hidden", | |
| queue=False, | |
| ) | |
| def ui_load_custom(py_code: str, key: int, preset_name: str): | |
| global CUSTOM_SPEC, CUSTOM_SPEC_PREVIEW | |
| if CUSTOM_SPEC_PREVIEW is None: | |
| return ( | |
| gr.update(value="❌ Load refused: run 'Preview graph' successfully first."), | |
| gr.update(), # graph_map_top unchanged | |
| preset_name, | |
| "custom_python", | |
| key, # no bump | |
| ) | |
| CUSTOM_SPEC = CUSTOM_SPEC_PREVIEW | |
| # graph already generated => keep it, or regenerate from CUSTOM_SPEC | |
| graph_upd = gr.update(value=graph_map_md_simple_from_spec(CUSTOM_SPEC)) | |
| return ( | |
| gr.update(value="✅ Loaded custom pipeline (from preview)"), | |
| graph_upd, | |
| preset_name, | |
| "custom_python", | |
| key + 1, | |
| ) | |
| load_custom_btn.click( | |
| ui_load_custom, | |
| inputs=[custom_py, render_key, pipeline_choice], | |
| outputs=[load_msg, graph_map_top, active_preset, active_src, render_key], | |
| show_progress="hidden", | |
| queue=False, | |
| ) | |
| def ui_boot_default(preset_name: str, key: int): | |
| # on réutilise la même logique que le bouton "Load preset" | |
| msg, ap, asrc, new_key = ui_load_preset(preset_name, key) | |
| # on désactive le timer => one-shot | |
| return msg, ap, asrc, new_key, gr.update(active=False) | |
| startup_timer.tick( | |
| ui_boot_default, | |
| inputs=[pipeline_choice, render_key], | |
| outputs=[load_msg, active_preset, active_src, render_key, startup_timer], | |
| show_progress="hidden", | |
| queue=False, | |
| ) | |
| def ui_insert_custom_template_and_validate(): | |
| code = CUSTOM_TEMPLATE | |
| # Reuse the exact same validation path as Preview graph / blur | |
| msg_upd, graph_upd, ok, load_btn_upd = ui_preview_custom(code) | |
| return ( | |
| gr.update(value=code), # custom_py | |
| msg_upd, # load_msg | |
| graph_upd, # graph_map_top | |
| ok, # preview_valid | |
| load_btn_upd, # load_custom_btn (interactive gate) | |
| ) | |
| insert_template_btn.click( | |
| ui_insert_custom_template_and_validate, | |
| inputs=[], | |
| outputs=[custom_py, load_msg, graph_map_top, preview_valid, load_custom_btn], | |
| show_progress="hidden", | |
| queue=False, | |
| ) | |
| # ============================================================ | |
| # DYNAMIC SUBTREE | |
| # - NOT triggered by dropdown change | |
| # - Triggered ONLY by: | |
| # - preset_load_ev | |
| # - custom_load_ev | |
| # - bootstrap_ev | |
| # ============================================================ | |
| def render_app(preset_name: str, src: str, _key: int): | |
| # ---- choose spec ---- | |
| if src == "preset": | |
| spec = PIPELINE_PRESETS[preset_name]() | |
| else: | |
| spec = CUSTOM_SPEC or PIPELINE_PRESETS[preset_name]() | |
| # ---- rebuild globals (G/CTRL/S/layout/orders/labels/components) ---- | |
| set_pipeline_from_spec(spec) | |
| # ---- one-shot init trigger: runs once after this subtree mounts ---- | |
| init_timer = gr.Timer(value=0.05, active=True) | |
| # ============================================================ | |
| # 2.b) PIPELINE UI | |
| # ============================================================ | |
| gr.Markdown("## Pipeline") | |
| input_ui: dict[str, dict] = {} | |
| node_ui: dict[str, dict] = {} | |
| with gr.Row(elem_id="pipeline"): | |
| for col in PIPELINE_LAYOUT: | |
| with gr.Column(): | |
| for item in col: | |
| if item["kind"] == "input": | |
| input_ui[item["name"]] = input_card( | |
| item["name"], | |
| item.get("default", ""), | |
| badge_position=item.get("badge_position", "below"), | |
| min_width=item.get("min_width", 100), | |
| component=item.get("component"), | |
| component_kwargs=item.get("component_kwargs"), | |
| ) | |
| elif item["kind"] == "node": | |
| node_ui[item["name"]] = node_card( | |
| item["name"], | |
| out_component=item.get("out_component"), | |
| out_component_kwargs=item.get("out_component_kwargs"), | |
| ) | |
| else: | |
| raise ValueError(f"Unknown kind: {item['kind']}") | |
| # ============================================================ | |
| # 2.a) PRE-PANEL | |
| # ============================================================ | |
| with gr.Row(elem_id="pre-pipe-panel"): | |
| with gr.Column(): | |
| gr.Markdown("## Graph settings") | |
| settings_tabs = gr.Tabs(selected="compute_tab") | |
| with settings_tabs: | |
| with gr.TabItem("Compute", id="compute_tab"): | |
| auto_up = gr.Checkbox(value=True, label="Auto-run upstream (Daggr-like)", interactive=True) | |
| force_up = gr.Checkbox(value=False, label="Force upstream recompute", interactive=True) | |
| force_down = gr.Checkbox(value=False, label="Force downstream recompute", interactive=True) | |
| force_topo = gr.Checkbox(value=True, label="Force order: topo (recommended)", interactive=True) | |
| with gr.TabItem("Snapshots", id="snapshots_tab"): | |
| reuse_identical = gr.Checkbox( | |
| value=False, | |
| label="Reuse identical input values (deduplicate input history)", | |
| ) | |
| skip_if_unchanged = gr.Checkbox( | |
| value=False, | |
| label="Don't run node if nothing changes upstream (no-op run)", | |
| interactive=True, | |
| ) | |
| on_error = gr.Radio( | |
| choices=["no_snapshot", "snapshot"], | |
| value="no_snapshot", | |
| label="On node error", | |
| interactive=True, | |
| ) | |
| with gr.TabItem("Restore", id="restore_tab"): | |
| restore_mode = gr.Radio( | |
| choices=[ | |
| "node_only", | |
| "node+controls", | |
| "node+controls+upstream", | |
| "node+controls+upstream+downstream_match", | |
| ], | |
| value="node+controls+upstream+downstream_match", | |
| label="Restore mode", | |
| interactive=True, | |
| ) | |
| reconcile = gr.Checkbox(value=False, label="Restore: reconcile", interactive=True) | |
| with gr.Column(): | |
| gr.Markdown("## Display settings") | |
| with gr.Row(): | |
| badge_label_mode = gr.Radio( | |
| choices=["ports", "nodes"], | |
| value=BADGE_LABEL_MODE, | |
| label="Badge labels", | |
| interactive=True, | |
| ) | |
| wire_color_mode = gr.Radio( | |
| choices=["rich", "daggr"], | |
| value=WIRE_COLOR_MODE, | |
| label="Wire color mode", | |
| interactive=True, | |
| ) | |
| with gr.Column(): | |
| gr.Markdown("## Debug panels") | |
| global_panels_tabs = gr.Tabs() | |
| with global_panels_tabs: | |
| with gr.TabItem("Nodes Summary", id="nodes_summary_tab"): | |
| nodes_summary = gr.Textbox(label="Nodes summary", lines=8, max_lines=8, interactive=False) | |
| with gr.TabItem("Edges Summary", id="edges_summary_tab"): | |
| edges_summary = gr.Textbox(label="Wires (detailed why)", lines=8, max_lines=8, interactive=False) | |
| with gr.TabItem("Debug state"): | |
| debug = gr.Code(label="Debug state", language="json", lines=14, max_lines=14) | |
| reset_btn = gr.Button("Reset lab") | |
| msg = gr.Textbox(label="Message", interactive=False) | |
| # ============================================================ | |
| # 2.c) OUTPUTS (MUST match render_all() exactly) | |
| # ============================================================ | |
| OUTPUTS: list[Any] = [] | |
| for cname in INPUT_ORDER: | |
| OUTPUTS += [input_ui[cname]["dot"], input_ui[cname]["tb"]] | |
| OUTPUTS += [msg, nodes_summary, edges_summary, debug] | |
| for n in NODE_ORDER_UI: | |
| ui = node_ui[n] | |
| OUTPUTS += [ | |
| ui["status"], | |
| ui["io"], | |
| ui["out_view"], | |
| ui["out"], | |
| ui["hist"], | |
| ui["left"], | |
| ui["right"], | |
| ui["run_fresh"], | |
| ui["run_stale"], | |
| ui["run_fix"], | |
| ] | |
| # ============================================================ | |
| # 2.d) EVENT WIRING (inside subtree => no stacking) | |
| # ============================================================ | |
| ALL_TBS = [input_ui[c]["tb"] for c in INPUT_ORDER] | |
| run_inputs = [auto_up, force_up, force_down, force_topo, on_error, skip_if_unchanged, *ALL_TBS] | |
| reset_btn.click(ui_reset, inputs=ALL_TBS, outputs=OUTPUTS, show_progress="hidden", queue=False) | |
| for cname in INPUT_ORDER: | |
| tb = input_ui[cname]["tb"] | |
| apply = input_ui[cname]["apply"] | |
| def _commit(v, cn=cname): | |
| return ui_set_input(cn, v) | |
| apply.click(_commit, inputs=[tb], outputs=OUTPUTS, show_progress="hidden", queue=False) | |
| if hasattr(tb, "blur"): | |
| tb.blur(_commit, inputs=[tb], outputs=OUTPUTS, show_progress="hidden", queue=False) | |
| if hasattr(tb, "submit"): | |
| tb.submit(_commit, inputs=[tb], outputs=OUTPUTS, show_progress="hidden", queue=False) | |
| else: | |
| for ev in ("upload", "stop_recording", "clear"): | |
| if hasattr(tb, ev): | |
| getattr(tb, ev)(_commit, inputs=[tb], outputs=OUTPUTS, show_progress="hidden", queue=False) | |
| for node_name in NODE_ORDER_UI: | |
| fn = ui_run_node(node_name) | |
| ui = node_ui[node_name] | |
| ui["run_fresh"].click(fn, inputs=run_inputs, outputs=OUTPUTS, show_progress="hidden") | |
| ui["run_stale"].click(fn, inputs=run_inputs, outputs=OUTPUTS, show_progress="hidden") | |
| ui["run_fix"].click(fn, inputs=run_inputs, outputs=OUTPUTS, show_progress="hidden") | |
| for node_name in NODE_ORDER_UI: | |
| ui = node_ui[node_name] | |
| hist = ui["hist"] | |
| cb = hist.blur if hasattr(hist, "blur") else hist.change | |
| cb( | |
| lambda idx, m, r, nn=node_name: ui_restore(nn, idx, m, r), | |
| inputs=[hist, restore_mode, reconcile], | |
| outputs=OUTPUTS, | |
| show_progress="hidden", | |
| queue=False, | |
| ) | |
| ui["left"].click( | |
| lambda m, r, nn=node_name: ui_nav_hist(nn, -1, m, r), | |
| inputs=[restore_mode, reconcile], | |
| outputs=OUTPUTS, | |
| show_progress="hidden", | |
| queue=False, | |
| ) | |
| ui["right"].click( | |
| lambda m, r, nn=node_name: ui_nav_hist(nn, +1, m, r), | |
| inputs=[restore_mode, reconcile], | |
| outputs=OUTPUTS, | |
| show_progress="hidden", | |
| queue=False, | |
| ) | |
| badge_label_mode.change( | |
| ui_set_badge_label_mode, | |
| inputs=[badge_label_mode], | |
| outputs=OUTPUTS, | |
| show_progress="hidden", | |
| queue=False, | |
| ) | |
| wire_color_mode.change( | |
| ui_set_wire_color_mode, | |
| inputs=[wire_color_mode], | |
| outputs=OUTPUTS, | |
| show_progress="hidden", | |
| queue=False, | |
| ) | |
| defaults = [ | |
| next( | |
| it["default"] | |
| for col in PIPELINE_LAYOUT | |
| for it in col | |
| if it["kind"] == "input" and it["name"] == cname | |
| ) | |
| for cname in INPUT_ORDER | |
| ] | |
| def ui_init_once(): | |
| payload = ui_reset(*defaults) | |
| return (*payload, gr.update(active=False)) | |
| init_timer.tick( | |
| ui_init_once, | |
| outputs=OUTPUTS + [init_timer], | |
| show_progress="hidden", | |
| queue=False, | |
| ) | |
| if __name__ == "__main__": | |
| demo.launch(css=CSS, ssr_mode=False) |