"""Gradio front end that batch-submits prompts to a local ComfyUI SD3 workflow.

For every prompt the workflow JSON is updated, posted to ComfyUI, and the
module then watches ``OUTPUT_DIR`` for the generated image, copies it into a
per-run folder and periodically bundles the copies into zip archives.
"""

import json
import logging
import os
import random
import shutil
import time
import zipfile
from typing import List, Optional

import gradio as gr
import requests

# Define constants
WORKFLOW_FILENAME = 'sd3.json'
ROOT_DIR = os.getcwd()  # Root directory of the project
OUTPUT_DIR = os.path.join(ROOT_DIR, 'output')
URL = "http://127.0.0.1:8188/prompt"  # Ensure the URL is correct

# Workflow node ids whose ``inputs.text`` receive the prompt / negative prompt.
PROMPT_NODE_ID = "6"
NEGATIVE_PROMPT_NODE_ID = "71"

# Fallback used when the UI hands over an unusable zip interval.
DEFAULT_ZIP_INTERVAL = 10

# Seconds to wait for ComfyUI to answer the POST before giving up.
REQUEST_TIMEOUT_SECONDS = 30

CUSTOM_CSS = """
.gradio-container {font-family: Arial, sans-serif;}
.psychedelic-text span {
    animation: colorchange 10s infinite;
}
@keyframes colorchange {
    0% { color: #ff69b4; }
    10% { color: #ba55d3; }
    20% { color: #7b68ee; }
    30% { color: #00bfff; }
    40% { color: #3cb371; }
    50% { color: #ffff54; }
    60% { color: #ffa500; }
    70% { color: #ff4500; }
    80% { color: #ff1493; }
    90% { color: #da70d6; }
    100% { color: #ff69b4; }
}
.image-container img {
    width: 250px;
    height: 250px;
}
"""

ABOUT_MARKDOWN = """
### SD3 Batch Imagine: Batch Image Generation
Produce large batches of images using the latest SD3 Medium model. This tool allows you to generate images quickly and efficiently.

- **ComfyUI**: For seamless integration and image processing.
- **Hugging Face**: For state-of-the-art language models.
- **Gradio**: For an intuitive user interface.
"""

INSTRUCTIONS_MARKDOWN = """
**SD3 Batch Imagine Instructions**

- Enter your prompts below, one per empty line.
- Enter your negative prompts below, one per line. (Optional)
- Set the zip interval to determine how many images will be included in each zip file.
- Click "Process Prompts" to start generating images.
- Click "Cancel Processing" to stop the current batch run.
- Watch the progress as images are generated in real-time.
- At the end of the process, zip files containing your images will be available for download.
"""

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Shared state for cancellation
cancel_processing = False
log_messages = ""


def log_message(message):
    """Append *message* to the in-memory UI log and emit it via ``logging``."""
    global log_messages
    log_messages += message + "\n"
    logging.info(message)


def find_workflow_in_root(root_dir, filename):
    """Walk *root_dir* and return the path of the first file named *filename*.

    Returns ``None`` when no such file exists anywhere below *root_dir*.
    """
    log_message(f"Searching for {filename} in root directory {root_dir}")
    for root, _, files in os.walk(root_dir):
        if filename in files:
            file_path = os.path.join(root, filename)
            log_message(f"Found workflow file at {file_path}")
            return file_path
    log_message(f"Workflow file {filename} not found in root directory {root_dir}")
    return None


def read_workflow(file_path):
    """Load the workflow JSON at *file_path*; return ``None`` (and log) on failure."""
    log_message(f"Reading workflow from {file_path}")
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            return json.load(file)
    except (OSError, ValueError) as e:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        log_message(f"Error reading workflow: {e}")
        return None


def update_workflow(workflow, prompt, negative_prompt,
                    prompt_node=PROMPT_NODE_ID,
                    negative_prompt_node=NEGATIVE_PROMPT_NODE_ID):
    """Write the prompt texts into *workflow* in place and return it.

    Args:
        workflow: Parsed workflow dict; mutated in place.
        prompt: Text stored at ``workflow[prompt_node]["inputs"]["text"]``.
        negative_prompt: Text stored at
            ``workflow[negative_prompt_node]["inputs"]["text"]``.
        prompt_node: Key of the node receiving *prompt*.
        negative_prompt_node: Key of the node receiving *negative_prompt*.

    Raises:
        KeyError: If either node (or its ``inputs`` mapping) is missing.
    """
    log_message(f"Updating workflow with new prompts: {prompt}, negative: {negative_prompt}")
    workflow[prompt_node]["inputs"]["text"] = prompt
    workflow[negative_prompt_node]["inputs"]["text"] = negative_prompt
    return workflow


def write_workflow(workflow, file_path):
    """Persist *workflow* as indented JSON to *file_path*.

    Best effort: failures are logged, not raised, so a read-only workflow
    file does not abort the batch.
    """
    log_message(f"Writing updated workflow to {file_path}")
    try:
        with open(file_path, 'w', encoding='utf-8') as file:
            json.dump(workflow, file, indent=4)
    except (OSError, TypeError, ValueError) as e:
        log_message(f"Error writing workflow: {e}")


def send_workflow_to_comfyui(workflow, url, timeout=REQUEST_TIMEOUT_SECONDS):
    """POST *workflow* to the ComfyUI endpoint at *url*.

    Args:
        workflow: Workflow dict, sent as ``{"prompt": workflow}``.
        url: Endpoint to post to.
        timeout: Seconds to wait for the server; without it an unresponsive
            server would block the batch indefinitely.

    Raises:
        requests.exceptions.RequestException: On connection errors, timeouts
            or a non-success HTTP status (logged, then re-raised).
    """
    headers = {"Content-Type": "application/json"}
    try:
        response = requests.post(url, headers=headers, json={"prompt": workflow}, timeout=timeout)
        response.raise_for_status()
        log_message(f"Workflow sent successfully: {response.status_code}")
        log_message(f"Response content: {response.content}")
    except requests.exceptions.RequestException as e:
        log_message(f"Error sending workflow to ComfyUI: {e}")
        raise


def _safe_mtime(path):
    """Return the modification time of *path*, or 0.0 if it cannot be read."""
    try:
        return os.path.getmtime(path)
    except OSError:
        return 0.0


def monitor_output_images(output_dir, previous_images, timeout=60) -> Optional[str]:
    """Poll *output_dir* once per second until a new file shows up.

    Args:
        output_dir: Directory to watch.
        previous_images: Names already known; entries in it are ignored.
        timeout: Maximum number of seconds to keep polling.

    Returns:
        The name (not path) of the most recently modified new regular file,
        or ``None`` if nothing appeared before the timeout.
    """
    known = set(previous_images)
    start_time = time.monotonic()
    log_message(f"Monitoring {output_dir} for new images...")
    while time.monotonic() - start_time < timeout:
        # Only regular files count: sub-folders (e.g. per-run folders living
        # inside the output directory) must never be mistaken for an image.
        new_images = [
            name for name in os.listdir(output_dir)
            if name not in known and os.path.isfile(os.path.join(output_dir, name))
        ]
        if new_images:
            # os.listdir order is arbitrary, so pick the newest file explicitly.
            latest_image = max(
                new_images,
                key=lambda name: _safe_mtime(os.path.join(output_dir, name)),
            )
            log_message(f"New image found: {latest_image}")
            return latest_image
        time.sleep(1)
    log_message(f"Timeout while waiting for new images in {output_dir}")
    return None


def copy_file_with_retry(src, dst_dir, file_index, retries=5, delay=1):
    """Copy *src* to ``dst_dir/SD3_<file_index>.png``, retrying on PermissionError.

    Args:
        src: Source file path.
        dst_dir: Destination directory.
        file_index: Number embedded, zero-padded to five digits, in the name.
        retries: Maximum number of copy attempts.
        delay: Seconds to sleep after a failed attempt.

    Returns:
        The destination path.

    Raises:
        PermissionError: If every attempt failed with ``PermissionError``.
    """
    dst = os.path.join(dst_dir, f"SD3_{file_index:05d}.png")
    for _ in range(retries):
        try:
            shutil.copy(src, dst)
            return dst
        except PermissionError:
            # The file may still be locked by the process writing it.
            time.sleep(delay)
    raise PermissionError(f"Failed to copy {src} to {dst} after {retries} retries")


def zip_files(output_images: List[str], zip_interval: int, zip_folder: str):
    """Bundle *output_images* into archives of at most *zip_interval* files.

    Args:
        output_images: Paths of the files to archive.
        zip_interval: Maximum number of files per archive; must be >= 1.
        zip_folder: Existing directory in which the archives are created.

    Returns:
        List of created archive paths, in creation order.

    Raises:
        ValueError: If *zip_interval* is smaller than 1.
    """
    zip_interval = int(zip_interval)
    if zip_interval < 1:
        raise ValueError("zip_interval must be a positive integer")

    archives = []
    for start in range(0, len(output_images), zip_interval):
        zip_filename = os.path.join(
            zip_folder, f"images_{start // zip_interval + 1}_{time.time_ns()}.zip"
        )
        with zipfile.ZipFile(zip_filename, 'w') as zipf:
            for img in output_images[start:start + zip_interval]:
                zipf.write(img, os.path.basename(img))
        archives.append(zip_filename)
    return archives


def _normalize_zip_interval(zip_interval):
    """Coerce *zip_interval* to a positive int, falling back to the default."""
    try:
        interval = int(zip_interval)
    except (TypeError, ValueError, OverflowError):
        interval = 0
    if interval < 1:
        log_message(f"Invalid zip interval {zip_interval!r}; using {DEFAULT_ZIP_INTERVAL}")
        return DEFAULT_ZIP_INTERVAL
    return interval


def _archive_images(images, zip_interval, user_folder):
    """Zip *images* into ``<user_folder>/zipped_images`` and return the archives."""
    zip_folder = os.path.join(user_folder, "zipped_images")
    os.makedirs(zip_folder, exist_ok=True)
    return zip_files(images, zip_interval, zip_folder)


def _wait_for_new_image(previous_images, max_retries=5):
    """Watch ``OUTPUT_DIR`` for a new image, retrying after each timeout."""
    new_image = None
    retries = 0
    while new_image is None and retries < max_retries:
        new_image = monitor_output_images(OUTPUT_DIR, previous_images)
        if new_image is None:
            retries += 1
            log_message(f"Retrying ({retries}/{max_retries})...")
            time.sleep(5)
        else:
            # Short grace period before the file is copied.
            time.sleep(2)
    return new_image


def process_prompts(prompts_text, negative_prompt_text, user_folder, zip_interval):
    """Generate one image per prompt, yielding progress after each image.

    This is a generator. Each item is a tuple
    ``(output_images, zip_files_list, log_text)`` where ``log_text`` already
    contains the full accumulated log. A final item is always yielded so the
    consumer also sees the last archive and any error or early-exit message.

    Args:
        prompts_text: Prompts separated by blank lines.
        negative_prompt_text: Negative prompts, one per line; paired with the
            prompts by position, missing entries default to "".
        user_folder: Directory receiving the copied images and the archives.
        zip_interval: Number of images per zip archive; unusable values fall
            back to ``DEFAULT_ZIP_INTERVAL``.
    """
    zip_interval = _normalize_zip_interval(zip_interval)
    prompts = [chunk.strip() for chunk in (prompts_text or "").split('\n\n') if chunk.strip()]
    negative_prompts = [
        line.strip() for line in (negative_prompt_text or "").split('\n') if line.strip()
    ]
    output_images = []
    zip_files_list = []

    workflow_path = find_workflow_in_root(ROOT_DIR, WORKFLOW_FILENAME)
    if workflow_path is None:
        log_message("Workflow file not found. Exiting process_prompts.")
        # A bare return would end the generator silently; surface the log.
        yield output_images, zip_files_list, log_messages
        return [], []

    workflow = read_workflow(workflow_path)
    if workflow is None:
        log_message("Workflow is None, exiting process_prompts.")
        yield output_images, zip_files_list, log_messages
        return [], []

    total_prompts = len(prompts)
    logs = ""
    zipped_count = 0  # how many entries of output_images are already archived

    try:
        os.makedirs(OUTPUT_DIR, exist_ok=True)
        os.makedirs(user_folder, exist_ok=True)
        previous_images = set(os.listdir(OUTPUT_DIR))

        for i, prompt in enumerate(prompts):
            if cancel_processing:
                log_message("Processing cancelled by user.")
                break

            negative_prompt = negative_prompts[i] if i < len(negative_prompts) else ""
            updated_workflow = update_workflow(workflow, prompt, negative_prompt)
            write_workflow(updated_workflow, workflow_path)
            log_message(f"Updated workflow: {json.dumps(updated_workflow, indent=4)}")
            send_workflow_to_comfyui(updated_workflow, URL)
            log_message(f"Sent workflow to ComfyUI for prompt {i + 1}/{total_prompts}")

            new_image = _wait_for_new_image(previous_images)
            if new_image is None:
                log_message("Error monitoring output images: Timed out waiting for new image.")
                continue

            new_image_path = os.path.join(OUTPUT_DIR, new_image)
            try:
                copied_image_path = copy_file_with_retry(
                    new_image_path, user_folder, len(output_images) + 1
                )
                log_message(f"New image generated and copied to user folder: {copied_image_path}")
            except PermissionError as e:
                log_message(f"Failed to copy file after retries: {e}")
                continue

            output_images.append(copied_image_path)
            previous_images.add(new_image)

            if len(output_images) - zipped_count >= zip_interval:
                zip_files_list.extend(
                    _archive_images(output_images[zipped_count:], zip_interval, user_folder)
                )
                zipped_count = len(output_images)

            logs += f"Processed {i + 1}/{total_prompts} - Done: {i + 1}, Left: {total_prompts - (i + 1)}\n"
            yield output_images, zip_files_list, logs + log_messages

        # Archive only what has not been zipped yet, so images never end up
        # in two archives.
        if zipped_count < len(output_images):
            zip_files_list.extend(
                _archive_images(output_images[zipped_count:], zip_interval, user_folder)
            )
            zipped_count = len(output_images)
    except Exception as e:
        # Batch boundary: report the failure in the UI log instead of
        # losing everything generated so far.
        log_message(f"Exception during prompt processing: {e}")

    # Final state: the last archive and any message logged after the last
    # per-image yield would otherwise never reach the consumer.
    yield output_images, zip_files_list, logs + log_messages
    return output_images, zip_files_list


def cancel_processing_fn():
    """Ask the running batch to stop before its next prompt."""
    global cancel_processing
    cancel_processing = True


def reset_cancel_processing_fn():
    """Clear the cancellation flag before a new batch starts."""
    global cancel_processing
    cancel_processing = False


def main():
    """Build the Gradio interface, wire up the handlers and launch the app."""

    def generate_user_folder():
        """Create and return a randomly named run folder inside ``OUTPUT_DIR``."""
        user_folder = os.path.normpath(os.path.join(OUTPUT_DIR, f'SD3{random.randint(1000, 9999)}'))
        os.makedirs(user_folder, exist_ok=True)
        log_message(f"Generated user folder: {user_folder}")
        return user_folder

    def on_click(prompts, negative_prompts, interval):
        """Run one batch and stream gallery, archives and log text to the UI."""
        reset_cancel_processing_fn()
        user_folder = generate_user_folder()
        output_images, zip_files_list, logs = [], [], ""
        try:
            for output_images, zip_files_list, logs in process_prompts(
                    prompts, negative_prompts, user_folder, interval):
                # The log text from process_prompts already includes the full
                # log; appending log_messages again would duplicate it.
                yield output_images, zip_files_list, logs
        except Exception as e:
            log_message(f"Error during prompt processing: {e}")
            yield output_images, zip_files_list, logs + f"Error: {e}\n"

    with gr.Blocks(css=CUSTOM_CSS) as demo:
        with gr.Row():
            with gr.Column(scale=1):
                gr.Markdown("### beWiZ's GroOvy SD3 Batch Imagine")
                # NOTE(review): only plain text is present in this literal; if
                # markup (e.g. a logo image tag) belonged here, restore it.
                gr.HTML('GroOvy - SD3 Batch Imagine Logo')
                with gr.Accordion("Developer Information", open=False):
                    gr.Markdown("### Made by BeWiZ")
                    # NOTE(review): same as above - possibly missing markup.
                    gr.Markdown('BeWiZ Logo')
                    gr.Markdown("Contact: [downlifted@gmail.com](mailto:downlifted@gmail.com)")
                    gr.Markdown("Twitter: [@AiAnarchist](https://x.com/AiAnarchist)")
                with gr.Accordion("About SD3 Batch Imagine", open=False):
                    gr.Markdown(ABOUT_MARKDOWN)
                with gr.Accordion("Instructions", open=True):
                    gr.Markdown(INSTRUCTIONS_MARKDOWN)

            with gr.Column(scale=2):
                gr.Markdown("### Enter Prompts")
                prompts_text = gr.Textbox(
                    lines=20,
                    placeholder="Enter your prompts here, one per empty line.",
                    label="Prompts",
                )
                negative_prompts_text = gr.Textbox(
                    lines=5,
                    placeholder="Enter your negative prompts here, one per line.",
                    label="Negative Prompts",
                )
                zip_interval = gr.Number(value=10, label="Zip Interval", precision=0)
                process_btn = gr.Button("Process Prompts")
                cancel_btn = gr.Button("Cancel Processing")
                progress_text = gr.Markdown("Progress")
                gallery_output = gr.Gallery(label="Generated Images")
                zip_files_output = gr.Files(label="Zip Files")

            with gr.Column(scale=1):
                gr.Markdown("### Detailed Logs")
                logs_output = gr.Textbox(lines=10, interactive=False, label="Logs")

        process_btn.click(
            fn=on_click,
            inputs=[prompts_text, negative_prompts_text, zip_interval],
            outputs=[gallery_output, zip_files_output, logs_output]
        )
        cancel_btn.click(
            fn=cancel_processing_fn,
            inputs=[],
            outputs=[]
        )

    demo.launch(share=True)


if __name__ == "__main__":
    main()