| | import os
|
| | import json
|
| | import re
|
| | import uuid
|
| | import ast
|
| | import time
|
| | from dataclasses import dataclass, field
|
| | from typing import Any, Dict, List, Optional, Literal, Tuple
|
| |
|
| | import nodes
|
| | from server import PromptServer
|
| |
|
| |
|
| |
|
# Fixed absolute path to the workflow JSON this node loads.
# NOTE(review): double slash after "/ComfyUI" is harmless on POSIX but looks
# unintentional — confirm the path is correct for the deployment.
_WORKFLOW_JSON_PATH = "/ComfyUI//custom_nodes/comfyui-salia_online/assets/workflow_1.json"

# Marker syntax embedded in workflow JSON (in input keys or _meta.title), e.g.
# "{{{VAR==INT_1, MIN==0, MAX==10==/VAR}}}seed".
MARKER_PREFIX = "{{{VAR=="
MARKER_SUFFIX = "==/VAR}}}"
# Variable ids look like INT_1 / FLOAT_2 / STR_3 (matched case-insensitively).
VAR_ID_RE = re.compile(r"^(INT|FLOAT|STR)_(\d+)$", re.IGNORECASE)
|
| |
|
| |
|
@dataclass
class VarOccurrence:
    """One binding site of a variable in the prompt: (node id, input key)."""

    node_id: str    # prompt-dict key of the node, always stored stringified
    input_key: str  # name of the entry inside node["inputs"] to override
|
| |
|
| |
|
@dataclass
class VarSpec:
    """Constraints, default, and binding sites for one marker variable."""

    var_id: str                               # e.g. "INT_1"
    var_type: Literal["INT", "FLOAT", "STR"]  # parsed from the var id prefix
    min_val: Optional[float] = None           # lower bound (INT/FLOAT markers only)
    max_val: Optional[float] = None           # upper bound (INT/FLOAT markers only)
    max_len: Optional[int] = None             # max string length (STR markers only)
    occurrences: List[VarOccurrence] = field(default_factory=list)  # where to write the value
    default_value: Any = None                 # value taken from the workflow JSON when no override given
|
| |
|
| |
|
def _load_prompt_json_fixed() -> Dict[str, Any]:
    """
    Read the workflow JSON from the fixed path and return the prompt dict.

    Accepts either the bare ComfyUI prompt mapping (node_id -> node_info) or
    a wrapper of the form {"prompt": {...}}, in which case the inner mapping
    is returned.

    Raises:
        ValueError: when the decoded JSON is not a dict.
    """
    with open(_WORKFLOW_JSON_PATH, "r", encoding="utf-8") as handle:
        loaded = json.load(handle)

    # Unwrap an optional {"prompt": {...}} envelope.
    if isinstance(loaded, dict):
        inner = loaded.get("prompt")
        if isinstance(inner, dict):
            loaded = inner

    if not isinstance(loaded, dict):
        raise ValueError("Workflow JSON must be a dict mapping node_id -> node_info (ComfyUI prompt format).")

    return loaded
|
| |
|
| |
|
def _extract_marker_text(
    text: str,
    *,
    allow_empty_actual_key: bool = False
) -> Optional[Tuple[str, str, Optional[float], Optional[float], Optional[int], str]]:
    """
    Parse a VAR marker at the start of *text*.

    Returns (var_id, var_type, min_val, max_val, max_len, actual_key) when
    *text* begins with a marker, or None when it does not (non-string input,
    missing prefix, or missing suffix).

    Input-key markers require a real key after the marker, e.g.:
        "{{{VAR==STR_3, MAXLEN==80==/VAR}}}value"
    Meta-title markers may omit it (allow_empty_actual_key=True), e.g.:
        "{{{VAR==STR_3, MAXLEN==80==/VAR}}}"

    Raises:
        ValueError: on any malformed marker (bad var id, bad or missing
            constraints, missing actual key when required).
    """
    if not isinstance(text, str):
        return None
    if not text.startswith(MARKER_PREFIX):
        return None

    suffix_at = text.find(MARKER_SUFFIX)
    if suffix_at == -1:
        return None

    body = text[len(MARKER_PREFIX):suffix_at]
    actual_key = text[suffix_at + len(MARKER_SUFFIX):]

    if not actual_key and not allow_empty_actual_key:
        raise ValueError("Marker key missing actual input name after marker (e.g. ...}}}seed).")

    pieces = [piece.strip() for piece in body.split(",") if piece.strip()]
    if not pieces:
        raise ValueError("Empty VAR marker.")

    var_id = pieces[0].strip().upper()
    id_match = VAR_ID_RE.match(var_id)
    if id_match is None:
        raise ValueError(f"Invalid VAR id '{var_id}'. Use INT_1 / STR_2 / FLOAT_3 ...")

    var_type = id_match.group(1).upper()

    # Remaining pieces are KEY==VALUE constraint pairs, keys case-folded.
    constraints: Dict[str, str] = {}
    for piece in pieces[1:]:
        if "==" not in piece:
            raise ValueError(f"Invalid constraint '{piece}' in marker for {var_id}. Use KEY==VALUE.")
        name, value = piece.split("==", 1)
        constraints[name.strip().upper()] = value.strip()

    min_val: Optional[float] = None
    max_val: Optional[float] = None
    max_len: Optional[int] = None

    if var_type in ("INT", "FLOAT"):
        # Numeric markers must carry both bounds.
        if "MIN" not in constraints or "MAX" not in constraints:
            raise ValueError(f"{var_id} missing MIN==... and/or MAX==... in marker.")
        try:
            min_val = float(constraints["MIN"])
            max_val = float(constraints["MAX"])
        except Exception:
            raise ValueError(f"{var_id} has non-numeric MIN/MAX in marker.")
        if min_val > max_val:
            raise ValueError(f"{var_id} has MIN > MAX in marker.")
    else:
        # STR markers: the first present length key wins, in this priority order.
        for length_key in ("MAXLEN", "MAX_CHARS", "MAXCHARS", "MAX"):
            if length_key in constraints:
                try:
                    max_len = int(constraints[length_key])
                except Exception:
                    raise ValueError(f"{var_id} has non-integer {length_key} in marker.")
                break
        if max_len is None:
            raise ValueError(f"{var_id} missing string max length (MAXLEN==... or MAX==...) in marker.")
        if max_len < 0:
            raise ValueError(f"{var_id} has negative max length in marker.")

    return (var_id, var_type, min_val, max_val, max_len, actual_key)
|
| |
|
| |
|
def _extract_marker(key: str) -> Optional[Tuple[str, str, Optional[float], Optional[float], Optional[int], str]]:
    """
    Marker extraction for INPUT KEYS (requires an actual key after the marker).

    Thin wrapper over _extract_marker_text with allow_empty_actual_key=False,
    so "{{{VAR==...==/VAR}}}" without a trailing input name raises ValueError.
    Returns None when *key* does not start with a marker at all.
    """
    return _extract_marker_text(key, allow_empty_actual_key=False)
|
| |
|
| |
|
| | def _default_target_input_key(inputs: Dict[str, Any]) -> str:
|
| | """
|
| | For meta-title markers (marker has no trailing 'actual_key'), choose which input key to override.
|
| |
|
| | Rule:
|
| | 1) If "value" exists in inputs -> use it (matches your example and common ComfyUI string/int/float nodes).
|
| | 2) Else if inputs has exactly one key -> use it.
|
| | 3) Else error (ambiguous).
|
| | """
|
| | if "value" in inputs:
|
| | return "value"
|
| | if len(inputs) == 1:
|
| | return next(iter(inputs.keys()))
|
| | raise ValueError(
|
| | "Meta marker found in _meta.title but cannot infer which input to override. "
|
| | "Add the marker to the input key instead ({{{...}}}value), or ensure the node has a 'value' input "
|
| | "or only a single input."
|
| | )
|
| |
|
| |
|
def _collect_and_strip_markers(prompt: Dict[str, Any]) -> Tuple[Dict[str, Any], Dict[str, VarSpec]]:
    """
    Collect VarSpecs from markers in the prompt and strip markers from input keys.

    Two marker sources are handled, in this order per node:
      A) input keys:
         "{{{VAR==INT_1,...==/VAR}}}seed" becomes "seed" in the in-memory prompt;
      B) _meta.title:
         "_meta": {"title": "{{{VAR==STR_3,...==/VAR}}}"} (the title itself is
         NOT modified) binds STR_3 to an inferred input key (usually "value").

    Returns (prompt, specs). The prompt is mutated in place (node["inputs"]
    is replaced with the de-markered dict) and also returned for convenience.

    Raises:
        ValueError: on key conflicts, unresolvable meta markers, or
            inconsistent constraints for the same var id across markers.
    """
    specs: Dict[str, VarSpec] = {}

    for node_id, node in prompt.items():
        # Skip anything that is not a node dict (defensive against odd JSON).
        if not isinstance(node, dict):
            continue

        inputs = node.get("inputs")
        if not isinstance(inputs, dict):
            continue

        # --- Pass A: markers embedded in input KEYS -------------------------
        new_inputs: Dict[str, Any] = {}
        for k, v in inputs.items():
            marker = _extract_marker(k)
            if marker is None:
                # Plain input: copy through unchanged.
                new_inputs[k] = v
                continue

            var_id, var_type, min_val, max_val, max_len, actual_key = marker
            if actual_key in new_inputs:
                raise ValueError(f"Conflict in node {node_id}: input '{actual_key}' already exists.")

            # Keep the original JSON value under the stripped key; it doubles
            # as the variable's default when no override is supplied.
            new_inputs[actual_key] = v

            spec = specs.get(var_id)
            if spec is None:
                # First sighting of this variable: record its constraints.
                spec = VarSpec(
                    var_id=var_id,
                    var_type=var_type,
                    min_val=min_val,
                    max_val=max_val,
                    max_len=max_len,
                    default_value=v,
                )
                specs[var_id] = spec
            else:
                # Repeated sighting: every marker for the same id must agree.
                if (
                    spec.var_type != var_type
                    or spec.min_val != min_val
                    or spec.max_val != max_val
                    or spec.max_len != max_len
                ):
                    raise ValueError(f"Inconsistent constraints for {var_id} across markers.")

            spec.occurrences.append(VarOccurrence(node_id=str(node_id), input_key=actual_key))

        node["inputs"] = new_inputs

        # --- Pass B: marker stored in _meta.title ---------------------------
        meta = node.get("_meta")
        if isinstance(meta, dict):
            title = meta.get("title")
            marker2 = _extract_marker_text(title, allow_empty_actual_key=True) if isinstance(title, str) else None
            if marker2 is not None:
                var_id, var_type, min_val, max_val, max_len, actual_key = marker2

                # The meta marker may name the target input after the marker;
                # otherwise infer it (usually "value").
                target_key = actual_key.strip() if isinstance(actual_key, str) else ""
                if not target_key:
                    target_key = _default_target_input_key(node["inputs"])

                if target_key not in node["inputs"]:
                    raise ValueError(
                        f"Meta marker in node {node_id} points to input '{target_key}', but that input does not exist."
                    )

                default_val = node["inputs"].get(target_key)

                spec = specs.get(var_id)
                if spec is None:
                    spec = VarSpec(
                        var_id=var_id,
                        var_type=var_type,
                        min_val=min_val,
                        max_val=max_val,
                        max_len=max_len,
                        default_value=default_val,
                    )
                    specs[var_id] = spec
                else:
                    if (
                        spec.var_type != var_type
                        or spec.min_val != min_val
                        or spec.max_val != max_val
                        or spec.max_len != max_len
                    ):
                        raise ValueError(f"Inconsistent constraints for {var_id} across markers.")

                # Avoid a duplicate occurrence when the same (node, input) was
                # already registered by an input-key marker in pass A.
                already = any(o.node_id == str(node_id) and o.input_key == target_key for o in spec.occurrences)
                if not already:
                    spec.occurrences.append(VarOccurrence(node_id=str(node_id), input_key=target_key))

    return prompt, specs
|
| |
|
| |
|
| |
|
| |
|
| | def _split_command_entries(command: str) -> List[str]:
|
| | s = (command or "").strip()
|
| | if not s:
|
| | return []
|
| |
|
| | entries: List[str] = []
|
| | buf: List[str] = []
|
| | quote: Optional[str] = None
|
| | escape = False
|
| |
|
| | for ch in s:
|
| | if escape:
|
| | buf.append(ch)
|
| | escape = False
|
| | continue
|
| |
|
| | if ch == "\\":
|
| | buf.append(ch)
|
| | escape = True
|
| | continue
|
| |
|
| | if quote is not None:
|
| | buf.append(ch)
|
| | if ch == quote:
|
| | quote = None
|
| | continue
|
| |
|
| | if ch in ("'", '"'):
|
| | buf.append(ch)
|
| | quote = ch
|
| | continue
|
| |
|
| | if ch in (" ", "\t", "\n", "\r", ",", ";"):
|
| | token = "".join(buf).strip()
|
| | if token:
|
| | entries.append(token)
|
| | buf = []
|
| | continue
|
| |
|
| | buf.append(ch)
|
| |
|
| | token = "".join(buf).strip()
|
| | if token:
|
| | entries.append(token)
|
| |
|
| | return entries
|
| |
|
| |
|
def _parse_command(command: str) -> Dict[str, Any]:
    """
    Parse a "VAR==VALUE ..." command string into {var_id: raw_value}.

    Quoted values are decoded with ast.literal_eval (so escapes work);
    unquoted values are kept as raw strings for later type coercion.

    Raises:
        ValueError: on malformed entries, bad var names, missing or
            unterminated values, or duplicate assignments.
    """
    assignments: Dict[str, Any] = {}

    for entry in _split_command_entries(command):
        if "==" not in entry:
            raise ValueError(f"Bad command entry '{entry}'. Expected VAR==VALUE.")

        name, payload = entry.split("==", 1)
        name = name.strip().upper()
        payload = payload.strip()

        if not VAR_ID_RE.match(name):
            raise ValueError(f"Bad variable name '{name}'. Use INT_1 / STR_2 / FLOAT_3 ...")

        if payload == "":
            raise ValueError(f"Missing value for {name}.")

        if payload[0] in ("'", '"'):
            # Quoted string: must be terminated by the same quote character.
            if len(payload) < 2 or payload[-1] != payload[0]:
                raise ValueError(f"Unterminated quoted string for {name}.")
            try:
                value = ast.literal_eval(payload)
            except Exception as exc:
                raise ValueError(f"Invalid quoted string for {name}: {exc}")
        else:
            value = payload

        if name in assignments:
            raise ValueError(f"Duplicate assignment for {name}.")
        assignments[name] = value

    return assignments
|
| |
|
| |
|
def _convert_and_validate(var_id: str, spec: VarSpec, raw_val: Any) -> Any:
    """
    Coerce *raw_val* to the spec's declared type and enforce its constraints.

    INT/FLOAT values are range-checked against spec.min_val/max_val; STR
    values are length-checked against spec.max_len.

    Raises:
        ValueError: when conversion fails or a constraint is violated.
    """
    kind = spec.var_type

    if kind == "INT":
        try:
            converted = int(str(raw_val).strip())
        except Exception:
            raise ValueError(f"{var_id} must be an integer.")
        if spec.min_val is not None and converted < spec.min_val:
            raise ValueError(f"{var_id} out of range: {converted} < MIN {int(spec.min_val)}")
        if spec.max_val is not None and converted > spec.max_val:
            raise ValueError(f"{var_id} out of range: {converted} > MAX {int(spec.max_val)}")
        return converted

    if kind == "FLOAT":
        try:
            converted = float(str(raw_val).strip())
        except Exception:
            raise ValueError(f"{var_id} must be a float.")
        if spec.min_val is not None and converted < spec.min_val:
            raise ValueError(f"{var_id} out of range: {converted} < MIN {spec.min_val}")
        if spec.max_val is not None and converted > spec.max_val:
            raise ValueError(f"{var_id} out of range: {converted} > MAX {spec.max_val}")
        return converted

    if kind == "STR":
        converted = str(raw_val)
        if spec.max_len is not None and len(converted) > spec.max_len:
            raise ValueError(f"{var_id} too long: length {len(converted)} > MAXLEN {spec.max_len}")
        return converted

    raise ValueError(f"Unsupported var type for {var_id}: {kind}")
|
| |
|
| |
|
def _apply_assignments(prompt: Dict[str, Any], specs: Dict[str, VarSpec], assigns: Dict[str, Any]) -> None:
    """
    Validate assignments and write the final values into the prompt in place.

    Every assigned variable must have a matching spec; variables without an
    assignment fall back to the default captured from the workflow JSON.
    All occurrences of each variable receive the same validated value.
    """
    # Reject overrides that do not correspond to any marker.
    for name in assigns:
        if name not in specs:
            raise ValueError(f"Command references {name} but no matching marker exists in the workflow JSON.")

    for var_id, spec in specs.items():
        value = _convert_and_validate(var_id, spec, assigns.get(var_id, spec.default_value))
        for occ in spec.occurrences:
            prompt[occ.node_id]["inputs"][occ.input_key] = value
|
| |
|
| |
|
def _infer_outputs_to_execute(prompt: Dict[str, Any]) -> List[str]:
    """
    Return the ids of all nodes whose class declares OUTPUT_NODE = True.

    Raises:
        ValueError: when a node's class_type is not registered, or when the
            workflow contains no output node at all.
    """
    result: List[str] = []
    for node_id, node in prompt.items():
        if not isinstance(node, dict) or "class_type" not in node:
            continue
        ctype = node["class_type"]
        klass = nodes.NODE_CLASS_MAPPINGS.get(ctype)
        if klass is None:
            raise ValueError(f"Unknown node class_type '{ctype}' (node {node_id}).")
        if getattr(klass, "OUTPUT_NODE", False) is True:
            result.append(str(node_id))

    if not result:
        raise ValueError("Loaded workflow has no OUTPUT_NODE nodes (e.g. SaveImage/Preview/etc).")
    return result
|
| |
|
| |
|
def _queue_prompt(prompt: Dict[str, Any], outputs_to_execute: List[str]) -> Tuple[str, float]:
    """
    Enqueue the prompt on the running PromptServer, mimicking POST /prompt.

    Returns (prompt_id, queue_number).
    """
    server = PromptServer.instance
    prompt_id = str(uuid.uuid4())

    # Prefer the server's own counter for queue ordering; fall back to a
    # millisecond timestamp when the attribute is absent.
    if hasattr(server, "number"):
        number = float(server.number)
        server.number += 1
    else:
        number = float(time.time() * 1000.0)

    extra_data: Dict[str, Any] = {"create_time": int(time.time() * 1000)}
    sensitive: Dict[str, Any] = {}

    server.prompt_queue.put((number, prompt_id, prompt, extra_data, outputs_to_execute, sensitive))
    return prompt_id, number
|
| |
|
| |
|
| | def _float_is_no_input(v: Any) -> bool:
|
| | """
|
| | Special float sentinel rule:
|
| | If v is in [-2.1, -1.9], treat it as "not provided".
|
| | """
|
| | try:
|
| | f = float(v)
|
| | except Exception:
|
| | return False
|
| | return -2.1 <= f <= -1.9
|
| |
|
| |
|
class JSONRUNNER_X:
    """
    Output node that loads the workflow JSON from a fixed path, resolves
    {{{VAR==...==/VAR}}} markers (found in input keys or in _meta.title),
    applies the typed overrides below, and queues the result exactly like a
    normal POST /prompt.

    Sentinel values meaning "leave the workflow's default alone":
        STR_*   == ""            -> ignored
        INT_*   == -1            -> ignored
        FLOAT_* in [-2.1, -1.9]  -> ignored
    """

    OUTPUT_NODE = True
    CATEGORY = "utils/workflow"

    RETURN_TYPES = ("STRING",)
    RETURN_NAMES = ("status",)
    FUNCTION = "run"

    @classmethod
    def INPUT_TYPES(cls):
        # Largest integer exactly representable in a JS double; the web UI
        # round-trips widget values through JSON, so larger ints would lose
        # precision.
        JS_SAFE_INT_MAX = 9007199254740991

        required: Dict[str, Any] = {}
        for i in range(1, 8):
            required[f"STR_{i}"] = ("STRING", {"multiline": False, "default": ""})
        for i in range(1, 6):
            required[f"INT_{i}"] = ("INT", {"default": -1, "min": -1, "max": JS_SAFE_INT_MAX})
        for i in range(1, 6):
            required[f"FLOAT_{i}"] = ("FLOAT", {"default": -2.0, "min": -1.0e9, "max": 1.0e9, "step": 0.01})

        return {"required": required}

    @classmethod
    def IS_CHANGED(cls, *args, **kwargs):
        # A fresh random token every call defeats ComfyUI's result caching,
        # forcing the node to re-execute on each queue.
        return uuid.uuid4().hex

    def run(
        self,
        STR_1: str = "",
        STR_2: str = "",
        STR_3: str = "",
        STR_4: str = "",
        STR_5: str = "",
        STR_6: str = "",
        STR_7: str = "",
        INT_1: int = -1,
        INT_2: int = -1,
        INT_3: int = -1,
        INT_4: int = -1,
        INT_5: int = -1,
        FLOAT_1: float = -2.0,
        FLOAT_2: float = -2.0,
        FLOAT_3: float = -2.0,
        FLOAT_4: float = -2.0,
        FLOAT_5: float = -2.0,
    ):
        try:
            prompt = _load_prompt_json_fixed()
            prompt, specs = _collect_and_strip_markers(prompt)

            # Collect only the overrides whose values differ from their
            # "not provided" sentinel.
            assigns: Dict[str, Any] = {}

            for idx, text in enumerate((STR_1, STR_2, STR_3, STR_4, STR_5, STR_6, STR_7), start=1):
                if isinstance(text, str) and text == "":
                    continue
                assigns[f"STR_{idx}"] = text

            for idx, whole in enumerate((INT_1, INT_2, INT_3, INT_4, INT_5), start=1):
                if whole == -1:
                    continue
                assigns[f"INT_{idx}"] = whole

            for idx, num in enumerate((FLOAT_1, FLOAT_2, FLOAT_3, FLOAT_4, FLOAT_5), start=1):
                if _float_is_no_input(num):
                    continue
                assigns[f"FLOAT_{idx}"] = num

            _apply_assignments(prompt, specs, assigns)

            outputs_to_execute = _infer_outputs_to_execute(prompt)
            prompt_id, number = _queue_prompt(prompt, outputs_to_execute)

            return (f"Queued workflow as prompt_id={prompt_id} (number={number})",)

        except Exception as e:
            # Surface any failure as a status string instead of crashing the
            # graph execution.
            return (f"ERROR: {e}",)
|
| |
|
| |
|
# ComfyUI registration: node type name -> implementing class.
NODE_CLASS_MAPPINGS = {
    "JSONRUNNER_X": JSONRUNNER_X,
}

# Human-readable names shown in the ComfyUI node picker.
NODE_DISPLAY_NAME_MAPPINGS = {
    "JSONRUNNER_X": "JSONRUNNER_X",
}
|
| |
|