import os import json import requests import gradio as gr import random import time import logging import shutil import zipfile from typing import List # Define constants WORKFLOW_FILENAME = 'sd3.json' ROOT_DIR = os.getcwd() # Root directory of the project OUTPUT_DIR = os.path.join(ROOT_DIR, 'output') URL = "http://127.0.0.1:8188/prompt" # Ensure the URL is correct # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') # Shared state for cancellation cancel_processing = False log_messages = "" def log_message(message): global log_messages log_messages += message + "\n" logging.info(message) def find_workflow_in_root(root_dir, filename): log_message(f"Searching for {filename} in root directory {root_dir}") for root, _, files in os.walk(root_dir): if filename in files: file_path = os.path.join(root, filename) log_message(f"Found workflow file at {file_path}") return file_path log_message(f"Workflow file {filename} not found in root directory {root_dir}") return None def read_workflow(file_path): log_message(f"Reading workflow from {file_path}") try: with open(file_path, 'r') as file: workflow = json.load(file) return workflow except Exception as e: log_message(f"Error reading workflow: {e}") return None def update_workflow(workflow, prompt, negative_prompt): log_message(f"Updating workflow with new prompts: {prompt}, negative: {negative_prompt}") workflow["6"]["inputs"]["text"] = prompt workflow["71"]["inputs"]["text"] = negative_prompt return workflow def write_workflow(workflow, file_path): log_message(f"Writing updated workflow to {file_path}") try: with open(file_path, 'w') as file: json.dump(workflow, file, indent=4) except Exception as e: log_message(f"Error writing workflow: {e}") def send_workflow_to_comfyui(workflow, url): headers = {"Content-Type": "application/json"} try: response = requests.post(url, headers=headers, json={"prompt": workflow}) response.raise_for_status() log_message(f"Workflow sent successfully: {response.status_code}") log_message(f"Response content: {response.content}") except requests.exceptions.RequestException as e: log_message(f"Error sending workflow to ComfyUI: {e}") raise def monitor_output_images(output_dir, previous_images, timeout=60): start_time = time.time() log_message(f"Monitoring {output_dir} for new images...") while time.time() - start_time < timeout: current_images = os.listdir(output_dir) new_images = [img for img in current_images if img not in previous_images] if new_images: latest_image = new_images[-1] log_message(f"New image found: {latest_image}") return latest_image time.sleep(1) log_message(f"Timeout while waiting for new images in {output_dir}") return None def copy_file_with_retry(src, dst_dir, file_index, retries=5, delay=1): dst = os.path.join(dst_dir, f"SD3_{file_index:05d}.png") for _ in range(retries): try: shutil.copy(src, dst) return dst except PermissionError: time.sleep(delay) raise PermissionError(f"Failed to copy {src} to {dst} after {retries} retries") def zip_files(output_images: List[str], zip_interval: int, zip_folder: str): zip_files = [] for i in range(0, len(output_images), zip_interval): zip_filename = os.path.join(zip_folder, f"images_{i//zip_interval + 1}_{time.time_ns()}.zip") with zipfile.ZipFile(zip_filename, 'w') as zipf: for img in output_images[i:i+zip_interval]: zipf.write(img, os.path.basename(img)) zip_files.append(zip_filename) return zip_files def process_prompts(prompts_text, negative_prompt_text, user_folder, zip_interval): global cancel_processing prompts = [line.strip() for line in prompts_text.split('\n\n') if line.strip()] negative_prompts = [line.strip() for line in negative_prompt_text.split('\n') if line.strip()] output_images = [] zip_files_list = [] file_index = 1 workflow_path = find_workflow_in_root(ROOT_DIR, WORKFLOW_FILENAME) if workflow_path is None: log_message("Workflow file not found. Exiting process_prompts.") return [], [] workflow = read_workflow(workflow_path) if workflow is None: log_message("Workflow is None, exiting process_prompts.") return [], [] total_prompts = len(prompts) previous_images = os.listdir(OUTPUT_DIR) logs = "" try: for i, prompt in enumerate(prompts): if cancel_processing: log_message("Processing cancelled by user.") break if not prompt.strip(): continue negative_prompt = negative_prompts[i] if i < len(negative_prompts) else "" updated_workflow = update_workflow(workflow, prompt, negative_prompt) write_workflow(updated_workflow, workflow_path) log_message(f"Updated workflow: {json.dumps(updated_workflow, indent=4)}") send_workflow_to_comfyui(updated_workflow, URL) log_message(f"Sent workflow to ComfyUI for prompt {i + 1}/{total_prompts}") new_image = None retries = 0 while new_image is None and retries < 5: new_image = monitor_output_images(OUTPUT_DIR, previous_images) if new_image is None: retries += 1 log_message(f"Retrying ({retries}/5)...") time.sleep(5) else: time.sleep(2) if new_image is None: log_message("Error monitoring output images: Timed out waiting for new image.") continue new_image_path = os.path.join(OUTPUT_DIR, new_image) try: copied_image_path = copy_file_with_retry(new_image_path, user_folder, file_index) log_message(f"New image generated and copied to user folder: {copied_image_path}") except PermissionError as e: log_message(f"Failed to copy file after retries: {e}") continue output_images.append(copied_image_path) previous_images.append(new_image) file_index += 1 if len(output_images) % zip_interval == 0 and not cancel_processing: zip_folder = os.path.join(user_folder, "zipped_images") os.makedirs(zip_folder, exist_ok=True) new_zip_files = zip_files(output_images[-zip_interval:], zip_interval, zip_folder) zip_files_list.extend(new_zip_files) logs += f"Processed {i + 1}/{total_prompts} - Done: {i + 1}, Left: {total_prompts - (i + 1)}\n" yield output_images, zip_files_list, logs + log_messages if cancel_processing or (len(output_images) % zip_interval != 0): zip_folder = os.path.join(user_folder, "zipped_images") os.makedirs(zip_folder, exist_ok=True) new_zip_files = zip_files(output_images, zip_interval, zip_folder) zip_files_list.extend(new_zip_files) except Exception as e: log_message(f"Exception during prompt processing: {e}") return output_images, zip_files_list def cancel_processing_fn(): global cancel_processing cancel_processing = True def reset_cancel_processing_fn(): global cancel_processing cancel_processing = False def main(): with gr.Blocks(css=""" .gradio-container {font-family: Arial, sans-serif;} .psychedelic-text span { animation: colorchange 10s infinite; } @keyframes colorchange { 0% { color: #ff69b4; } 10% { color: #ba55d3; } 20% { color: #7b68ee; } 30% { color: #00bfff; } 40% { color: #3cb371; } 50% { color: #ffff54; } 60% { color: #ffa500; } 70% { color: #ff4500; } 80% { color: #ff1493; } 90% { color: #da70d6; } 100% { color: #ff69b4; } } .image-container img { width: 250px; height: 250px; } """) as demo: with gr.Row(): with gr.Column(scale=1): gr.Markdown("### beWiZ's GroOvy SD3 Batch Imagine") gr.HTML('
