import os import json import requests import gradio as gr import random import time import logging import shutil import zipfile from typing import List import subprocess # Define constants COMFYUI_DIR = 'ComfyUI' WORKFLOW_PATH = os.path.join(COMFYUI_DIR, 'sd3.json') OUTPUT_DIR = os.path.join(COMFYUI_DIR, 'output') URL = "http://127.0.0.1:8188/prompt" # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') # Shared state for cancellation cancel_processing = False def check_gpu(): try: import torch if torch.cuda.is_available(): logging.info("GPU is available") else: logging.warning("GPU is not available") except ImportError: logging.warning("PyTorch is not installed, cannot check GPU availability") def start_comfyui(): if not is_comfyui_running(): logging.info("Starting ComfyUI...") subprocess.Popen(["python", "main.py"], cwd=COMFYUI_DIR) time.sleep(10) # Wait for ComfyUI to start if is_comfyui_running(): logging.info("ComfyUI started successfully.") else: logging.error("Failed to start ComfyUI.") def is_comfyui_running(): try: response = requests.get(URL) if response.status_code == 200: return True except requests.ConnectionError: return False return False def read_workflow(file_path): logging.info(f"Reading workflow from {file_path}") if not os.path.exists(file_path): logging.error(f"Workflow file not found: {file_path}") raise FileNotFoundError(f"Workflow file not found: {file_path}") with open(file_path, 'r') as file: workflow = json.load(file) return workflow def update_workflow(workflow, prompt, negative_prompt): logging.info(f"Updating workflow with new prompts: {prompt}, negative: {negative_prompt}") workflow["6"]["inputs"]["text"] = prompt workflow["71"]["inputs"]["text"] = negative_prompt return workflow def write_workflow(workflow, file_path): logging.info(f"Writing updated workflow to {file_path}") with open(file_path, 'w') as file: json.dump(workflow, file, indent=4) def send_workflow_to_comfyui(workflow, url): headers = {"Content-Type": "application/json"} try: response = requests.post(url, headers=headers, json={"prompt": workflow}) response.raise_for_status() logging.info(f"Workflow sent successfully: {response.status_code}") logging.debug(f"Response content: {response.content}") except requests.exceptions.RequestException as e: logging.error(f"Error sending workflow to ComfyUI: {e}") raise def monitor_output_images(output_dir, previous_images, timeout=60): start_time = time.time() logging.info(f"Monitoring {output_dir} for new images...") while time.time() - start_time < timeout: current_images = os.listdir(output_dir) new_images = [img for img in current_images if img not in previous_images] if new_images: latest_image = new_images[-1] logging.info(f"New image found: {latest_image}") return latest_image time.sleep(1) logging.info(f"Timeout while waiting for new images in {output_dir}") return None def copy_file_with_retry(src, dst_dir, file_index, retries=5, delay=1): dst = os.path.join(dst_dir, f"SD3_{file_index:05d}.png") for _ in range(retries): try: shutil.copy(src, dst) return dst except PermissionError: time.sleep(delay) raise PermissionError(f"Failed to copy {src} to {dst} after {retries} retries") def zip_files(output_images: List[str], zip_interval: int, zip_folder: str): zip_files = [] for i in range(0, len(output_images), zip_interval): zip_filename = os.path.join(zip_folder, f"images_{i//zip_interval + 1}_{time.time_ns()}.zip") with zipfile.ZipFile(zip_filename, 'w') as zipf: for img in output_images[i:i+zip_interval]: zipf.write(img, os.path.basename(img)) zip_files.append(zip_filename) return zip_files def process_prompts(prompts_text, negative_prompt_text, user_folder, zip_interval): global cancel_processing prompts = [line.strip() for line in prompts_text.split('\n\n') if line.strip()] negative_prompts = [line.strip() for line in negative_prompt_text.split('\n') if line.strip()] output_images = [] zip_files_list = [] file_index = 1 workflow = read_workflow(WORKFLOW_PATH) total_prompts = len(prompts) previous_images = os.listdir(OUTPUT_DIR) logs = "" try: for i, prompt in enumerate(prompts): if cancel_processing: logging.info("Processing cancelled by user.") break if not prompt.strip(): continue negative_prompt = negative_prompts[i] if i < len(negative_prompts) else "" updated_workflow = update_workflow(workflow, prompt, negative_prompt) write_workflow(updated_workflow, WORKFLOW_PATH) logging.debug(f"Updated workflow: {json.dumps(updated_workflow, indent=4)}") send_workflow_to_comfyui(updated_workflow, URL) logging.info(f"Sent workflow to ComfyUI for prompt {i + 1}/{total_prompts}") new_image = None retries = 0 while new_image is None and retries < 5: new_image = monitor_output_images(OUTPUT_DIR, previous_images) if new_image is None: retries += 1 logging.warning(f"Retrying ({retries}/5)...") time.sleep(5) else: time.sleep(2) if new_image is None: logging.error("Error monitoring output images: Timed out waiting for new image.") continue new_image_path = os.path.join(OUTPUT_DIR, new_image) try: copied_image_path = copy_file_with_retry(new_image_path, user_folder, file_index) logging.info(f"New image generated and copied to user folder: {copied_image_path}") except PermissionError as e: logging.error(f"Failed to copy file after retries: {e}") continue output_images.append(copied_image_path) previous_images.append(new_image) file_index += 1 if len(output_images) % zip_interval == 0 and not cancel_processing: zip_folder = os.path.join(user_folder, "zipped_images") os.makedirs(zip_folder, exist_ok=True) new_zip_files = zip_files(output_images[-zip_interval:], zip_interval, zip_folder) zip_files_list.extend(new_zip_files) logs += f"Processed {i + 1}/{total_prompts} - Done: {i + 1}, Left: {total_prompts - (i + 1)}\n" yield output_images, zip_files_list, logs if cancel_processing or (len(output_images) % zip_interval != 0): zip_folder = os.path.join(user_folder, "zipped_images") os.makedirs(zip_folder, exist_ok=True) new_zip_files = zip_files(output_images, zip_interval, zip_folder) zip_files_list.extend(new_zip_files) except KeyboardInterrupt: logging.info("Script interrupted by user.") return output_images, zip_files_list def cancel_processing_fn(): global cancel_processing cancel_processing = True def reset_cancel_processing_fn(): global cancel_processing cancel_processing = False def main(): check_gpu() # Check if GPU is available start_comfyui() # Start ComfyUI if not already running with gr.Blocks(css=""" .gradio-container {font-family: Arial, sans-serif;} .psychedelic-text span { animation: colorchange 10s infinite; } @keyframes colorchange { 0% { color: #ff69b4; } 10% { color: #ba55d3; } 20% { color: #7b68ee; } 30% { color: #00bfff; } 40% { color: #3cb371; } 50% { color: #ffff54; } 60% { color: #ffa500; } 70% { color: #ff4500; } 80% { color: #ff1493; } 90% { color: #da70d6; } 100% { color: #ff69b4; } } .image-container img { width: 250px; height: 250px; } """) as demo: with gr.Row(): with gr.Column(scale=1): gr.Markdown("### beWiZ's GroOvy SD3 Batch Imagine") gr.HTML('
GroOvy - SD3 Batch Imagine Logo
') with gr.Accordion("Developer Information", open=False): gr.Markdown("### Made by BeWiZ") gr.Markdown('
BeWiZ Logo
') gr.Markdown("Contact: [downlifted@gmail.com](mailto:downlifted@gmail.com)") gr.Markdown("Twitter: [@AiAnarchist](https://x.com/AiAnarchist)") with gr.Accordion("About SD3 Batch Imagine", open=False): gr.Markdown(""" ### SD3 Batch Imagine: Batch Image Generation Produce large batches of images using the latest SD3 Medium model. This tool allows you to generate images quickly and efficiently. - **ComfyUI**: For seamless integration and image processing. - **Hugging Face**: For state-of-the-art language models. - **Gradio**: For an intuitive user interface. """) with gr.Accordion("Instructions", open=True): gr.Markdown(""" **SD3 Batch Imagine Instructions** - Enter your prompts below, one per empty line. - Enter your negative prompts below, one per line. (Optional) - Set the zip interval to determine how many images will be included in each zip file. - Click "Process Prompts" to start generating images. - Click "Cancel Processing" to stop the current batch run. - Watch the progress as images are generated in real-time. - At the end of the process, zip files containing your images will be available for download. """) with gr.Column(scale=2): gr.Markdown("### Enter Prompts") prompts_text = gr.Textbox(lines=20, placeholder="Enter your prompts here, one per empty line.", label="Prompts") negative_prompts_text = gr.Textbox(lines=5, placeholder="Enter your negative prompts here, one per line.", label="Negative Prompts") zip_interval = gr.Number(value=10, label="Zip Interval", precision=0) process_btn = gr.Button("Process Prompts") cancel_btn = gr.Button("Cancel Processing") progress_text = gr.Markdown("Progress") gallery_output = gr.Gallery(label="Generated Images") zip_files_output = gr.Files(label="Zip Files") with gr.Column(scale=1): gr.Markdown("### Detailed Logs") logs_output = gr.Textbox(lines=10, interactive=False, label="Logs") def generate_user_folder(): user_folder = os.path.normpath(os.path.join(OUTPUT_DIR, f'SD3{random.randint(1000, 9999)}')) os.makedirs(user_folder, exist_ok=True) logging.info(f"Generated user folder: {user_folder}") return user_folder def on_click(prompts_text, negative_prompts_text, zip_interval): reset_cancel_processing_fn() user_folder = generate_user_folder() output_images, zip_files_list = [], [] logs = "" for images, zip_files, log_msg in process_prompts(prompts_text, negative_prompts_text, user_folder, zip_interval): output_images = images zip_files_list = zip_files logs = log_msg yield images, zip_files_list, logs return output_images, zip_files_list, logs process_btn.click( fn=on_click, inputs=[prompts_text, negative_prompts_text, zip_interval], outputs=[gallery_output, zip_files_output, logs_output] ) cancel_btn.click( fn=cancel_processing_fn, inputs=[], outputs=[] ) demo.launch() if __name__ == "__main__": main()