| | import comfy.sd |
| | import comfy.clip_vision |
| | import folder_paths |
| | import comfy.utils |
| | import torch |
| | import random |
| | from datetime import datetime |
| | import random |
| | import gc |
| | import os |
| | import json |
| | import re |
| |
|
| | from .hard.mg_upscale_module import clear_gpu_and_ram_cache |
| |
|
| | |
# Module-level caches shared by every node instance in this process.

# Checkpoint file path -> (model, clip, vae) tuple stored by _load_checkpoint.
_checkpoint_cache = {}
# Path most recently passed to _unload_old_checkpoint (None until first use).
_loaded_checkpoint = None
# LoRA file path -> object returned by comfy.utils.load_torch_file.
_lora_cache = {}
# NOTE(review): not referenced by any code visible in this file; kept in case
# another module imports it.
_active_lora_names = set()
| |
|
| |
|
def _clear_unused_loras(active_names):
    """Evict cached LoRAs that are not in ``active_names`` and release memory.

    Args:
        active_names: Iterable of ``_lora_cache`` keys that must be kept.

    When at least one entry is evicted, a garbage-collection pass is run and,
    if CUDA is available, the CUDA caching allocator is emptied. Nothing
    happens when every cached entry is still active.
    """
    # Build the lookup set once instead of scanning the iterable per key.
    keep = set(active_names)
    unused = [name for name in _lora_cache if name not in keep]
    if not unused:
        return
    for name in unused:
        del _lora_cache[name]
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
| |
|
| |
|
def _load_checkpoint(path):
    """Return the ``(model, clip, vae)`` triple for the checkpoint at ``path``.

    The triple is served from ``_checkpoint_cache`` when present; otherwise it
    is loaded with ``comfy.sd.load_checkpoint_guess_config`` (VAE and CLIP
    requested, embeddings taken from the configured "embeddings" folders),
    stored in the cache and returned.

    Args:
        path: Full path of the checkpoint file; also used as the cache key.
    """
    if path in _checkpoint_cache:
        return _checkpoint_cache[path]
    loaded = comfy.sd.load_checkpoint_guess_config(
        path,
        output_vae=True,
        output_clip=True,
        embedding_directory=folder_paths.get_folder_paths("embeddings"),
    )
    # Only the first three items of the loader's result are used here.
    model, clip, vae = loaded[:3]
    _checkpoint_cache[path] = (model, clip, vae)
    return model, clip, vae
| |
|
| |
|
def _unload_old_checkpoint(current_path):
    """Record ``current_path`` as the active checkpoint, evicting the previous one.

    If a different checkpoint was recorded before, its entry is removed from
    ``_checkpoint_cache``, a garbage-collection pass is run and, if CUDA is
    available, the CUDA caching allocator is emptied.

    Args:
        current_path: Path of the checkpoint about to be used, or ``None``
            when the model comes from an input socket instead of a file.
    """
    global _loaded_checkpoint
    previous = _loaded_checkpoint
    _loaded_checkpoint = current_path
    if not previous or previous == current_path:
        return
    _checkpoint_cache.pop(previous, None)
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
| |
|
| |
|
| |
|
class MagicNodesCombiNode:
    """ComfyUI node: checkpoint + up to six LoRAs + prompt encoding in one step.

    The node either loads a checkpoint from disk or takes a MODEL/CLIP pair
    from its inputs, applies the enabled LoRAs, and encodes the positive and
    negative prompts. Optional extras:

    * deterministic expansion of ``{a|b}``, ``(a|b)`` and ``[a|b]`` choice
      groups in the prompts, seeded by ``dyn_seed``;
    * saving / loading the whole setup as a "recipe" in one of four slots
      (stored in ``state/combinode_recipes.json`` next to this file);
    * a "standard pipeline" mode in which CLIP LoRAs are applied to a clean
      CLIP copy after its last layer has been set.
    """

    # UI label of the recipe slot -> slot number (0 means recipes are off).
    _RECIPE_SLOTS = {"Off": 0, "Slot 1": 1, "Slot 2": 2, "Slot 3": 3, "Slot 4": 4}
    # (key inside a stored recipe, name of the node setting it overrides).
    _RECIPE_FIELDS = (
        ("use_checkpoint", "use_checkpoint"),
        ("checkpoint", "checkpoint"),
        ("clip_pos", "clip_set_last_layer_positive"),
        ("clip_neg", "clip_set_last_layer_negative"),
        ("pos_text", "positive_prompt"),
        ("neg_text", "negative_prompt"),
    )
    # Text before the first marker is left unexpanded when freezing is on.
    _BREAK_MARKER = "|BREAK|"
    # Innermost {...}, (...) and [...] groups, tried in this order.
    _CHOICE_PATTERNS = (
        re.compile(r"\{([^{}]+)\}"),
        re.compile(r"\(([^()]+)\)"),
        re.compile(r"\[([^\[\]]+)\]"),
    )

    @classmethod
    def INPUT_TYPES(cls):
        """Describe the node's required and optional inputs for ComfyUI."""

        def _loras_with_none():
            # "None" is always offered so a slot can be left without a file.
            try:
                return ["None"] + list(folder_paths.get_filename_list("loras"))
            except Exception:
                return ["None"]

        def _strength(default):
            return ("FLOAT", {"default": default, "min": -1.5, "max": 1.5, "step": 0.01})

        required = {
            "use_checkpoint": ("BOOLEAN", {"default": True}),
            "checkpoint": (folder_paths.get_filename_list("checkpoints"), {}),
            "clear_cache": ("BOOLEAN", {"default": False}),
        }
        # Six LoRA slots (must match the apply_magic_node signature). Only the
        # first slot is enabled, with full strength, by default.
        for slot in range(1, 7):
            is_first = slot == 1
            default_strength = 1.0 if is_first else 0.0
            required[f"use_lora_{slot}"] = ("BOOLEAN", {"default": is_first})
            required[f"lora_{slot}"] = (_loras_with_none(), {})
            required[f"strength_model_{slot}"] = _strength(default_strength)
            required[f"strength_clip_{slot}"] = _strength(default_strength)

        optional = {
            "model_in": ("MODEL", {}),
            "clip_in": ("CLIP", {}),
            "vae_in": ("VAE", {}),

            "positive_prompt": ("STRING", {"multiline": True, "default": "", "dynamicPrompts": False}),
            "negative_prompt": ("STRING", {"multiline": True, "default": "", "dynamicPrompts": False}),

            "positive_in": ("CONDITIONING", {}),
            "negative_in": ("CONDITIONING", {}),

            "clip_set_last_layer_positive": ("INT", {"default": -2, "min": -20, "max": 0}),
            "clip_set_last_layer_negative": ("INT", {"default": -2, "min": -20, "max": 0}),

            "recipe_slot": (list(cls._RECIPE_SLOTS), {"default": "Off", "tooltip": "Choose slot to save/load assembled setup."}),
            "recipe_save": ("BOOLEAN", {"default": False, "tooltip": "Save current setup into the selected slot."}),
            "recipe_use": ("BOOLEAN", {"default": False, "tooltip": "Load and apply setup from the selected slot for this run."}),

            "standard_pipeline": ("BOOLEAN", {"default": False, "tooltip": "Use vanilla order for CLIP: Set Last Layer -> Load LoRA -> Encode (same CLIP logic as standard ComfyUI)."}),

            "clip_lora_pos_gain": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 3.0, "step": 0.01, "tooltip": "Multiplier for CLIP-LoRA strength on positive branch (standard pipeline)."}),
            "clip_lora_neg_gain": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 3.0, "step": 0.01, "tooltip": "Multiplier for CLIP-LoRA strength on negative branch (standard pipeline)."}),

            "dynamic_pos": ("BOOLEAN", {"default": False, "tooltip": "Deterministically expand choices in positive prompt (uses dyn_seed)."}),
            "dynamic_neg": ("BOOLEAN", {"default": False, "tooltip": "Deterministically expand choices in negative prompt (uses dyn_seed)."}),
            "dyn_seed": ("INT", {"default": 0, "min": 0, "max": 0xFFFFFFFF, "tooltip": "Seed for dynamic prompt expansion (same seed used for both prompts)."}),
            "dynamic_break_freeze": ("BOOLEAN", {"default": True, "tooltip": "If enabled, do not expand choices before the first |BREAK| marker; dynamic applies only after it."}),
            "show_expanded_prompts": ("BOOLEAN", {"default": False, "tooltip": "Print expanded Positive/Negative prompts to console when dynamic is enabled."}),
            "save_expanded_prompts": ("BOOLEAN", {"default": False, "tooltip": "Save expanded prompts to mod/dynPrompt/SEED_dd_mm_yyyy.txt when dynamic is enabled."}),
        }
        return {"required": required, "optional": optional}

    RETURN_TYPES = ("MODEL", "CLIP", "CONDITIONING", "CONDITIONING", "VAE")
    RETURN_NAMES = ("MODEL", "CLIP", "Positive", "Negative", "VAE")

    FUNCTION = "apply_magic_node"
    CATEGORY = "MagicNodes"

    # ------------------------------------------------------------------ helpers

    @staticmethod
    def _log(message):
        """Print ``message``; a console/encoding failure must not abort the node."""
        try:
            print(message)
        except Exception:
            # Deliberately best-effort: logging is never worth failing a run.
            pass

    @staticmethod
    def _recipes_path():
        """Return the recipe file path, creating its ``state`` folder if needed."""
        base = os.path.join(os.path.dirname(__file__), "state")
        os.makedirs(base, exist_ok=True)
        return os.path.join(base, "combinode_recipes.json")

    @staticmethod
    def _recipes_load():
        """Return the stored recipes as a dict; ``{}`` if missing or unreadable."""
        try:
            with open(MagicNodesCombiNode._recipes_path(), "r", encoding="utf-8") as f:
                data = json.load(f)
        except (OSError, ValueError):
            # ValueError covers both invalid JSON and undecodable bytes.
            return {}
        # Callers index the result by slot, so anything but a dict is unusable.
        return data if isinstance(data, dict) else {}

    @staticmethod
    def _recipes_save(data):
        """Write ``data`` to the recipe file; return True on success.

        The JSON is serialised first and written through a temporary file so
        a failure cannot leave a truncated recipe file behind.
        """
        try:
            payload = json.dumps(data, ensure_ascii=False, indent=2)
            path = MagicNodesCombiNode._recipes_path()
            tmp_path = path + ".tmp"
            with open(tmp_path, "w", encoding="utf-8") as f:
                f.write(payload)
            os.replace(tmp_path, path)
        except (OSError, TypeError, ValueError):
            return False
        return True

    @staticmethod
    def _merge_recipe(rec, current, loras):
        """Overlay a stored recipe on the current settings.

        Args:
            rec: Recipe dict as stored by ``apply_magic_node``.
            current: Dict holding the current value of every setting named in
                ``_RECIPE_FIELDS``.
            loras: Current LoRA slots as ``(use, name, strength_model,
                strength_clip)`` entries.

        Returns:
            ``(settings, loras)`` with the recipe applied. The inputs are not
            modified, so a malformed recipe (which raises) applies nothing.
        """
        merged = dict(current)
        for rec_key, setting in MagicNodesCombiNode._RECIPE_FIELDS:
            merged[setting] = rec.get(rec_key, current[setting])
        merged_loras = [tuple(entry) for entry in loras]
        stored = rec.get("loras", [])
        # Recipes with fewer than four LoRA entries leave all slots untouched.
        if len(stored) >= 4:
            for idx, entry in enumerate(stored[:len(merged_loras)]):
                use, name, strength_model, strength_clip = entry
                merged_loras[idx] = (use, name, strength_model, strength_clip)
        return merged, merged_loras

    @staticmethod
    def _norm_prompt(s):
        """Collapse whitespace and tidy comma spacing in a prompt string.

        Line breaks become spaces, whitespace runs collapse to one space,
        every comma is followed by exactly one space and runs of commas
        collapse to one. Empty or non-string input is returned as ``s or ""``.
        """
        if not isinstance(s, str) or not s:
            return s or ""
        text = s.replace("\r", " ").replace("\n", " ")
        text = re.sub(r"\s+", " ", text)
        text = re.sub(r"\s*,\s*", ", ", text)
        text = re.sub(r"(,\s*){2,}", ", ", text)
        return text.strip()

    @staticmethod
    def _expand_dynamic(text, seed_val, freeze_before_break=True):
        """Replace ``{a|b}``, ``(a|b)`` and ``[a|b]`` groups with one option.

        Args:
            text: Prompt text. Returned unchanged when it is not a string or
                contains no ``|`` at all.
            seed_val: Seed; only its low 32 bits are used, so the same seed
                and text always give the same result.
            freeze_before_break: When true and the text contains ``|BREAK|``,
                everything up to the first marker is kept verbatim and only
                the remainder is expanded.

        Groups that contain no ``|`` are not choice groups and are left
        untouched, so emphasis syntax such as ``(word:1.2)`` keeps its
        brackets.
        """
        node = MagicNodesCombiNode
        if not isinstance(text, str) or "|" not in text:
            return text
        if freeze_before_break and node._BREAK_MARKER in text:
            pre, post = text.split(node._BREAK_MARKER, 1)
            expanded = node._expand_dynamic(post, seed_val, freeze_before_break=False)
            return pre + node._BREAK_MARKER + expanded

        rng = random.Random(int(seed_val) & 0xFFFFFFFF)

        def pick(match):
            body = match.group(1)
            if "|" not in body:
                return match.group(0)
            choices = [c.strip() for c in body.split("|") if c.strip()]
            if not choices:
                return match.group(0)
            return rng.choice(choices)

        for pattern in node._CHOICE_PATTERNS:
            # Repeat until stable so nested groups resolve from the inside out.
            previous = None
            while previous != text:
                previous = text
                text = pattern.sub(pick, text)
        return text

    @staticmethod
    def _has_frozen_prefix(source, dynamic_on, freeze):
        """Tell whether ``source`` was expanded with a frozen ``|BREAK|`` prefix."""
        return (
            bool(freeze)
            and bool(dynamic_on)
            and isinstance(source, str)
            and MagicNodesCombiNode._BREAK_MARKER in source
        )

    @staticmethod
    def _prepare_clip(src_clip, last_layer, model, lora_stack, gain):
        """Clone ``src_clip``, set its last layer and apply CLIP-side LoRAs.

        Args:
            src_clip: CLIP object to clone; it is not modified.
            last_layer: Value passed to the clone's ``clip_layer``.
            model: Model handed to ``comfy.sd.load_lora_for_models``; the
                model it returns is discarded (model strength is 0.0).
            lora_stack: ``(lora_file, clip_strength, model_strength)``
                entries; pass an empty sequence to apply none.
            gain: Multiplier applied to each entry's CLIP strength.
        """
        encoder = src_clip.clone()
        encoder.clip_layer(last_layer)
        for lora_file, clip_strength, _model_strength in lora_stack:
            try:
                _unused_model, encoder = comfy.sd.load_lora_for_models(
                    model, encoder, lora_file, 0.0, clip_strength * gain
                )
            except Exception as exc:
                # Keep going with the remaining LoRAs, but do not hide it.
                MagicNodesCombiNode._log(f"[CombiNode] Failed to apply LoRA to CLIP: {exc}")
        return encoder

    @staticmethod
    def _encode(encoder, text):
        """Tokenize ``text`` with ``encoder`` and return its conditioning."""
        return encoder.encode_from_tokens_scheduled(encoder.tokenize(text))

    # -------------------------------------------------------------- entry point

    def apply_magic_node(self, model_in=None, clip_in=None, checkpoint=None,
                         use_checkpoint=True, clear_cache=False,
                         use_lora_1=True, lora_1=None, strength_model_1=1.0, strength_clip_1=1.0,
                         use_lora_2=False, lora_2=None, strength_model_2=0.0, strength_clip_2=0.0,
                         use_lora_3=False, lora_3=None, strength_model_3=0.0, strength_clip_3=0.0,
                         use_lora_4=False, lora_4=None, strength_model_4=0.0, strength_clip_4=0.0,
                         use_lora_5=False, lora_5=None, strength_model_5=0.0, strength_clip_5=0.0,
                         use_lora_6=False, lora_6=None, strength_model_6=0.0, strength_clip_6=0.0,
                         positive_prompt="", negative_prompt="",
                         clip_set_last_layer_positive=-2, clip_set_last_layer_negative=-2,
                         vae_in=None,
                         recipe_slot="Off", recipe_save=False, recipe_use=False,
                         standard_pipeline=False,
                         clip_lora_pos_gain=1.0, clip_lora_neg_gain=1.0,
                         positive_in=None, negative_in=None,
                         dynamic_pos=False, dynamic_neg=False, dyn_seed=0, dynamic_break_freeze=True,
                         show_expanded_prompts=False, save_expanded_prompts=False):
        """Build the model, CLIP, conditionings and VAE for this run.

        Steps, in order: clear caches, optionally apply a stored recipe,
        expand and normalise the prompts, obtain model/CLIP/VAE, apply the
        enabled LoRAs, encode the prompts, optionally print/save the expanded
        prompts, optionally store the setup as a recipe.

        Args:
            model_in, clip_in, vae_in: Used when no checkpoint is selected.
            checkpoint, use_checkpoint: Checkpoint file name and its switch.
            clear_cache: Also empty the LoRA cache and free memory.
            use_lora_N, lora_N, strength_model_N, strength_clip_N: LoRA slot N.
            positive_prompt, negative_prompt: Prompt texts to encode.
            clip_set_last_layer_positive/negative: Passed to ``clip_layer``.
            recipe_slot, recipe_save, recipe_use: Recipe slot handling.
            standard_pipeline: Encode with a clean CLIP copy that gets its
                last layer set first and the CLIP LoRAs applied afterwards.
            clip_lora_pos_gain, clip_lora_neg_gain: CLIP-LoRA strength
                multipliers per branch (standard pipeline only).
            positive_in, negative_in: Ready-made conditioning; when given, the
                matching prompt is not encoded.
            dynamic_pos, dynamic_neg, dyn_seed, dynamic_break_freeze: Choice
                expansion settings (see ``_expand_dynamic``).
            show_expanded_prompts, save_expanded_prompts: Print the expanded
                prompts / write them to ``dynPrompt/`` next to this file.

        Returns:
            ``(model, clip, positive, negative, vae)``. In standard-pipeline
            mode the returned CLIP is the copy without LoRAs applied.

        Raises:
            ValueError: If neither a checkpoint nor a model/CLIP pair is given.
        """
        node = MagicNodesCombiNode

        # NOTE(review): the checkpoint cache is emptied on every call, so the
        # cache lookup in _load_checkpoint never hits for calls made through
        # this method. Kept as in the original; confirm this is intended.
        _checkpoint_cache.clear()
        if clear_cache:
            _lora_cache.clear()
            gc.collect()
            if torch.cuda.is_available():
                torch.cuda.empty_cache()

        loras = [
            (use_lora_1, lora_1, strength_model_1, strength_clip_1),
            (use_lora_2, lora_2, strength_model_2, strength_clip_2),
            (use_lora_3, lora_3, strength_model_3, strength_clip_3),
            (use_lora_4, lora_4, strength_model_4, strength_clip_4),
            (use_lora_5, lora_5, strength_model_5, strength_clip_5),
            (use_lora_6, lora_6, strength_model_6, strength_clip_6),
        ]

        # --- optional: replace the inputs with a stored recipe ---------------
        slot_idx = node._RECIPE_SLOTS.get(str(recipe_slot), 0)
        if slot_idx and recipe_use:
            rec = node._recipes_load().get(str(slot_idx))
            if rec is not None:
                current = {
                    "use_checkpoint": use_checkpoint,
                    "checkpoint": checkpoint,
                    "clip_set_last_layer_positive": clip_set_last_layer_positive,
                    "clip_set_last_layer_negative": clip_set_last_layer_negative,
                    "positive_prompt": positive_prompt,
                    "negative_prompt": negative_prompt,
                }
                try:
                    merged, merged_loras = node._merge_recipe(rec, current, loras)
                except Exception:
                    # Nothing has been applied at this point.
                    node._log(f"[CombiNode] Failed to apply recipe Slot {slot_idx}.")
                else:
                    use_checkpoint = merged["use_checkpoint"]
                    checkpoint = merged["checkpoint"]
                    clip_set_last_layer_positive = merged["clip_set_last_layer_positive"]
                    clip_set_last_layer_negative = merged["clip_set_last_layer_negative"]
                    positive_prompt = merged["positive_prompt"]
                    negative_prompt = merged["negative_prompt"]
                    loras = merged_loras
                    node._log(f"[CombiNode] Loaded recipe Slot {slot_idx}.")

        # --- prompts: optional choice expansion, then normalisation ----------
        if dynamic_pos:
            pos_source = node._expand_dynamic(positive_prompt, int(dyn_seed), bool(dynamic_break_freeze))
        else:
            pos_source = positive_prompt
        if dynamic_neg:
            neg_source = node._expand_dynamic(negative_prompt, int(dyn_seed), bool(dynamic_break_freeze))
        else:
            neg_source = negative_prompt
        pos_text_expanded = node._norm_prompt(pos_source)
        neg_text_expanded = node._norm_prompt(neg_source)

        # --- model / CLIP / VAE ----------------------------------------------
        if use_checkpoint and checkpoint:
            checkpoint_path = folder_paths.get_full_path_or_raise("checkpoints", checkpoint)
            _unload_old_checkpoint(checkpoint_path)
            base_model, base_clip, vae = _load_checkpoint(checkpoint_path)
            model = base_model.clone()
            clip = base_clip.clone()
            clip_clean = base_clip.clone()
        elif model_in is not None and clip_in is not None:
            _unload_old_checkpoint(None)
            model = model_in.clone()
            clip = clip_in.clone()
            clip_clean = clip_in.clone()
            vae = vae_in
        else:
            raise ValueError("No model selected!")

        # --- LoRAs -----------------------------------------------------------
        standard = bool(standard_pipeline)
        active_lora_paths = []
        lora_stack = []
        for use_lora, lora_name, sm, sc in loras:
            # "None" is the UI's placeholder entry, not a file name.
            if not use_lora or not lora_name or lora_name == "None":
                continue
            try:
                lora_path = folder_paths.get_full_path("loras", lora_name)
            except Exception:
                lora_path = None
            if not lora_path or not os.path.exists(lora_path):
                node._log(f"[CombiNode] LoRA '{lora_name}' not found; skipping.")
                continue
            active_lora_paths.append(lora_path)

            if lora_path in _lora_cache:
                lora_file = _lora_cache[lora_path]
            else:
                lora_file = comfy.utils.load_torch_file(lora_path, safe_load=True)
                _lora_cache[lora_path] = lora_file
            lora_stack.append((lora_file, float(sc), float(sm)))
            # In standard mode the CLIP side is applied later, per branch.
            sc_apply = 0.0 if standard else sc
            model, clip = comfy.sd.load_lora_for_models(model, clip, lora_file, sm, sc_apply)

        _clear_unused_loras(active_lora_paths)

        duplicates = {p for p in active_lora_paths if active_lora_paths.count(p) > 1}
        if duplicates:
            node._log(f"[CombiNode] Duplicate LoRA detected across slots: {len(duplicates)} file(s).")

        # --- conditioning ----------------------------------------------------
        src_clip = clip_clean if standard else clip
        pos_gain = float(clip_lora_pos_gain)
        neg_gain = float(clip_lora_neg_gain)
        skips_equal = int(clip_set_last_layer_positive) == int(clip_set_last_layer_negative)
        # LoRAs are re-applied to the encoder only in standard mode; otherwise
        # src_clip already carries them.
        clip_loras = lora_stack if standard else ()

        # One encoder can serve both prompts when both must be encoded and
        # both branches would be configured identically.
        use_shared = (
            standard
            and skips_equal
            and abs(pos_gain - neg_gain) < 1e-6
            and positive_in is None
            and negative_in is None
        )

        if use_shared:
            shared_clip = node._prepare_clip(src_clip, clip_set_last_layer_positive, model, clip_loras, pos_gain)
            cond_pos = node._encode(shared_clip, pos_text_expanded)
            cond_neg = node._encode(shared_clip, neg_text_expanded)
        else:
            # An encoder is built only for a branch that actually needs one.
            if positive_in is not None:
                cond_pos = positive_in
            else:
                clip_pos = node._prepare_clip(src_clip, clip_set_last_layer_positive, model, clip_loras, pos_gain)
                cond_pos = node._encode(clip_pos, pos_text_expanded)
            if negative_in is not None:
                cond_neg = negative_in
            else:
                clip_neg = node._prepare_clip(src_clip, clip_set_last_layer_negative, model, clip_loras, neg_gain)
                cond_neg = node._encode(clip_neg, neg_text_expanded)

        # --- optional: report the expanded prompts ---------------------------
        dyn_used = bool(dynamic_pos) or bool(dynamic_neg)
        if dyn_used and (show_expanded_prompts or save_expanded_prompts):
            report = [
                ("Positive",
                 node._has_frozen_prefix(positive_prompt, dynamic_pos, dynamic_break_freeze),
                 pos_text_expanded),
                ("Negative",
                 node._has_frozen_prefix(negative_prompt, dynamic_neg, dynamic_break_freeze),
                 neg_text_expanded),
            ]

            if show_expanded_prompts:
                try:
                    print(f"[CombiNode] Expanded prompts (dyn_seed={int(dyn_seed)}):")
                    for name, frozen, expanded in report:
                        print(name + ":")
                        if frozen:
                            print(" static")
                        print(" " + expanded)
                except Exception:
                    # Console output is best-effort and must not fail the run.
                    pass

            if save_expanded_prompts:
                try:
                    base = os.path.join(os.path.dirname(__file__), "dynPrompt")
                    os.makedirs(base, exist_ok=True)
                    now = datetime.now()
                    fname = f"{int(dyn_seed)}_{now.day:02d}_{now.month:02d}_{now.year}.txt"
                    lines = []
                    for name, frozen, expanded in report:
                        lines.append(name + ":\n")
                        if frozen:
                            lines.append("static\n")
                        lines.append(expanded + "\n\n")
                    with open(os.path.join(base, fname), "w", encoding="utf-8") as f:
                        f.writelines(lines)
                except Exception as exc:
                    node._log(f"[CombiNode] Failed to save expanded prompts: {exc}")

        # --- optional: store the setup as a recipe ---------------------------
        if slot_idx and recipe_save:
            data = node._recipes_load()
            data[str(slot_idx)] = {
                "use_checkpoint": bool(use_checkpoint),
                "checkpoint": checkpoint,
                "clip_pos": int(clip_set_last_layer_positive),
                "clip_neg": int(clip_set_last_layer_negative),
                "pos_text": str(positive_prompt),
                "neg_text": str(negative_prompt),
                "loras": [
                    [bool(use), name, float(sm), float(sc)]
                    for use, name, sm, sc in loras
                ],
            }
            if node._recipes_save(data):
                node._log(f"[CombiNode] Saved recipe Slot {slot_idx}.")
            else:
                node._log(f"[CombiNode] Failed to save recipe Slot {slot_idx}.")

        return (model, src_clip, cond_pos, cond_neg, vae)
| |
|
| |
|
| |
|
| |
|
| |
|