|
|
import gradio as gr |
|
|
import json |
|
|
from PIL import Image |
|
|
import os |
|
|
from collections import defaultdict |
|
|
|
|
|
# Gradio CSS overrides: fixed-height gallery thumbnails with a hover "lift"
# effect, a scrollable gallery container, and thumbnail-strip layout tweaks.
css = """


#custom-gallery{--row-height:180px;display:grid;grid-auto-rows:min-content;gap:10px}#custom-gallery .thumbnail-item{height:var(--row-height);width:100%;position:relative;overflow:hidden;border-radius:8px;box-shadow:0 2px 5px rgb(0 0 0 / .1);transition:transform 0.2s ease,box-shadow 0.2s ease}#custom-gallery .thumbnail-item:hover{transform:translateY(-3px);box-shadow:0 4px 12px rgb(0 0 0 / .15)}#custom-gallery .thumbnail-item img{width:auto;height:100%;max-width:100%;max-height:var(--row-height);object-fit:contain;margin:0 auto;display:block}#custom-gallery .thumbnail-item img.portrait{max-width:100%}#custom-gallery .thumbnail-item img.landscape{max-height:100%}.gallery-container{max-height:500px;overflow-y:auto;padding-right:0;--size-80:500px}.thumbnails{display:flex;position:absolute;bottom:0;width:120px;overflow-x:scroll;padding-top:320px;padding-bottom:280px;padding-left:4px;flex-wrap:wrap}


"""

# Placeholder shown in all 15 metadata output fields when no image/metadata
# is available (order matches the outputs wired up in the UI below).
EMPTY_RESULT = ("Not Available",) * 15
|
|
|
|
|
|
|
|
def read_metadata(file_path):
    """Return the raw metadata (PIL ``info``) dictionary embedded in an image.

    On any failure (missing file, unreadable format, ...) a dictionary with a
    single "error" key describing the problem is returned instead of raising.
    """
    try:
        with Image.open(file_path) as image:
            info = image.info
        return info
    except Exception as e:
        return {"error": f"Error reading file: {str(e)}"}
|
|
|
|
|
def extract_workflow_data(file_path):
    """Locate and parse the ComfyUI workflow JSON stored in an image's metadata.

    The 'prompt' metadata chunk (where ComfyUI writes the prompt graph) is
    tried first; otherwise any string value that looks like a JSON object is
    attempted. Returns the parsed object, or ``{"error": ...}`` on failure.
    """
    metadata = read_metadata(file_path)
    if "error" in metadata:
        return {"error": metadata["error"]}

    _FAILED = object()  # sentinel: json.loads may legitimately return any value

    def _try_parse(raw):
        try:
            return json.loads(raw)
        except json.JSONDecodeError:
            return _FAILED

    # Preferred location: the 'prompt' chunk written by ComfyUI.
    if 'prompt' in metadata:
        parsed = _try_parse(metadata['prompt'])
        if parsed is not _FAILED:
            return parsed

    # Fallback: scan every metadata value for something JSON-object-shaped.
    for value in metadata.values():
        if isinstance(value, str) and value.strip().startswith('{'):
            parsed = _try_parse(value)
            if parsed is not _FAILED:
                return parsed

    return {"error": "No workflow data found"}
|
|
|
|
|
def extract_ksampler_params(workflow_data):
    """Return sampling settings from the first KSampler node in the graph.

    Supports both "KSampler" and "KSampler (Efficient)" node types.
    Returns a 6-tuple of strings
    (seed, steps, cfg, sampler_name, scheduler, denoise),
    with "Not found" for any missing value.
    """
    if not isinstance(workflow_data, dict):
        return ("Not found",) * 6

    sampler_types = ("KSampler", "KSampler (Efficient)")
    for node in workflow_data.values():
        if isinstance(node, dict) and node.get("class_type", "") in sampler_types:
            inputs = node.get("inputs", {})
            fields = ("seed", "steps", "cfg", "sampler_name", "scheduler", "denoise")
            # Only the first matching sampler node is reported.
            return tuple(str(inputs.get(field, "Not found")) for field in fields)

    return ("Not found",) * 6
|
|
|
|
|
def extract_prompts(workflow_data):
    """Return (positive, negative) prompt strings found in the workflow.

    Three node families are recognized:
      * "Text to Conditioning" and "DPRandomGenerator" nodes, routed by a
        POSITIVE/NEGATIVE marker in the node's ``_meta`` title;
      * "ShowText|pysssss" nodes, where text_1 is positive and text_2 negative.
    Later nodes overwrite earlier matches; "Not found" when nothing matches.
    """
    positive = negative = "Not found"
    if not isinstance(workflow_data, dict):
        return positive, negative

    for node in workflow_data.values():
        if not isinstance(node, dict):
            continue
        class_type = node.get("class_type", "")
        inputs = node.get("inputs", {})
        meta = node.get("_meta")
        title = meta.get("title", "") if meta else ""

        if "Text to Conditioning" in class_type:
            if "POSITIVE" in title:
                positive = inputs.get("text", "Not found")
            elif "NEGATIVE" in title:
                negative = inputs.get("text", "Not found")

        if "ShowText|pysssss" in class_type:
            # Keep the previous value when the slot is absent.
            positive = inputs.get("text_1", positive)
            negative = inputs.get("text_2", negative)

        if "DPRandomGenerator" in class_type:
            if "POSITIVE" in title:
                positive = inputs.get("text", "Not found")
            elif "NEGATIVE" in title:
                negative = inputs.get("text", "Not found")

    return str(positive), str(negative)
|
|
|
|
|
def extract_loras(workflow_data):
    """List the LoRAs used by the workflow, one per line.

    Reports LoraLoader nodes as "name (Strength: s)" and additionally picks up
    any input string containing an inline "lora:" reference.
    Returns "None found" when nothing matches.
    """
    if not isinstance(workflow_data, dict):
        return "None found"

    found = []
    for node in workflow_data.values():
        if not isinstance(node, dict):
            continue
        inputs = node.get("inputs", {})
        if "LoraLoader" in node.get("class_type", ""):
            lora_name = inputs.get("lora_name", "Unknown")
            model_strength = inputs.get("strength_model", "Unknown")
            found.append(f"{lora_name} (Strength: {model_strength})")
        # Inline "<lora:...>" style tags can appear in any string input.
        found.extend(
            value for value in inputs.values()
            if isinstance(value, str) and "lora:" in value.lower()
        )
    return "\n".join(found) if found else "None found"
|
|
|
|
|
def extract_model_info(workflow_data):
    """Collect base-model names used by the workflow, one per line.

    Reads ``ckpt_name`` from CheckpointLoader-family nodes and ``model_path``
    from "Model Mecha Recipe" nodes. Returns "Not found" when nothing matches.
    """
    if not isinstance(workflow_data, dict):
        return "Not found"

    names = []
    for node in workflow_data.values():
        if not isinstance(node, dict):
            continue
        kind = node.get("class_type", "")
        params = node.get("inputs", {})
        if "CheckpointLoader" in kind:
            names.append(params.get("ckpt_name", "Unknown"))
        if "Model Mecha Recipe" in kind:
            names.append(params.get("model_path", "Unknown"))
    return "\n".join(names) if names else "Not found"
|
|
|
|
|
def extract_image_info_from_file(image_path):
    """Return the image's actual (width, height) as a pair of strings.

    Dimensions are read from the file itself rather than the workflow
    metadata, so this works even when the workflow omits latent-size nodes.
    Returns ("Not found", "Not found") if the file cannot be opened.
    """
    try:
        with Image.open(image_path) as img:
            width, height = img.size
        return str(width), str(height)
    except Exception:
        # Fix: the exception was previously bound to an unused variable.
        # Unreadable / missing file: fall back to placeholders.
        return "Not found", "Not found"
|
|
|
|
|
def extract_batch_size(workflow_data):
    """Return the batch size (as a string) from the first EmptyLatentImage node.

    Returns "Not found" when the workflow is not a dict, has no
    EmptyLatentImage node, or that node lacks a batch_size input.
    """
    if not isinstance(workflow_data, dict):
        return "Not found"

    for node in workflow_data.values():
        if isinstance(node, dict) and node.get("class_type", "") == "EmptyLatentImage":
            # Only the first latent node is considered.
            return str(node.get("inputs", {}).get("batch_size", "Not found"))

    return "Not found"
|
|
|
|
|
def extract_nodes_info(workflow_data):
    """Summarize the graph: total node count plus per-class_type counts.

    Output is "Total Nodes: N" followed by one "class_type: count" line per
    type, sorted alphabetically. Returns "Not found" for non-dict input.
    """
    if not isinstance(workflow_data, dict):
        return "Not found"

    counts = defaultdict(int)
    for node in workflow_data.values():
        if isinstance(node, dict):
            counts[node.get("class_type", "Unknown")] += 1

    # Build all lines first, then join — avoids quadratic string concatenation.
    lines = [f"Total Nodes: {len(workflow_data)}"]
    lines.extend(f"{kind}: {num}" for kind, num in sorted(counts.items()))
    return "\n".join(lines)
|
|
|
|
|
def extract_workflow_as_json(workflow_data):
    """Pretty-print the workflow dict as JSON; "{}" when input isn't a dict."""
    if not isinstance(workflow_data, dict):
        return "{}"
    return json.dumps(workflow_data, ensure_ascii=False, indent=2)
|
|
|
|
|
|
|
|
|
|
|
def process_single_image(image_path):
    """Extract all workflow info from a single image path.

    Returns a 15-tuple of strings in UI order:
    (seed, steps, cfg, sampler, scheduler, denoise, positive, negative,
     loras, models, width, height, batch, nodes, full_json).
    On failure every field carries the error message; a falsy path yields
    EMPTY_RESULT.
    """
    if not image_path:
        return EMPTY_RESULT

    workflow_data = extract_workflow_data(image_path)

    if not isinstance(workflow_data, dict) or "error" in workflow_data:
        # Fix: the original called .get() on workflow_data without confirming
        # it was a dict, which would raise AttributeError for non-dict values.
        if isinstance(workflow_data, dict):
            error = str(workflow_data.get("error", "Unknown error"))
        else:
            error = "Unknown error"
        # Propagate the error text into every output field.
        return (error,) * 15

    seed, steps, cfg, sampler, scheduler, denoise = extract_ksampler_params(workflow_data)
    positive, negative = extract_prompts(workflow_data)
    loras = extract_loras(workflow_data)
    models = extract_model_info(workflow_data)

    # Dimensions come from the actual file; batch size from the workflow.
    width, height = extract_image_info_from_file(image_path)
    batch = extract_batch_size(workflow_data)

    nodes = extract_nodes_info(workflow_data)
    full_json = extract_workflow_as_json(workflow_data)

    return (seed, steps, cfg, sampler, scheduler, denoise,
            positive, negative, loras, models, width, height,
            batch, nodes, full_json)
|
|
|
|
|
def append_gallery(gallery: list, image: str):
    """Add a single image path to the gallery (mutating it in place).

    Returns (gallery, None); the trailing None clears the single-image input.
    A falsy image leaves the gallery unchanged.
    """
    gallery = gallery if gallery is not None else []
    if image:
        gallery.append(image)
    return gallery, None
|
|
|
|
|
def extend_gallery(gallery, images):
    """Merge newly uploaded image paths into the gallery, skipping duplicates.

    Args:
        gallery: current gallery value — a list of path strings or
            (path, caption) pairs; may be None on first use.
        images: newly uploaded item(s) — a single path string, or a
            list/tuple of paths or (path, caption) pairs.

    Returns:
        The gallery extended with the new unique paths, each appended as a
        (path, '') pair. Upload order is preserved.
    """
    if gallery is None:
        gallery = []
    if not images:
        return gallery

    # Normalize the incoming value(s) to plain path strings.
    incoming_paths = []
    if isinstance(images, str):
        incoming_paths.append(images)
    elif isinstance(images, (list, tuple)):
        for img in images:
            if isinstance(img, (tuple, list)):
                incoming_paths.append(str(img[0]))
            else:
                incoming_paths.append(str(img))

    # Fix: deduplicate while preserving order — list(set(...)) shuffled the
    # upload order nondeterministically.
    unique_incoming = list(dict.fromkeys(incoming_paths))

    # Paths already present (gallery entries may be bare paths or pairs).
    seen_paths = {item[0] if isinstance(item, (list, tuple)) else item for item in gallery}
    new_entries = [path for path in unique_incoming if path not in seen_paths]

    # Gallery entries are (path, caption) tuples; captions are left empty.
    return gallery + [(path, '') for path in new_entries]
|
|
|
|
|
def process_gallery(gallery, results_state):
    """Process every gallery image, caching per-path results in the session.

    Args:
        gallery: list of path strings or (path, caption) pairs.
        results_state: dict cache mapping path -> 15-tuple of metadata fields;
            mutated in place.

    Returns:
        The 15 metadata fields of the first gallery image, followed by the
        updated results_state (16 values total, matching the UI outputs).
    """
    if not gallery:
        # Nothing to show: wipe the cache and blank the UI fields.
        results_state.clear()
        return EMPTY_RESULT + (results_state,)

    # Fix: the original kept a redundant `updated_state` dict whose final
    # results_state.update() was a no-op, and used EMPTY_RESULT itself as the
    # "nothing displayed yet" sentinel; None is unambiguous.
    first_image_result = None
    try:
        for item in gallery:
            path = item if isinstance(item, str) else item[0]
            # Compute only for unseen paths; cached entries are reused.
            if path not in results_state:
                results_state[path] = process_single_image(path)
            if first_image_result is None:
                first_image_result = results_state[path]

        if first_image_result is None:
            first_image_result = EMPTY_RESULT
        return first_image_result + (results_state,)
    except Exception as e:
        # Best-effort UI handler: log and fall back to blank fields.
        print("[ERROR]", str(e))
        return EMPTY_RESULT + (results_state,)
|
|
|
|
|
def get_selection_from_gallery(gallery, results_state, evt: gr.SelectData):
    """Fetch the cached metadata tuple for the image selected in the gallery.

    Falls back to the first gallery entry when no selection event is
    available, and to EMPTY_RESULT when nothing usable is cached.
    """
    if evt is None or evt.value is None:
        # No selection event: try the first gallery entry instead.
        if gallery:
            first = gallery[0]
            img_path = str(first[0] if isinstance(first, (list, tuple)) else first)
            if img_path in results_state:
                return list(results_state[img_path])
    else:
        try:
            value = evt.value
            # Gradio may deliver a dict ({'image': {'path': ...}}), a
            # (path, caption) pair, or a bare path, depending on version.
            if isinstance(value, dict) and 'image' in value:
                img_path = value['image']['path']
            elif isinstance(value, (list, tuple)):
                img_path = value[0]
            else:
                img_path = str(value)

            if img_path in results_state:
                return list(results_state[img_path])
        except Exception as e:
            print(f"Selection error: {e}")

    return list(EMPTY_RESULT)
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# UI definition. Left column: multi-file upload button + image gallery.
# Right column: tabbed read-outs for the metadata of the selected image.
# ---------------------------------------------------------------------------
with gr.Blocks(title="ComfyUI Workflow Extractor", css=css, theme="Werli/Purple-Crimson-Gradio-Theme", fill_width=True) as demo:
    gr.Markdown("# π οΈ ComfyUI Workflow Information Extractor")
    gr.Markdown("Upload Multiple ComfyUI-generated images. Extract prompts, parameters, models, and full workflows.")
    with gr.Row():
        with gr.Column(scale=2):
            upload_button = gr.UploadButton(
                "π Upload Multiple Images",
                file_types=["image"],
                file_count="multiple",
                size='lg'
            )
            # elem_id hooks into the #custom-gallery CSS rules defined above.
            gallery = gr.Gallery(
                columns=3,
                show_share_button=False,
                interactive=True,
                height='auto',
                label='Grid of images',
                preview=False,
                elem_id='custom-gallery'
            )

        with gr.Column(scale=3):
            with gr.Tabs():
                # KSampler settings extracted by extract_ksampler_params().
                with gr.Tab("Sampling Parameters"):
                    with gr.Row():
                        with gr.Column():
                            seed_out = gr.Textbox(label="Seed", interactive=False, show_copy_button=True)
                            steps_out = gr.Textbox(label="Steps", interactive=False, show_copy_button=True)
                            cfg_out = gr.Textbox(label="CFG Scale", interactive=False)
                        with gr.Column():
                            sampler_out = gr.Textbox(label="Sampler", interactive=False)
                            scheduler_out = gr.Textbox(label="Scheduler", interactive=False)
                            denoise_out = gr.Textbox(label="Denoise", interactive=False)

                with gr.Tab("Prompts"):
                    pos_prompt = gr.Textbox(label="Positive Prompt", lines=4, interactive=False, show_copy_button=True)
                    neg_prompt = gr.Textbox(label="Negative Prompt", lines=4, interactive=False, show_copy_button=True)

                with gr.Tab("Models & LoRAs"):
                    with gr.Row():
                        lora_out = gr.Textbox(label="LoRAs", lines=5, interactive=False, show_copy_button=True)
                        model_out = gr.Textbox(label="Base Models", lines=5, interactive=False, show_copy_button=True)

                with gr.Tab("Image Info"):
                    with gr.Row():
                        with gr.Column():
                            width_out = gr.Textbox(label="Width", interactive=False)
                            height_out = gr.Textbox(label="Height", interactive=False)
                            batch_out = gr.Textbox(label="Batch Size", interactive=False)
                        with gr.Column():
                            nodes_out = gr.Textbox(label="Node Counts", lines=15, interactive=True, show_copy_button=True)

                with gr.Tab("Full Workflow"):
                    json_out = gr.Textbox(label="Workflow JSON", lines=20, interactive=True, show_copy_button=True)

    # Per-session cache: path -> 15-tuple of extracted fields.
    results_state = gr.State({})

    # Upload flow: first merge the new files into the gallery (no queue, so
    # the thumbnails appear immediately)...
    upload_event = upload_button.upload(
        fn=extend_gallery,
        inputs=[gallery, upload_button],
        outputs=gallery,
        queue=False
    )

    # ...then extract metadata for every gallery image and show the first one.
    upload_event.then(
        fn=process_gallery,
        inputs=[gallery, results_state],
        outputs=[
            seed_out, steps_out, cfg_out, sampler_out, scheduler_out, denoise_out,
            pos_prompt, neg_prompt, lora_out, model_out, width_out, height_out,
            batch_out, nodes_out, json_out, results_state
        ]
    )
    # Re-process whenever the gallery itself changes (e.g. user removes items).
    gallery.change(
        fn=process_gallery,
        inputs=[gallery, results_state],
        outputs=[
            seed_out, steps_out, cfg_out, sampler_out, scheduler_out, denoise_out,
            pos_prompt, neg_prompt, lora_out, model_out, width_out, height_out,
            batch_out, nodes_out, json_out
        ],
        queue=True
    )

    # Clicking a thumbnail shows that image's cached metadata.
    gallery.select(
        get_selection_from_gallery,
        inputs=[gallery, results_state],
        outputs=[
            seed_out, steps_out, cfg_out, sampler_out, scheduler_out, denoise_out,
            pos_prompt, neg_prompt, lora_out, model_out, width_out, height_out,
            batch_out, nodes_out, json_out
        ]
    )

    gr.Markdown("---\nπ‘ **Note:** It's under development.")
|
|
|
|
|
# Script entry point: bound the request queue and launch without API docs.
if __name__ == "__main__":
    demo.queue(max_size=10).launch(show_api=False, show_error=True)