| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| import os |
| import re |
| import json |
| import difflib |
| import importlib |
| from typing import Any, List, Tuple, Optional |
|
|
| import gradio as gr |
| from modules import script_callbacks, scripts, shared |
| from modules.shared import opts |
|
|
| |
| from modules import processing, sd_samplers |
|
|
| |
| from modules import prompt_parser as pp |
|
|
| |
| |
| |
|
|
| def _highlight_at(text: str, line: int, column: int, context: int = 2) -> str: |
| lines = text.splitlines() |
| i = max(0, line - 1) |
| start = max(0, i - context) |
| end = min(len(lines), i + context + 1) |
| out = [] |
| for idx in range(start, end): |
| prefix = ">> " if idx == i else " " |
| out.append(f"{prefix}{idx+1:>4}: {lines[idx]}") |
| if idx == i: |
| caret = " " * (column + 7 + len(str(idx+1))) + "^" |
| out.append(caret) |
| return "\n".join(out) |
|
|
| def _guess_suggestion(src: str) -> List[str]: |
| tips = [] |
| if src.count("(") != src.count(")"): |
| tips.append("Скобки () несбалансированы — проверьте пары.") |
| if src.count("[") != src.count("]"): |
| tips.append("Скобки [] несбалансированы — проверьте пары.") |
| if src.count("{") != src.count("}"): |
| tips.append("Скобки {} несбалансированы — проверьте пары.") |
| if "::" in src and not any(term in src for term in ["!", ";", "!!"]): |
| tips.append("Похоже, вы используете sequence (::), но не поставили терминатор (! или ; / !! для top-level).") |
| if re.search(r":[A-Za-z]", src): |
| tips.append("После ':' обычно ожидается число для веса (например, :1.2) — проверьте места с двоеточием.") |
| if re.search(r"\[[^]]+\]!\s*!", src): |
| tips.append("alternate_distinct пишется как [a|b]! — одно '!' сразу после ']'.") |
| return tips |
|
|
| def format_lark_error(prompt: str, err: Exception) -> str: |
| title = f"**Ошибка парсинга:** `{type(err).__name__}`: {getattr(err, 'message', str(err))}" |
| try: |
| line = getattr(err, 'line', None) |
| column = getattr(err, 'column', None) |
| pos = getattr(err, 'pos_in_stream', None) |
| except Exception: |
| line = column = pos = None |
| block = "" |
| if line is not None and column is not None: |
| block = "```\n" + _highlight_at(prompt, line, column) + "\n```" |
| elif pos is not None: |
| prefix = prompt[:pos] |
| l = prefix.count("\n") + 1 |
| c = len(prefix.split("\n")[-1]) + 1 |
| block = "```\n" + _highlight_at(prompt, l, c) + "\n```" |
| tips = _guess_suggestion(prompt) |
| tips_md = "\n".join([f"- {t}" for t in tips]) if tips else "- Проверьте синтаксис во вкладке «Справка и примеры»." |
| return f"{title}\n\n{block}\n\n**Как исправить:**\n{tips_md}\n" |
|
|
| |
| |
| |
|
|
| class EnvState: |
| def __init__(self): |
| self.allow_empty_alt = os.environ.get("ALLOW_EMPTY_ALTERNATE", "0") |
| self.expand_alt_per_step = os.environ.get("EXPAND_ALTERNATE_PER_STEP", "1") |
| self.group_combo_limit = os.environ.get("GROUP_COMBO_LIMIT", "100") |
|
|
| def apply(self, allow_empty_alt: bool, expand_alt: bool, combo_limit: int) -> bool: |
| changed = False |
| new_allow = "1" if allow_empty_alt else "0" |
| new_expand = "1" if expand_alt else "0" |
| new_limit = str(int(combo_limit)) |
| if new_allow != self.allow_empty_alt: |
| os.environ["ALLOW_EMPTY_ALTERNATE"] = new_allow |
| self.allow_empty_alt = new_allow |
| changed = True |
| if new_expand != self.expand_alt_per_step: |
| os.environ["EXPAND_ALTERNATE_PER_STEP"] = new_expand |
| self.expand_alt_per_step = new_expand |
| changed = True |
| if new_limit != self.group_combo_limit: |
| os.environ["GROUP_COMBO_LIMIT"] = new_limit |
| self.group_combo_limit = new_limit |
| changed = True |
| return changed |
|
|
| ENV_STATE = EnvState() |
|
|
| def maybe_reload_prompt_parser(allow_empty_alt: bool, expand_alt: bool, combo_limit: int): |
| changed = ENV_STATE.apply(allow_empty_alt, expand_alt, combo_limit) |
| if changed: |
| global pp |
| from modules import prompt_parser as _pp |
| pp = importlib.reload(_pp) |
|
|
| |
| |
| |
|
|
| def _pp_repeat_macro(src: str) -> Tuple[str, List[str]]: |
| """ |
| repeat N (text) -> text, text, ... (N раз) |
| Простой, небалансированный разбор скобок — подходит для типичных кейсов. |
| """ |
| changes = [] |
| pattern = re.compile(r"\brepeat\s+(\d+)\s*\(([^()]*?)\)", flags=re.IGNORECASE | re.DOTALL) |
| while True: |
| m = pattern.search(src) |
| if not m: |
| break |
| n = max(1, int(m.group(1))) |
| body = m.group(2).strip() |
| repl = ", ".join([body] * n) |
| src = src[:m.start()] + repl + src[m.end():] |
| changes.append(f"repeat {n} (...) → {n} повторов") |
| return src, changes |
|
|
| def _pp_every_k_macro(src: str, total_steps: int) -> Tuple[str, List[str]]: |
| """ |
| every K [a|b|c] -> [ a : b : c ] : K, 2K, 3K ... (пока < steps) |
| """ |
| changes = [] |
| rx = re.compile(r"\bevery\s+(\d+)\s*\[\s*([^\]]+?)\s*\]", flags=re.IGNORECASE) |
| def repl(m): |
| k = max(1, int(m.group(1))) |
| opts_bar = m.group(2).strip() |
| out = build_alternate_every_k(opts_bar, total_steps, k) |
| changes.append(f"every {k} [...] → scheduled с границами кратными {k}") |
| return out |
| return rx.sub(repl, src), changes |
|
|
| def _pp_prob_alt(src: str, total_steps: int) -> Tuple[str, List[str]]: |
| """ |
| probabilistic alternates: a{0.2}|b{0.8} (скобки {} прямо возле варианта) |
| → строим [ a : b ] : boundaries пропорционально весам |
| Работает как сахар; допускаем пробелы: 'red {0.4} | blue{0.6}' |
| """ |
| changes = [] |
| |
| |
| token_rx = re.compile(r"((?:[^|\n\[\]]+?\{\s*\d*\.?\d+\s*\}\s*\|\s*)+[^|\n\[\]]+?\{\s*\d*\.?\d+\s*\})") |
| def convert_block(block: str) -> str: |
| parts = [p.strip() for p in block.split("|")] |
| items = [] |
| total = 0.0 |
| for p in parts: |
| m = re.search(r"\{\s*(\d*\.?\d+)\s*\}\s*$", p) |
| if not m: |
| return block |
| w = float(m.group(1)) |
| text = re.sub(r"\{\s*\d*\.?\d+\s*\}\s*$", "", p).strip() |
| items.append((text, w)) |
| total += w |
| if total <= 0: |
| return block |
| |
| boundaries = [] |
| accum = 0.0 |
| for i in range(len(items) - 1): |
| accum += items[i][1] / total |
| b = int(round(accum * total_steps)) |
| if 0 < b < total_steps: |
| boundaries.append(str(b)) |
| core = " : ".join([it[0] for it in items]) |
| extra = f" : {', '.join(boundaries)}" if boundaries else "" |
| return f"[ {core} ]{extra}" |
|
|
| def repl(m): |
| orig = m.group(1) |
| out = convert_block(orig) |
| if out != orig: |
| changes.append("probabilistic a{p}|b{q} → scheduled с пропорциями весов") |
| return out |
|
|
| return token_rx.sub(repl, src), changes |
|
|
| def preprocess_prompt(src: str, total_steps: int, enable: bool) -> Tuple[str, List[str]]: |
| """ |
| Применяем сахар по шагам. Возвращаем (новый_текст, список_изменений). |
| """ |
| if not enable: |
| return src, [] |
| changes_all: List[str] = [] |
|
|
| |
| src, ch = _pp_repeat_macro(src) |
| changes_all += ch |
|
|
| |
| src, ch = _pp_every_k_macro(src, total_steps) |
| changes_all += ch |
|
|
| |
| src, ch = _pp_prob_alt(src, total_steps) |
| changes_all += ch |
|
|
| return src, changes_all |
|
|
| |
| |
| |
|
|
| def _safe_parse_tree(prompt: str): |
| try: |
| tree = pp.schedule_parser.parse(prompt) |
| tree_str = tree.pretty() if hasattr(tree, "pretty") else str(tree) |
| return tree_str, "" |
| except Exception as e: |
| return "", format_lark_error(prompt, e) |
|
|
| def _make_schedule(prompt: str, steps: int, seed: Optional[int], use_visitor: bool) -> Tuple[List[List[Any]], str]: |
| try: |
| schedule = pp.get_schedule(prompt, steps, True, seed, use_visitor=use_visitor) |
| if not schedule: |
| return [], "**Пустое расписание.** Проверьте синтаксис." |
| return schedule, "" |
| except Exception as e: |
| return [], f"**Ошибка при построении расписания:** {e}" |
|
|
| def _schedule_table_md(schedule: List[List[Any]]) -> str: |
| rows = ["| end_step | text |", "|---:|---|"] |
| for end_step, text in schedule: |
| safe_text = text.replace("|", "\\|") |
| rows.append(f"| {end_step} | {safe_text} |") |
| return "\n".join(rows) |
|
|
| def _per_step_timeline_md(schedule: List[List[Any]], steps: int) -> str: |
| rows = ["| step | active text |", "|---:|---|"] |
| for s in range(1, steps + 1): |
| txt = pp.at_step_from_schedule(s, schedule) |
| safe_txt = txt.replace("|", "\\|") |
| rows.append(f"| {s} | {safe_txt} |") |
| return "\n".join(rows) |
|
|
| def _compare_vis_vs_trans(prompt: str, steps: int, seed: Optional[int]) -> str: |
| vis, e1 = _make_schedule(prompt, steps, seed, True) |
| trn, e2 = _make_schedule(prompt, steps, seed, False) |
| if e1 or e2: |
| return (e1 or "") + (e2 or "") |
| md = ["**Visitor (сбор в визиторе)**", _schedule_table_md(vis), |
| "", "**Transformer (пересбор на end_step)**", _schedule_table_md(trn)] |
| return "\n\n".join(md) |
|
|
| |
| |
| |
|
|
| class Issue: |
| def __init__(self, kind: str, msg: str, fix_hint: str = "", fixer=None): |
| self.kind = kind |
| self.msg = msg |
| self.fix_hint = fix_hint |
| self.fixer = fixer |
|
|
| def lint_prompt(src: str, use_visitor: bool) -> List[Issue]: |
| issues: List[Issue] = [] |
|
|
| |
| if src.count("(") != src.count(")"): |
| issues.append(Issue("Error", "Несбалансированы круглые скобки ().", |
| "Проверьте пары () или удалите лишние.", |
| lambda s: auto_balance(s, "(", ")"))) |
| if src.count("[") != src.count("]"): |
| issues.append(Issue("Error", "Несбалансированы квадратные скобки [].", |
| "Проверьте пары [] или удалите лишние.", |
| lambda s: auto_balance(s, "[", "]"))) |
| if src.count("{") != src.count("}"): |
| issues.append(Issue("Warning", "Несбалансированы фигурные скобки {}.", |
| "Если это 'dead group', лучше убрать фигурные скобки.", |
| lambda s: auto_balance(s, "{", "}"))) |
|
|
| |
| if "::" in src and not any(t in src for t in ["!", ";", "!!"]): |
| issues.append(Issue("Error", "Обнаружен sequence (::), но нет завершающего терминатора (! или ; / !!).", |
| "Добавьте '!' или ';' в конец sequence.", |
| ensure_sequence_terminator)) |
|
|
| |
| if re.search(r":[A-Za-z]", src): |
| issues.append(Issue("Warning", "После ':' встречается буква — вероятно ожидалось число веса.", |
| "Исправьте на ':1.2' или уберите ':'.", |
| lambda s: s)) |
|
|
| |
| if re.search(r"(?:^|[^[])\b[^]\n]*\|[^[]", src) and "[" not in src: |
| issues.append(Issue("Hint", "Используется '|' вне '[]' — это alternate1/2 (рандом), а не карусель по шагам.", |
| "Заменить на [ a | b ] для перебора по шагам.", |
| bars_to_square_alternate)) |
|
|
| |
| if "&" in src: |
| issues.append(Issue("Hint", "Символ '&' активирует and_rule (логическая сцепка).", |
| "Если это текст, замените на 'and' или запятую.", |
| replace_amp_with_and)) |
|
|
| |
| if "{" in src and "|" not in src: |
| issues.append(Issue("Hint", "Группа без '|' (dead group) — просто склейка, фигурные скобки не нужны.", |
| "Уберите {} — оставьте плоский список.", |
| remove_dead_groups)) |
|
|
| |
| if re.search(r"\[[^]]+\]!\s*!", src): |
| issues.append(Issue("Warning", "Лишний '!' после alternate_distinct.", |
| "Должно быть '[a|b]!' (одно '!').", |
| lambda s: re.sub(r"\]!\s*!", "]!", s))) |
|
|
| |
| for m in re.finditer(r"(\d+)\s*!?\s*{([^}]+)}", src): |
| n = int(m.group(1)) |
| body = m.group(2) |
| opts = [o.strip() for o in body.split("|") if o.strip()] |
| if "!" in m.group(0) and len(opts) < n: |
| issues.append(Issue("Warning", f"В numbered distinct запрошено {n}, но вариантов всего {len(opts)}.", |
| "Уменьшите N или уберите '!'.", |
| lambda s, _n=n: re.sub(rf"\b{_n}\s*!", str(_n), s, count=1))) |
|
|
| |
| if use_visitor and re.search(r"(^|[^[])\b[^]\n]+\|[^[]", src): |
| issues.append(Issue("Hint", "Под use_visitor=True бар 'a|b' может остаться буквальным. " |
| "Для случайного выбора используйте [a|b] или выключите use_visitor.", |
| "Заменить на [a|b] или переключить режим.", bars_to_square_alternate)) |
| return issues |
|
|
| def auto_balance(src: str, l: str, r: str) -> str: |
| c_l = src.count(l) |
| c_r = src.count(r) |
| if c_l > c_r: |
| return src + (r * (c_l - c_r)) |
| if c_r > c_l: |
| return src |
| return src |
|
|
| def ensure_sequence_terminator(src: str) -> str: |
| if "::" in src and not re.search(r"(!|;|!!)\s*$", src): |
| return src.rstrip() + " !" |
| return src |
|
|
| def bars_to_square_alternate(src: str) -> str: |
| if "[" in src and "]" in src: |
| return src |
| return "[ " + src.strip() + " ]" |
|
|
| def replace_amp_with_and(src: str) -> str: |
| return src.replace("&", "and") |
|
|
| def remove_dead_groups(src: str) -> str: |
| def repl(m): |
| inner = m.group(1) |
| if "|" in inner: |
| return m.group(0) |
| return inner |
| return re.sub(r"\{\s*([^{}]+?)\s*\}", repl, src) |
|
|
| def apply_safe_fixes(src: str, use_visitor: bool) -> Tuple[str, List[str]]: |
| issues = lint_prompt(src, use_visitor) |
| changes = [] |
| out = src |
| for issue in issues: |
| if callable(issue.fixer): |
| new_out = issue.fixer(out) |
| if new_out != out: |
| changes.append(f"{issue.kind}: {issue.msg} → {issue.fix_hint}") |
| out = new_out |
| return out, changes |
|
|
| |
| |
| |
|
|
| def build_group(items_csv: str) -> str: |
| items = [s.strip() for s in items_csv.split(",") if s.strip()] |
| return "{ " + ", ".join(items) + " }" if items else "{}" |
|
|
| def build_alternate(items_bar: str, distinct: bool) -> str: |
| core = items_bar.strip() |
| if not core: |
| return "[]" |
| return f"[ {core} ]!" if distinct else f"[ {core} ]" |
|
|
| def build_numbered(n: int, options_bar_or_group: str, distinct: bool) -> str: |
| n = max(1, int(n)) |
| body = options_bar_or_group.strip() |
| mark = "!" if distinct else "" |
| return f"{n}{mark} {body}" |
|
|
| def build_sequence(owner: str, pairs_text: str, terminator: str) -> str: |
| owner = owner.strip() |
| pairs = [] |
| for chunk in pairs_text.split(";"): |
| chunk = chunk.strip() |
| if not chunk: |
| continue |
| if ":" not in chunk: |
| pairs.append(chunk) |
| else: |
| k, v = chunk.split(":", 1) |
| pairs.append(f"{k.strip()} :: {v.strip()}") |
| term = "!" if terminator == "!" else ";" |
| return f"{owner} :: " + ", ".join(pairs) + f" {term}" |
|
|
| def build_top_level_sequence(owner: str, pairs_text: str, trailing_plain: str) -> str: |
| seq = build_sequence(owner, pairs_text, "!") |
| inner = seq.replace(f"{owner} :: ", "").rstrip("!;").strip() |
| tail = (", " + trailing_plain.strip()) if trailing_plain.strip() else "" |
| return f"{owner.strip()} ::: {inner} !!{tail}" |
|
|
| def build_scheduled_block(variants_multiline: str, ranges_text: str, reverse: bool) -> str: |
| variants = [v.strip() for v in variants_multiline.split("\n") if v.strip()] |
| if not variants: |
| return "" |
| core = " : ".join(variants) |
| extra = f" : {ranges_text.strip()}" if ranges_text.strip() else "" |
| suffix = " reverse" if reverse else "" |
| return f"[ {core} ]{extra}{suffix}" |
|
|
| def build_alternate_every_k(options_bar: str, steps: int, k: int) -> str: |
| opts = [o.strip() for o in options_bar.split("|") if o.strip()] |
| if not opts: |
| return "" |
| boundaries = [] |
| for i in range(1, len(opts)): |
| b = i * k |
| if b < steps: |
| boundaries.append(str(b)) |
| extra = f" : {', '.join(boundaries)}" if boundaries else "" |
| return f"[ {' : '.join(opts)} ]{extra}" |
|
|
| |
| def conv_dead_groups_to_plain(src: str) -> str: |
| return remove_dead_groups(src) |
|
|
| def conv_bars_to_square(src: str) -> str: |
| return bars_to_square_alternate(src) |
|
|
| def conv_add_seq_terminator(src: str) -> str: |
| return ensure_sequence_terminator(src) |
|
|
| |
| |
| |
|
|
| def schedule_to_csv(schedule: List[List[Any]]) -> str: |
| lines = ["end_step,text"] |
| for end_step, text in schedule: |
| safe = text.replace('"', '""') |
| lines.append(f'{end_step},"{safe}"') |
| return "\n".join(lines) |
|
|
| def timeline_to_csv(schedule: List[List[Any]], steps: int) -> str: |
| lines = ["step,text"] |
| for s in range(1, steps + 1): |
| t = pp.at_step_from_schedule(s, schedule) |
| safe = t.replace('"', '""') |
| lines.append(f'{s},"{safe}"') |
| return "\n".join(lines) |
|
|
| def schedule_to_json(schedule: List[List[Any]]) -> str: |
| data = [{"end_step": end, "text": txt} for end, txt in schedule] |
| return json.dumps(data, ensure_ascii=False, indent=2) |
|
|
| def md_block(title: str, content: str) -> str: |
| return f"### {title}\n\n```\n{content}\n```" |
|
|
| def mk_diff(a: str, b: str, from_name="original", to_name="fixed") -> str: |
| diff = difflib.unified_diff( |
| a.splitlines(keepends=False), |
| b.splitlines(keepends=False), |
| fromfile=from_name, tofile=to_name, lineterm="" |
| ) |
| text = "\n".join(diff) |
| return "```\n" + (text if text else "# (no changes)\n") + "\n```" |
|
|
| |
| |
| |
|
|
| def highlight_prompt_html(src: str) -> str: |
| |
| esc = (src.replace("&", "&").replace("<", "<").replace(">", ">")) |
| |
| esc = re.sub(r"(\[\]|\[|\])", r'<span style="color:#4aa3ff">\1</span>', esc) |
| esc = re.sub(r"(\{\}|\{|\})", r'<span style="color:#ffa640">\1</span>', esc) |
| esc = re.sub(r"(\(|\))", r'<span style="color:#92d050">\1</span>', esc) |
| esc = re.sub(r"(::|:::|!!|!|;|\|)", r'<span style="color:#ff5b8a">\1</span>', esc) |
| esc = re.sub(r"\bAND\b(?!_)", r'<span style="color:#ffd000">AND</span>', esc) |
| return f'<pre style="white-space:pre-wrap; font-family:monospace; line-height:1.3">{esc}</pre>' |
|
|
| |
| |
| |
|
|
| def _list_samplers() -> List[str]: |
| try: |
| return [x.name for x in sd_samplers.all_samplers] |
| except Exception: |
| return ["Euler a", "Euler", "DPM++ 2M Karras"] |
|
|
| def _generate_txt2img(prompt: str, |
| negative: str, |
| steps: int, |
| width: int, |
| height: int, |
| cfg_scale: float, |
| seed: int, |
| sampler_name: str, |
| batch_count: int, |
| batch_size: int, |
| restore_faces: bool, |
| tiling: bool) -> Tuple[List[Any], str]: |
| try: |
| p = processing.StableDiffusionProcessingTxt2Img( |
| sd_model=shared.sd_model, |
| prompt=prompt, |
| negative_prompt=negative, |
| steps=int(steps), |
| width=int(width), |
| height=int(height), |
| cfg_scale=float(cfg_scale), |
| seed=int(seed), |
| sampler_name=sampler_name, |
| n_iter=int(batch_count), |
| batch_size=int(batch_size), |
| restore_faces=restore_faces, |
| tiling=tiling, |
| do_not_save_grid=True, |
| do_not_save_samples=True, |
| ) |
| processed = processing.process_images(p) |
| return processed.images, processed.info |
| except Exception as e: |
| return [], f"Ошибка генерации: {e}" |
|
|
| def _generate_variants_with_placeholder(prompt: str, |
| negative: str, |
| placeholder: str, |
| options_multiline: str, |
| steps: int, |
| width: int, |
| height: int, |
| cfg_scale: float, |
| seed: int, |
| sampler_name: str, |
| restore_faces: bool, |
| tiling: bool) -> Tuple[List[Any], str]: |
| opts_list = [o.strip() for o in options_multiline.split("\n") if o.strip()] |
| if not opts_list: |
| return [], "Нет вариантов (пустой список)." |
| images_all = [] |
| infos = [] |
| try: |
| for i, opt_val in enumerate(opts_list, 1): |
| pr = prompt.replace(placeholder, opt_val) |
| p = processing.StableDiffusionProcessingTxt2Img( |
| sd_model=shared.sd_model, |
| prompt=pr, |
| negative_prompt=negative, |
| steps=int(steps), |
| width=int(width), |
| height=int(height), |
| cfg_scale=float(cfg_scale), |
| seed=int(seed + i - 1), |
| sampler_name=sampler_name, |
| n_iter=1, |
| batch_size=1, |
| restore_faces=restore_faces, |
| tiling=tiling, |
| do_not_save_grid=True, |
| do_not_save_samples=True, |
| ) |
| processed = processing.process_images(p) |
| images_all.extend(processed.images or []) |
| infos.append(f"[{i}] {opt_val} — seed={int(seed + i - 1)}") |
| return images_all, "\n".join(infos) |
| except Exception as e: |
| return [], f"Ошибка генерации вариантов: {e}" |
|
|
| |
| |
| |
|
|
| _AND_SPLIT_RX = re.compile(r"\bAND\b(?!_PERP|_SALT|_TOPK)") |
|
|
| def and_debug_parse(src: str) -> List[Tuple[str, Optional[float]]]: |
| """ |
| Разбиваем на сегменты по AND (без AND_*), ищем локальный вес ' :number' в конце сегмента. |
| Возвращаем [(text, weight_or_None), ...] |
| """ |
| parts = _AND_SPLIT_RX.split(src) |
| out = [] |
| for part in parts: |
| part = part.strip() |
| if not part: |
| continue |
| m = re.search(r"^(.*?)(?:\s*:\s*([+\-]?\d*\.?\d+(?:[eE][+\-]?\d+)?))?\s*$", part) |
| if m: |
| text = m.group(1).strip() |
| w = float(m.group(2)) if m.group(2) is not None else None |
| out.append((text, w)) |
| else: |
| out.append((part, None)) |
| return out |
|
|
| def and_debug_build(rows_text: str) -> str: |
| """ |
| Принимает табличный ввод вида: |
| text1 : 1.2 |
| text2 : 0.8 |
| Собирает: "text1 :1.2 AND text2 :0.8" |
| """ |
| lines = [ln.strip() for ln in rows_text.split("\n") if ln.strip()] |
| parts = [] |
| for ln in lines: |
| m = re.match(r"^(.*?)(?:\s*:\s*([+\-]?\d*\.?\d+(?:[eE][+\-]?\d+)?))?$", ln) |
| if m: |
| t = m.group(1).strip() |
| w = m.group(2) |
| parts.append(f"{t} :{w}" if w else t) |
| return " AND ".join(parts) |
|
|
| |
| |
| |
|
|
| def analyze(prompt: str, |
| steps: int, |
| seed: Optional[int], |
| use_visitor: bool, |
| allow_empty_alt: bool, |
| expand_alt_per_step: bool, |
| group_combo_limit: int, |
| view_mode: str, |
| at_step_n: int, |
| enable_pre: bool) -> Tuple[str, str, str, str, str, str]: |
| try: |
| maybe_reload_prompt_parser(allow_empty_alt, expand_alt_per_step, group_combo_limit) |
| except Exception as e: |
| err = f"**Не удалось применить ENV-флаги:** {e}" |
| return err, "", "", "", "", "" |
|
|
| pre_md = "" |
| if enable_pre: |
| pre_out, pre_changes = preprocess_prompt(prompt, steps, True) |
| if pre_changes: |
| pre_md = "**Препроцессор применил изменения:**\n" + "\n".join([f"- {c}" for c in pre_changes]) + \ |
| "\n\n" + mk_diff(prompt, pre_out, "original", "preprocessed") |
| prompt = pre_out |
|
|
| status_md = "" |
| tree_md = "" |
| schedule_md = "" |
| timeline_md = "" |
| text_at_step_md = "" |
|
|
| tree_str, err_md = _safe_parse_tree(prompt) |
| if err_md: |
| status_md = err_md |
| else: |
| status_md = "**OK:** Парсинг прошёл успешно." |
| if view_mode == "Parse tree": |
| tree_md = "```\n" + tree_str + "\n```" |
|
|
| schedule, sched_err = _make_schedule(prompt, steps, seed, use_visitor) |
| if sched_err: |
| status_md += "\n\n" + sched_err |
| return status_md, "", "", "", pre_md, tree_md |
|
|
| if view_mode in ("Schedule", "All"): |
| schedule_md = _schedule_table_md(schedule) |
| if view_mode in ("Timeline (per step)", "All"): |
| timeline_md = _per_step_timeline_md(schedule, steps) |
|
|
| try: |
| t = pp.at_step_from_schedule(at_step_n, schedule) |
| text_at_step_md = f"**Шаг {at_step_n}:** `{t}`" |
| except Exception as e: |
| text_at_step_md = f"Ошибка получения текста на шаге {at_step_n}: {e}" |
|
|
| if view_mode == "Visitor vs Transformer": |
| tree_md = _compare_vis_vs_trans(prompt, steps, seed) |
|
|
| return status_md, schedule_md, timeline_md, text_at_step_md, pre_md, tree_md |
|
|
| |
| |
| |
|
|
| def export_project(an_src, steps, seed, use_visitor, |
| allow_empty, expand_alt, group_limit, |
| gen_cfg) -> str: |
| data = { |
| "analyzer": { |
| "prompt": an_src, |
| "steps": int(steps), |
| "seed": int(seed) if seed is not None else None, |
| "use_visitor": bool(use_visitor), |
| "ALLOW_EMPTY_ALTERNATE": bool(allow_empty), |
| "EXPAND_ALTERNATE_PER_STEP": bool(expand_alt), |
| "GROUP_COMBO_LIMIT": int(group_limit), |
| }, |
| "generate": { |
| "single": { |
| "width": int(gen_cfg["w"]), |
| "height": int(gen_cfg["h"]), |
| "steps": int(gen_cfg["steps"]), |
| "cfg": float(gen_cfg["cfg"]), |
| "seed": int(gen_cfg["seed"]), |
| "sampler": gen_cfg["sampler"], |
| "batch_count": int(gen_cfg["bc"]), |
| "batch_size": int(gen_cfg["bs"]), |
| "restore_faces": bool(gen_cfg["rf"]), |
| "tiling": bool(gen_cfg["tl"]), |
| } |
| } |
| } |
| return json.dumps(data, ensure_ascii=False, indent=2) |
|
|
| def import_project(text: str): |
| try: |
| data = json.loads(text) |
| a = data.get("analyzer", {}) |
| g = data.get("generate", {}).get("single", {}) |
| return ( |
| a.get("prompt", ""), |
| int(a.get("steps", 50)), |
| int(a.get("seed", 42)) if a.get("seed") is not None else 42, |
| bool(a.get("use_visitor", True)), |
| bool(a.get("ALLOW_EMPTY_ALTERNATE", False)), |
| bool(a.get("EXPAND_ALTERNATE_PER_STEP", True)), |
| int(a.get("GROUP_COMBO_LIMIT", 100)), |
| int(g.get("width", 512)), |
| int(g.get("height", 768)), |
| int(g.get("steps", 30)), |
| float(g.get("cfg", 7.0)), |
| int(g.get("seed", 42)), |
| str(g.get("sampler", _list_samplers()[0])), |
| int(g.get("batch_count", 1)), |
| int(g.get("batch_size", 1)), |
| bool(g.get("restore_faces", False)), |
| bool(g.get("tiling", False)), |
| ) |
| except Exception: |
| |
| return ("", 50, 42, True, False, True, 100, 512, 768, 30, 7.0, 42, _list_samplers()[0], 1, 1, False, False) |
|
|
| |
| |
| |
|
|
| class PromptParserAnalyzerScript(scripts.Script): |
| def title(self): |
| return "Prompt Parser Analyzer" |
|
|
| def show(self, is_txt2img): |
| return scripts.AlwaysVisible |
|
|
| def ui(self, is_txt2img): |
| with gr.Group(): |
| with gr.Accordion("Prompt Parser Analyzer", open=False): |
| with gr.Tabs(): |
|
|
| |
| with gr.Tab("Analyzer"): |
| prompt_in = gr.Textbox(label="Source Prompt", lines=6, placeholder="Вставьте промпт…") |
| with gr.Row(): |
| steps = gr.Slider(1, 300, value=50, step=1, label="Steps") |
| seed = gr.Number(value=42, precision=0, label="Seed (для случайных конструкций)") |
| at_step_n = gr.Slider(1, 300, value=25, step=1, label="Показать текст на шаге N") |
| with gr.Row(): |
| use_visitor = gr.Checkbox(value=True, label="use_visitor (быстрее; alternate2 может отличаться)") |
| view_mode = gr.Dropdown( |
| choices=["All", "Schedule", "Timeline (per step)", "Parse tree", "Visitor vs Transformer"], |
| value="All", |
| label="Режим вывода", |
| ) |
| with gr.Row(): |
| allow_empty_alt = gr.Checkbox(value=False, label="ALLOW_EMPTY_ALTERNATE (разрешать пустые [a||b])") |
| expand_alt = gr.Checkbox(value=True, label="EXPAND_ALTERNATE_PER_STEP (чередовать по шагам)") |
| group_limit = gr.Slider(1, 5000, value=100, step=1, label="GROUP_COMBO_LIMIT") |
| enable_pre = gr.Checkbox(value=True, label="Enable Preprocessor (синтаксический сахар)") |
| analyze_btn = gr.Button("Analyze", variant="primary") |
|
|
| status_md = gr.Markdown() |
| schedule_md = gr.Markdown() |
| timeline_md = gr.Markdown() |
| text_at_step_md = gr.Markdown() |
| pre_md = gr.Markdown() |
| tree_md = gr.Markdown() |
|
|
| |
| gr.Markdown("#### Подсветка синтаксиса") |
| highlight_html = gr.HTML() |
|
|
| |
| with gr.Tab("Lint / Fix"): |
| lint_src = gr.Textbox(label="Prompt для проверки", lines=6, placeholder="Вставьте промпт…") |
| with gr.Row(): |
| lint_use_visitor = gr.Checkbox(value=True, label="use_visitor (для эвристик)") |
| lint_btn = gr.Button("Run Lint", variant="primary") |
| fix_btn = gr.Button("Apply Safe Fixes") |
| lint_report = gr.Markdown() |
| fixed_out = gr.Textbox(label="Fixed Prompt (предпросмотр)", lines=6) |
| diff_md = gr.Markdown() |
|
|
| |
| with gr.Tab("Scheduler"): |
| gr.Markdown("Соберите `scheduled`: варианты по строкам, интервалы (%, шаги), опционально `reverse`.") |
| sched_variants = gr.Textbox(label="Варианты (каждый с новой строки)", value="A\nB\nC", lines=6) |
| sched_ranges = gr.Textbox(label="Интервалы", placeholder="30, 70 или 10%-30%, 60-80") |
| sched_reverse = gr.Checkbox(value=False, label="reverse") |
| build_sched_btn = gr.Button("Build [ ... ] : ranges", variant="primary") |
| sched_expr = gr.Textbox(label="Результат", lines=2) |
| gr.Markdown("---") |
| gr.Markdown("Альтернативы: детерминированный переключатель каждые K шагов (строится через scheduled).") |
| alt_bar = gr.Textbox(label="Опции (через |)", value="red|blue|green") |
| k_steps = gr.Slider(1, 100, value=10, step=1, label="K (каждые K шагов)") |
| total_steps_for_k = gr.Slider(1, 300, value=50, step=1, label="Steps (для расчёта границ)") |
| build_every_k = gr.Button("Build alternate every K", variant="secondary") |
| every_k_expr = gr.Textbox(label="Результат", lines=2) |
|
|
| |
| with gr.Tab("Converters / Presets"): |
| conv_src = gr.Textbox(label="Prompt для преобразования", lines=6) |
| with gr.Row(): |
| conv_dead = gr.Button("Dead groups → plain") |
| conv_bars = gr.Button("a|b → [a|b]") |
| conv_seq_term = gr.Button("Sequence: добавить терминатор") |
| conv_out = gr.Textbox(label="Результат", lines=6) |
|
|
| gr.Markdown("---") |
| gr.Markdown("Конструкторы Lark") |
| with gr.Row(): |
| grp_items = gr.Textbox(label="Group items (через запятую)", placeholder="a, b, c") |
| grp_out = gr.Textbox(label="Результат", lines=2) |
| grp_btn = gr.Button("Build {group}") |
| with gr.Row(): |
| alt_items = gr.Textbox(label="Alternates (через |)", placeholder="a|b|c") |
| alt_distinct = gr.Checkbox(value=False, label="distinct (!)") |
| alt_out = gr.Textbox(label="Результат", lines=2) |
| alt_btn = gr.Button("Build [alternate]") |
| with gr.Row(): |
| num_n = gr.Number(value=3, precision=0, label="N (сколько взять)") |
| num_body = gr.Textbox(label="Опции: '{...}' или 'a|b|c'", placeholder="{ a | b | c } или a|b|c") |
| num_distinct = gr.Checkbox(value=True, label="distinct (!)") |
| num_out = gr.Textbox(label="Результат", lines=2) |
| num_btn = gr.Button("Build numbered") |
| with gr.Row(): |
| seq_owner = gr.Textbox(label="Sequence owner", value="character") |
| seq_pairs = gr.Textbox(label="Sequence pairs (k:v; k:v)", value="outfit: leather jacket; mood: brooding") |
| seq_term = gr.Dropdown(choices=["!", ";"], value="!", label="Terminator") |
| seq_out = gr.Textbox(label="Результат", lines=2) |
| seq_btn = gr.Button("Build sequence") |
| with gr.Row(): |
| top_owner = gr.Textbox(label="Top-level owner", value="portrait") |
| top_pairs = gr.Textbox(label="Pairs (k:v; k:v)", value="hair: auburn; eyes: blue") |
| top_tail = gr.Textbox(label="Trailing plain (опционально)", value="ultra-detailed skin texture") |
| top_out = gr.Textbox(label="Результат", lines=2) |
| top_btn = gr.Button("Build top-level sequence") |
|
|
| |
| with gr.Tab("Export / Diff / Project"): |
| exp_src = gr.Textbox(label="Prompt (источник для экспорта)", lines=6) |
| with gr.Row(): |
| exp_steps = gr.Slider(1, 300, value=50, step=1, label="Steps") |
| exp_seed = gr.Number(value=42, precision=0, label="Seed") |
| exp_use_visitor = gr.Checkbox(value=True, label="use_visitor") |
| exp_btn = gr.Button("Build schedule/timeline", variant="primary") |
| exp_sched_csv = gr.Textbox(label="Schedule CSV", lines=6) |
| exp_sched_json = gr.Textbox(label="Schedule JSON", lines=8) |
| exp_timeline_csv = gr.Textbox(label="Timeline CSV", lines=8) |
| copy_md_btn = gr.Button("Copy Markdown of schedule/timeline (ниже)") |
| exp_sched_md = gr.Markdown() |
| exp_timeline_md = gr.Markdown() |
|
|
| gr.Markdown("---") |
| gr.Markdown("### Project JSON") |
| proj_export_btn = gr.Button("Export Project JSON") |
| proj_json = gr.Textbox(label="Project JSON", lines=14) |
| proj_import_btn = gr.Button("Import Project JSON") |
| proj_status = gr.Markdown() |
|
|
| |
| with gr.Tab("AND Debugger"): |
| and_src = gr.Textbox(label="AND-chain prompt", lines=6) |
| and_parse_btn = gr.Button("Parse AND chain") |
| and_table = gr.Markdown() |
| and_builder_txt = gr.Textbox(label="Builder: text[:weight] на каждой строке", lines=6) |
| and_build_btn = gr.Button("Build AND chain") |
| and_out = gr.Textbox(label="Результат", lines=2) |
|
|
| |
| with gr.Tab("Generate"): |
| gr.Markdown("### Single") |
| gen_prompt = gr.Textbox(label="Prompt", lines=6, placeholder="Возьмём из Analyzer → Source Prompt, можно отредактировать") |
| gen_negative = gr.Textbox(label="Negative", lines=4, value="") |
| with gr.Row(): |
| gen_width = gr.Slider(64, 1536, value=512, step=64, label="Width") |
| gen_height = gr.Slider(64, 1536, value=768, step=64, label="Height") |
| gen_steps = gr.Slider(1, 300, value=30, step=1, label="Steps") |
| with gr.Row(): |
| gen_cfg = gr.Slider(1, 20, value=7.0, step=0.5, label="CFG") |
| gen_seed = gr.Number(value=42, precision=0, label="Seed") |
| gen_sampler = gr.Dropdown(choices=_list_samplers(), value=_list_samplers()[0], label="Sampler") |
| with gr.Row(): |
| gen_batch_count = gr.Slider(1, 16, value=1, step=1, label="Batch Count") |
| gen_batch_size = gr.Slider(1, 8, value=1, step=1, label="Batch Size") |
| with gr.Row(): |
| gen_restore_faces = gr.Checkbox(value=False, label="Restore faces") |
| gen_tiling = gr.Checkbox(value=False, label="Tiling") |
| gen_btn = gr.Button("Generate", variant="primary") |
| gen_gallery = gr.Gallery(label="Result").style(grid=4) |
| gen_info = gr.Textbox(label="Info", lines=4) |
|
|
| gr.Markdown("---") |
| gr.Markdown("### A/B Variants (placeholder)") |
| gr.Markdown("Задайте плейсхолдер в промпте, напр. `{{ALT}}`, и список вариантов построчно.") |
| var_prompt = gr.Textbox(label="Prompt (с плейсхолдером)", lines=6) |
| var_negative = gr.Textbox(label="Negative", lines=4, value="") |
| var_placeholder = gr.Textbox(label="Placeholder", value="{{ALT}}") |
| var_options = gr.Textbox(label="Options (каждая на новой строке)", value="red dress\nblue jeans\nblack coat", lines=6) |
| with gr.Row(): |
| var_width = gr.Slider(64, 1536, value=512, step=64, label="Width") |
| var_height = gr.Slider(64, 1536, value=768, step=64, label="Height") |
| var_steps = gr.Slider(1, 300, value=30, step=1, label="Steps") |
| with gr.Row(): |
| var_cfg = gr.Slider(1, 20, value=7.0, step=0.5, label="CFG") |
| var_seed = gr.Number(value=1000, precision=0, label="Base Seed") |
| var_sampler = gr.Dropdown(choices=_list_samplers(), value=_list_samplers()[0], label="Sampler") |
| with gr.Row(): |
| var_restore_faces = gr.Checkbox(value=False, label="Restore faces") |
| var_tiling = gr.Checkbox(value=False, label="Tiling") |
| var_btn = gr.Button("Generate A/B", variant="secondary") |
| var_gallery = gr.Gallery(label="Variants").style(grid=6) |
| var_info = gr.Textbox(label="Info", lines=6) |
|
|
| |
| def _analyze_wrap(src, s, sd, uv, aea, ealt, gl, vm, stepn, en_pre): |
| res = analyze(src, s, sd, uv, aea, ealt, gl, vm, stepn, en_pre) |
| |
| analyzed_prompt = src |
| if en_pre: |
| pre_prompt, _ = preprocess_prompt(src, s, True) |
| analyzed_prompt = pre_prompt |
| return (*res, highlight_prompt_html(analyzed_prompt)) |
|
|
| analyze_btn.click( |
| fn=_analyze_wrap, |
| inputs=[prompt_in, steps, seed, use_visitor, allow_empty_alt, expand_alt, group_limit, view_mode, at_step_n, enable_pre], |
| outputs=[status_md, schedule_md, timeline_md, text_at_step_md, pre_md, tree_md, highlight_html], |
| ) |
|
|
| |
| def _run_lint(src: str, use_vis: bool): |
| issues = lint_prompt(src, use_vis) |
| if not issues: |
| return "**OK:** проблем не найдено.", "" |
| lines = [] |
| for it in issues: |
| lines.append(f"- **{it.kind}**: {it.msg} — _{it.fix_hint or '…'}_") |
| return "\n".join(lines), "" |
| lint_btn.click(_run_lint, [lint_src, lint_use_visitor], [lint_report, fixed_out]) |
|
|
| def _apply_fixes(src: str, use_vis: bool): |
| fixed, changes = apply_safe_fixes(src, use_vis) |
| report = "**Применены правки:**\n" + "\n".join([f"- {c}" for c in changes]) if changes else "_Нечего править (safe-fixes)._" |
| return fixed, report, mk_diff(src, fixed) |
| fix_btn.click(_apply_fixes, [lint_src, lint_use_visitor], [fixed_out, lint_report, diff_md]) |
|
|
| |
| build_sched_btn.click(lambda v, r, rev: build_scheduled_block(v, r, rev), |
| [sched_variants, sched_ranges, sched_reverse], [sched_expr]) |
| build_every_k.click(lambda bar, s, k: build_alternate_every_k(bar, int(s), int(k)), |
| [alt_bar, total_steps_for_k, k_steps], [every_k_expr]) |
|
|
| |
| conv_dead.click(lambda s: conv_dead_groups_to_plain(s), [conv_src], [conv_out]) |
| conv_bars.click(lambda s: conv_bars_to_square(s), [conv_src], [conv_out]) |
| conv_seq_term.click(lambda s: conv_add_seq_terminator(s), [conv_src], [conv_out]) |
|
|
| |
| grp_btn.click(build_group, [grp_items], [grp_out]) |
| alt_btn.click(build_alternate, [alt_items, alt_distinct], [alt_out]) |
| num_btn.click(build_numbered, [num_n, num_body, num_distinct], [num_out]) |
| seq_btn.click(build_sequence, [seq_owner, seq_pairs, seq_term], [seq_out]) |
| top_btn.click(build_top_level_sequence, [top_owner, top_pairs, top_tail], [top_out]) |
|
|
| |
| def _export_all(src: str, s: int, sd: Optional[int], use_vis: bool): |
| sched, err = _make_schedule(src, s, sd, use_vis) |
| if err: |
| return "", "", "", f"**Ошибка:** {err}", "" |
| csv_sched = schedule_to_csv(sched) |
| json_sched = schedule_to_json(sched) |
| csv_timeline = timeline_to_csv(sched, s) |
| md1 = md_block("Schedule", _schedule_table_md(sched)) |
| md2 = md_block("Timeline", _per_step_timeline_md(sched, s)) |
| return csv_sched, json_sched, csv_timeline, md1, md2 |
| exp_btn.click(_export_all, [exp_src, exp_steps, exp_seed, exp_use_visitor], |
| [exp_sched_csv, exp_sched_json, exp_timeline_csv, exp_sched_md, exp_timeline_md]) |
|
|
| def _copy_md(md1: str, md2: str): |
| return md1 + "\n\n" + md2 |
| copy_md_btn.click(_copy_md, [exp_sched_md, exp_timeline_md], [exp_sched_md]) |
|
|
| |
| def _proj_export(an_src_v, steps_v, seed_v, uv_v, aea_v, ealt_v, gl_v, |
| w, h, st, cfg, ssd, samp, bc, bs, rf, tl): |
| cfg_map = {"w": w, "h": h, "steps": st, "cfg": cfg, "seed": ssd, "sampler": samp, "bc": bc, "bs": bs, "rf": rf, "tl": tl} |
| return export_project(an_src_v, steps_v, seed_v, uv_v, aea_v, ealt_v, gl_v, cfg_map) |
| proj_export_btn.click(_proj_export, |
| [exp_src, exp_steps, exp_seed, exp_use_visitor, allow_empty_alt, expand_alt, group_limit, |
| gen_width, gen_height, gen_steps, gen_cfg, gen_seed, gen_sampler, gen_batch_count, gen_batch_size, |
| gen_restore_faces, gen_tiling], |
| [proj_json]) |
|
|
| def _proj_import(text): |
| vals = import_project(text) |
| return vals |
| proj_import_btn.click(_proj_import, [proj_json], |
| [exp_src, exp_steps, exp_seed, exp_use_visitor, |
| allow_empty_alt, expand_alt, group_limit, |
| gen_width, gen_height, gen_steps, gen_cfg, gen_seed, gen_sampler, |
| gen_batch_count, gen_batch_size, gen_restore_faces, gen_tiling]) |
|
|
| |
| def _and_parse(src): |
| rows = and_debug_parse(src) |
| if not rows: |
| return "_Нет сегментов._", "" |
| lines = ["| # | text | weight |", "|---:|---|---:|"] |
| builder_lines = [] |
| for i, (t, w) in enumerate(rows, 1): |
| wt = "" if w is None else str(w) |
| safe_t = t.replace("|", "\\|") |
| lines.append(f"| {i} | {safe_t} | {wt} |") |
| builder_lines.append(f"{t} : {wt}" if wt else t) |
| return "\n".join(lines), "\n".join(builder_lines) |
| and_parse_btn.click(_and_parse, [and_src], [and_table, and_builder_txt]) |
|
|
| and_build_btn.click(lambda rows: and_debug_build(rows), [and_builder_txt], [and_out]) |
|
|
| |
| gen_btn.click( |
| fn=_generate_txt2img, |
| inputs=[gen_prompt, gen_negative, gen_steps, gen_width, gen_height, |
| gen_cfg, gen_seed, gen_sampler, gen_batch_count, gen_batch_size, |
| gen_restore_faces, gen_tiling], |
| outputs=[gen_gallery, gen_info] |
| ) |
|
|
| |
| var_btn.click( |
| fn=_generate_variants_with_placeholder, |
| inputs=[var_prompt, var_negative, var_placeholder, var_options, |
| var_steps, var_width, var_height, var_cfg, var_seed, var_sampler, |
| var_restore_faces, var_tiling], |
| outputs=[var_gallery, var_info] |
| ) |
|
|
| return [] |
|
|
| def on_ui_settings(): |
| section = ('prompt-parser-analyzer', "Prompt Parser Analyzer") |
| |
|
|
| script_callbacks.on_ui_settings(on_ui_settings) |
|
|