# NOTE(review): the lines below were Hugging Face Spaces page residue
# ("Spaces: Paused") captured by a scrape — not part of the program.
import os
import json
import requests
import gradio as gr
import random
import time
import logging
import shutil
import zipfile
from typing import List
import subprocess

# Define constants
COMFYUI_DIR = 'ComfyUI'  # root of the local ComfyUI checkout (server is started from here)
WORKFLOW_PATH = os.path.join(COMFYUI_DIR, 'sd3.json')  # SD3 workflow graph, read/rewritten per prompt
OUTPUT_DIR = os.path.join(COMFYUI_DIR, 'output')  # directory ComfyUI writes generated images into
URL = "http://127.0.0.1:8188/prompt"  # ComfyUI prompt-queue HTTP endpoint

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Shared state for cancellation: set True by the UI cancel button,
# polled by process_prompts between prompts.
cancel_processing = False
def check_gpu():
    """Log whether a CUDA-capable GPU is visible to PyTorch (best-effort)."""
    try:
        import torch
    except ImportError:
        # PyTorch absent: we can't probe the GPU, but that's not fatal here.
        logging.warning("PyTorch is not installed, cannot check GPU availability")
        return
    if torch.cuda.is_available():
        logging.info("GPU is available")
    else:
        logging.warning("GPU is not available")
def start_comfyui():
    """Launch the ComfyUI server in the background if it is not already up."""
    if is_comfyui_running():
        return
    logging.info("Starting ComfyUI...")
    subprocess.Popen(["python", "main.py"], cwd=COMFYUI_DIR)
    time.sleep(10)  # Wait for ComfyUI to start
    if not is_comfyui_running():
        logging.error("Failed to start ComfyUI.")
        return
    logging.info("ComfyUI started successfully.")
def is_comfyui_running(timeout=5):
    """Return True if the ComfyUI HTTP endpoint answers with status 200.

    Args:
        timeout: Seconds to wait for a response (new, backward-compatible
            parameter — previously the request had no timeout and could
            block indefinitely if the server hung).

    Returns:
        bool: True on an HTTP 200 response, False otherwise (including any
        connection failure or timeout).
    """
    try:
        response = requests.get(URL, timeout=timeout)
    except requests.exceptions.RequestException:
        # Broader than the old ConnectionError-only catch: a Timeout or
        # other transport error also means "not reachable", not a crash.
        return False
    return response.status_code == 200
def read_workflow(file_path):
    """Load a ComfyUI workflow JSON file and return it as a dict.

    Raises:
        FileNotFoundError: if file_path does not exist.
    """
    logging.info(f"Reading workflow from {file_path}")
    if not os.path.exists(file_path):
        logging.error(f"Workflow file not found: {file_path}")
        raise FileNotFoundError(f"Workflow file not found: {file_path}")
    with open(file_path, 'r') as fh:
        return json.load(fh)
def update_workflow(workflow, prompt, negative_prompt):
    """Set the prompt texts on the SD3 workflow's hard-coded nodes.

    Node "6" carries the positive prompt, node "71" the negative prompt.
    Mutates `workflow` in place and returns it for convenience.
    """
    logging.info(f"Updating workflow with new prompts: {prompt}, negative: {negative_prompt}")
    for node_id, text in (("6", prompt), ("71", negative_prompt)):
        workflow[node_id]["inputs"]["text"] = text
    return workflow
def write_workflow(workflow, file_path):
    """Serialize `workflow` to `file_path` as pretty-printed JSON."""
    logging.info(f"Writing updated workflow to {file_path}")
    serialized = json.dumps(workflow, indent=4)
    with open(file_path, 'w') as fh:
        fh.write(serialized)
def send_workflow_to_comfyui(workflow, url, timeout=30):
    """POST the workflow to ComfyUI's /prompt endpoint to queue a generation.

    Args:
        workflow: dict, the workflow graph to queue.
        url: the ComfyUI prompt endpoint URL.
        timeout: seconds before giving up on the request (new,
            backward-compatible parameter — previously the request had no
            timeout and could hang the batch loop indefinitely).

    Raises:
        requests.exceptions.RequestException: on connection failure, timeout,
            or a non-2xx HTTP status (re-raised after logging).
    """
    headers = {"Content-Type": "application/json"}
    try:
        response = requests.post(url, headers=headers, json={"prompt": workflow}, timeout=timeout)
        response.raise_for_status()
        logging.info(f"Workflow sent successfully: {response.status_code}")
        logging.debug(f"Response content: {response.content}")
    except requests.exceptions.RequestException as e:
        logging.error(f"Error sending workflow to ComfyUI: {e}")
        raise
def monitor_output_images(output_dir, previous_images, timeout=60):
    """Poll `output_dir` until a file not listed in `previous_images` appears.

    Args:
        output_dir: directory to watch.
        previous_images: filenames already present before generation started.
        timeout: maximum seconds to wait.

    Returns:
        The filename (not path) of the newest new image, or None on timeout.

    Fixes: os.listdir returns entries in arbitrary order, so the old
    `new_images[-1]` did not reliably pick the most recent file — the newest
    is now chosen by modification time. Membership tests use a set (O(1)).
    """
    known = set(previous_images)
    deadline = time.time() + timeout
    logging.info(f"Monitoring {output_dir} for new images...")
    while time.time() < deadline:
        new_images = [img for img in os.listdir(output_dir) if img not in known]
        if new_images:
            # Pick the most recently modified file rather than relying on
            # the unspecified directory-listing order.
            latest_image = max(
                new_images,
                key=lambda img: os.path.getmtime(os.path.join(output_dir, img)),
            )
            logging.info(f"New image found: {latest_image}")
            return latest_image
        time.sleep(1)
    logging.info(f"Timeout while waiting for new images in {output_dir}")
    return None
def copy_file_with_retry(src, dst_dir, file_index, retries=5, delay=1):
    """Copy `src` into `dst_dir` under a sequential SD3_NNNNN.png name.

    Retries up to `retries` times on PermissionError (the generator may still
    hold the file open), sleeping `delay` seconds between attempts.

    Returns:
        The destination path on success.

    Raises:
        PermissionError: if every attempt fails.
    """
    dst = os.path.join(dst_dir, f"SD3_{file_index:05d}.png")
    attempt = 0
    while attempt < retries:
        try:
            shutil.copy(src, dst)
        except PermissionError:
            attempt += 1
            time.sleep(delay)
        else:
            return dst
    raise PermissionError(f"Failed to copy {src} to {dst} after {retries} retries")
def zip_files(output_images: List[str], zip_interval: int, zip_folder: str):
    """Archive `output_images` into zip files of at most `zip_interval` each.

    Each chunk gets its own archive named with the chunk number and a
    nanosecond timestamp (so repeated calls never collide on filename).

    Args:
        output_images: paths of image files to archive.
        zip_interval: maximum number of images per archive.
        zip_folder: directory to write the archives into (must exist).

    Returns:
        List of created zip file paths.

    Fix: the local accumulator was previously also named `zip_files`,
    shadowing this function's own name inside its body.
    """
    created_archives = []
    for i in range(0, len(output_images), zip_interval):
        zip_filename = os.path.join(zip_folder, f"images_{i//zip_interval + 1}_{time.time_ns()}.zip")
        with zipfile.ZipFile(zip_filename, 'w') as zipf:
            for img in output_images[i:i+zip_interval]:
                # Store only the basename so archives don't embed local paths.
                zipf.write(img, os.path.basename(img))
        created_archives.append(zip_filename)
    return created_archives
def process_prompts(prompts_text, negative_prompt_text, user_folder, zip_interval):
    """Generator driving the batch run: one image per prompt via ComfyUI.

    For each prompt it rewrites the workflow file, queues it with ComfyUI,
    waits for the new output image, copies it into `user_folder`, and zips
    every `zip_interval` images into an archive.

    Args:
        prompts_text: prompts separated by blank lines ('\n\n').
        negative_prompt_text: negative prompts, one per line (optional;
            paired with prompts by index).
        user_folder: destination folder for copied images and zips.
        zip_interval: number of images per zip archive.

    Yields:
        (output_images, zip_files_list, logs) after each processed prompt,
        and once more at the end if leftover images were zipped.

    Fixes vs. previous version:
    - Leftover zips created after the loop were never yielded, so the UI
      consumer never received them; a final yield now delivers them.
    - On cancellation the entire image list was re-zipped, duplicating
      batches already archived; `zipped_count` now tracks what was zipped
      and only the un-archived tail is zipped at the end.
    """
    global cancel_processing
    # Blank-line-separated prompts; empties are filtered out here, so no
    # per-iteration emptiness check is needed.
    prompts = [p.strip() for p in prompts_text.split('\n\n') if p.strip()]
    negative_prompts = [p.strip() for p in negative_prompt_text.split('\n') if p.strip()]
    output_images = []
    zip_files_list = []
    file_index = 1
    zipped_count = 0  # number of images already covered by a batch zip
    workflow = read_workflow(WORKFLOW_PATH)
    total_prompts = len(prompts)
    previous_images = os.listdir(OUTPUT_DIR)
    logs = ""
    try:
        for i, prompt in enumerate(prompts):
            if cancel_processing:
                logging.info("Processing cancelled by user.")
                break
            negative_prompt = negative_prompts[i] if i < len(negative_prompts) else ""
            updated_workflow = update_workflow(workflow, prompt, negative_prompt)
            write_workflow(updated_workflow, WORKFLOW_PATH)
            logging.debug(f"Updated workflow: {json.dumps(updated_workflow, indent=4)}")
            send_workflow_to_comfyui(updated_workflow, URL)
            logging.info(f"Sent workflow to ComfyUI for prompt {i + 1}/{total_prompts}")
            # Poll the output directory for the generated image, retrying the
            # whole monitor window up to 5 times before giving up.
            new_image = None
            retries = 0
            while new_image is None and retries < 5:
                new_image = monitor_output_images(OUTPUT_DIR, previous_images)
                if new_image is None:
                    retries += 1
                    logging.warning(f"Retrying ({retries}/5)...")
                    time.sleep(5)
                else:
                    time.sleep(2)  # give the writer a moment to finish the file
            if new_image is None:
                logging.error("Error monitoring output images: Timed out waiting for new image.")
                continue
            new_image_path = os.path.join(OUTPUT_DIR, new_image)
            try:
                copied_image_path = copy_file_with_retry(new_image_path, user_folder, file_index)
                logging.info(f"New image generated and copied to user folder: {copied_image_path}")
            except PermissionError as e:
                logging.error(f"Failed to copy file after retries: {e}")
                continue
            output_images.append(copied_image_path)
            previous_images.append(new_image)
            file_index += 1
            # Archive a completed batch of zip_interval images.
            if len(output_images) % zip_interval == 0 and not cancel_processing:
                zip_folder = os.path.join(user_folder, "zipped_images")
                os.makedirs(zip_folder, exist_ok=True)
                new_zip_files = zip_files(output_images[-zip_interval:], zip_interval, zip_folder)
                zip_files_list.extend(new_zip_files)
                zipped_count = len(output_images)
            logs += f"Processed {i + 1}/{total_prompts} - Done: {i + 1}, Left: {total_prompts - (i + 1)}\n"
            yield output_images, zip_files_list, logs
        # Archive any images not yet covered by a batch zip (the tail only,
        # so already-zipped images are not duplicated), then yield the final
        # state so the consumer actually receives these archives.
        leftover = output_images[zipped_count:]
        if leftover:
            zip_folder = os.path.join(user_folder, "zipped_images")
            os.makedirs(zip_folder, exist_ok=True)
            zip_files_list.extend(zip_files(leftover, zip_interval, zip_folder))
            yield output_images, zip_files_list, logs
    except KeyboardInterrupt:
        logging.info("Script interrupted by user.")
def cancel_processing_fn():
    """Signal the batch loop to stop: sets the shared cancellation flag."""
    global cancel_processing
    cancel_processing = True
def reset_cancel_processing_fn():
    """Clear the shared cancellation flag before starting a new batch run."""
    global cancel_processing
    cancel_processing = False
def main():
    """Build and launch the Gradio UI for SD3 batch image generation."""
    check_gpu()  # Check if GPU is available
    start_comfyui()  # Start ComfyUI if not already running
    with gr.Blocks(css="""
    .gradio-container {font-family: Arial, sans-serif;}
    .psychedelic-text span {
        animation: colorchange 10s infinite;
    }
    @keyframes colorchange {
        0% { color: #ff69b4; }
        10% { color: #ba55d3; }
        20% { color: #7b68ee; }
        30% { color: #00bfff; }
        40% { color: #3cb371; }
        50% { color: #ffff54; }
        60% { color: #ffa500; }
        70% { color: #ff4500; }
        80% { color: #ff1493; }
        90% { color: #da70d6; }
        100% { color: #ff69b4; }
    }
    .image-container img {
        width: 250px;
        height: 250px;
    }
    """) as demo:
        with gr.Row():
            # Left column: branding, credits, and help accordions.
            with gr.Column(scale=1):
                gr.Markdown("### beWiZ's GroOvy SD3 Batch Imagine")
                gr.HTML('<div class="image-container"><img src="https://raw.githubusercontent.com/downlifted/Groovy-StyleSuite/main/groovy.png" alt="GroOvy - SD3 Batch Imagine Logo"></div>')
                with gr.Accordion("Developer Information", open=False):
                    gr.Markdown("### Made by BeWiZ")
                    gr.Markdown('<div class="image-container"><a href="https://twitter.com/AiAnarchist"><img src="https://raw.githubusercontent.com/downlifted/pictoprompt/master/aia.png" alt="BeWiZ Logo"></a></div>')
                    gr.Markdown("Contact: [downlifted@gmail.com](mailto:downlifted@gmail.com)")
                    gr.Markdown("Twitter: [@AiAnarchist](https://x.com/AiAnarchist)")
                with gr.Accordion("About SD3 Batch Imagine", open=False):
                    gr.Markdown("""
                    ### SD3 Batch Imagine: Batch Image Generation
                    Produce large batches of images using the latest SD3 Medium model. This tool allows you to generate images quickly and efficiently.
                    - **ComfyUI**: For seamless integration and image processing.
                    - **Hugging Face**: For state-of-the-art language models.
                    - **Gradio**: For an intuitive user interface.
                    """)
                with gr.Accordion("Instructions", open=True):
                    gr.Markdown("""
                    **SD3 Batch Imagine Instructions**
                    - Enter your prompts below, one per empty line.
                    - Enter your negative prompts below, one per line. (Optional)
                    - Set the zip interval to determine how many images will be included in each zip file.
                    - Click "Process Prompts" to start generating images.
                    - Click "Cancel Processing" to stop the current batch run.
                    - Watch the progress as images are generated in real-time.
                    - At the end of the process, zip files containing your images will be available for download.
                    """)
            # Middle column: prompt inputs, controls, and result outputs.
            with gr.Column(scale=2):
                gr.Markdown("### Enter Prompts")
                prompts_text = gr.Textbox(lines=20, placeholder="Enter your prompts here, one per empty line.", label="Prompts")
                negative_prompts_text = gr.Textbox(lines=5, placeholder="Enter your negative prompts here, one per line.", label="Negative Prompts")
                zip_interval = gr.Number(value=10, label="Zip Interval", precision=0)
                process_btn = gr.Button("Process Prompts")
                cancel_btn = gr.Button("Cancel Processing")
                progress_text = gr.Markdown("Progress")
                gallery_output = gr.Gallery(label="Generated Images")
                zip_files_output = gr.Files(label="Zip Files")
            # Right column: live log display.
            with gr.Column(scale=1):
                gr.Markdown("### Detailed Logs")
                logs_output = gr.Textbox(lines=10, interactive=False, label="Logs")

        def generate_user_folder():
            # Fresh per-run output folder under OUTPUT_DIR with a random suffix.
            user_folder = os.path.normpath(os.path.join(OUTPUT_DIR, f'SD3{random.randint(1000, 9999)}'))
            os.makedirs(user_folder, exist_ok=True)
            logging.info(f"Generated user folder: {user_folder}")
            return user_folder

        def on_click(prompts_text, negative_prompts_text, zip_interval):
            # Streaming click handler: re-yields each progress update from
            # process_prompts so Gradio refreshes the gallery/zips/logs live.
            reset_cancel_processing_fn()
            user_folder = generate_user_folder()
            output_images, zip_files_list = [], []
            logs = ""
            for images, zip_files, log_msg in process_prompts(prompts_text, negative_prompts_text, user_folder, zip_interval):
                output_images = images
                zip_files_list = zip_files
                logs = log_msg
                yield images, zip_files_list, logs
            return output_images, zip_files_list, logs

        process_btn.click(
            fn=on_click,
            inputs=[prompts_text, negative_prompts_text, zip_interval],
            outputs=[gallery_output, zip_files_output, logs_output]
        )
        cancel_btn.click(
            fn=cancel_processing_fn,
            inputs=[],
            outputs=[]
        )
    demo.launch()
| if __name__ == "__main__": | |
| main() | |