# bewiz's picture
# Upload folder using huggingface_hub
# ac64721 verified
import os
import json
import requests
import gradio as gr
import random
import time
import logging
import shutil
import zipfile
from typing import List
# ---------------------------------------------------------------------------
# Module configuration
# ---------------------------------------------------------------------------
# Name of the workflow JSON file that is searched for below ROOT_DIR.
WORKFLOW_FILENAME = "sd3.json"
# The project root is the directory the process was started from.
ROOT_DIR = os.getcwd()
OUTPUT_DIR = os.path.join(ROOT_DIR, "output")
# Endpoint the workflow is POSTed to; make sure it matches the running server.
URL = "http://127.0.0.1:8188/prompt"

# Logging: timestamped records at INFO level and above.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

# Mutable module-level state shared by the functions in this file.
cancel_processing = False  # becomes True when the user asks to stop a batch
log_messages = ""  # accumulated log text
def log_message(message):
    """Record *message* in the shared log buffer and emit it at INFO level.

    The text, followed by a newline, is appended to the module-level
    ``log_messages`` string and then handed to ``logging.info``.
    """
    global log_messages
    entry = message + "\n"
    log_messages = log_messages + entry
    logging.info(message)
def find_workflow_in_root(root_dir, filename):
    """Return the path of the first file called *filename* under *root_dir*.

    The tree is traversed with ``os.walk``; the first directory that contains
    a matching file name wins. Returns ``None`` when nothing matches.
    """
    log_message(f"Searching for {filename} in root directory {root_dir}")
    # Lazy generator: the walk stops as soon as the first match is produced.
    matches = (
        os.path.join(dirpath, filename)
        for dirpath, _dirnames, filenames in os.walk(root_dir)
        if filename in filenames
    )
    file_path = next(matches, None)
    if file_path is None:
        log_message(f"Workflow file {filename} not found in root directory {root_dir}")
        return None
    log_message(f"Found workflow file at {file_path}")
    return file_path
def read_workflow(file_path):
    """Load the workflow JSON stored at *file_path*.

    Args:
        file_path: Path of the JSON file to read.

    Returns:
        The decoded JSON document, or ``None`` when the file cannot be opened
        or does not contain valid JSON (the error is logged, not raised).
    """
    log_message(f"Reading workflow from {file_path}")
    try:
        # JSON is UTF-8; do not depend on the platform's default encoding.
        with open(file_path, 'r', encoding='utf-8') as file:
            return json.load(file)
    except (OSError, ValueError) as e:
        # OSError: missing/unreadable file.
        # ValueError: covers json.JSONDecodeError and UnicodeDecodeError.
        log_message(f"Error reading workflow: {e}")
        return None
def update_workflow(workflow, prompt, negative_prompt):
    """Write the prompt texts into *workflow* and return it.

    The mapping is modified in place: entry ``"6"`` receives *prompt* and
    entry ``"71"`` receives *negative_prompt* under ``["inputs"]["text"]``.
    The same (mutated) object is returned.
    """
    log_message(f"Updating workflow with new prompts: {prompt}, negative: {negative_prompt}")
    replacements = (("6", prompt), ("71", negative_prompt))
    for node_id, text in replacements:
        workflow[node_id]["inputs"]["text"] = text
    return workflow
def write_workflow(workflow, file_path):
    """Serialise *workflow* as indented JSON and write it to *file_path*.

    Errors are logged rather than raised, so the caller always continues.

    Args:
        workflow: JSON-serialisable workflow object.
        file_path: Destination path; an existing file is overwritten.
    """
    log_message(f"Writing updated workflow to {file_path}")
    try:
        # Serialise before opening the file: opening with 'w' truncates it, so
        # a serialisation error must not be able to wipe the existing content.
        payload = json.dumps(workflow, indent=4)
        with open(file_path, 'w', encoding='utf-8') as file:
            file.write(payload)
    except Exception as e:
        log_message(f"Error writing workflow: {e}")
def send_workflow_to_comfyui(workflow, url, timeout=30):
    """POST *workflow* to *url* as ``{"prompt": workflow}``.

    Args:
        workflow: JSON-serialisable workflow object.
        url: Endpoint that receives the request.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` waits indefinitely, which would stall the
            whole batch if the server stops responding.

    Raises:
        requests.exceptions.RequestException: On connection errors, timeouts
            or a non-2xx response. The error is logged and re-raised.
    """
    headers = {"Content-Type": "application/json"}
    try:
        response = requests.post(
            url,
            headers=headers,
            json={"prompt": workflow},
            timeout=timeout,
        )
        response.raise_for_status()
        log_message(f"Workflow sent successfully: {response.status_code}")
        log_message(f"Response content: {response.content}")
    except requests.exceptions.RequestException as e:
        log_message(f"Error sending workflow to ComfyUI: {e}")
        raise
def monitor_output_images(output_dir, previous_images, timeout=60):
    """Poll *output_dir* until a file not named in *previous_images* appears.

    Args:
        output_dir: Directory to watch.
        previous_images: Names (not paths) of entries that are already known.
        timeout: Maximum number of seconds to keep polling.

    Returns:
        The name of the most recently modified new file, or ``None`` if no new
        file showed up before the timeout expired.
    """
    known = set(previous_images)  # O(1) membership tests inside the poll loop
    deadline = time.monotonic() + timeout
    log_message(f"Monitoring {output_dir} for new images...")
    while time.monotonic() < deadline:
        candidates = []
        for name in os.listdir(output_dir):
            if name in known:
                continue
            path = os.path.join(output_dir, name)
            try:
                # Only regular files count; sub-directories created inside the
                # output directory must not be mistaken for an image.
                if os.path.isfile(path):
                    candidates.append((os.path.getmtime(path), name))
            except OSError:
                # The entry vanished between listdir() and the stat call.
                continue
        if candidates:
            # os.listdir() returns entries in arbitrary order, so choose the
            # newest file by modification time instead of by list position.
            latest_image = max(candidates)[1]
            log_message(f"New image found: {latest_image}")
            return latest_image
        time.sleep(1)
    log_message(f"Timeout while waiting for new images in {output_dir}")
    return None
def copy_file_with_retry(src, dst_dir, file_index, retries=5, delay=1):
    """Copy *src* into *dst_dir* as ``SD3_<index>.png``, retrying on PermissionError.

    The index is zero-padded to five digits. After every failed attempt the
    function sleeps *delay* seconds before trying again.

    Returns:
        The destination path of the copy.

    Raises:
        PermissionError: When all *retries* attempts failed.
    """
    target_name = "SD3_{:05d}.png".format(file_index)
    dst = os.path.join(dst_dir, target_name)
    for _attempt in range(retries):
        try:
            shutil.copy(src, dst)
        except PermissionError:
            # Source may still be locked by its writer; wait and retry.
            time.sleep(delay)
        else:
            return dst
    raise PermissionError(f"Failed to copy {src} to {dst} after {retries} retries")
def zip_files(output_images: List[str], zip_interval: int, zip_folder: str):
    """Pack *output_images* into zip archives of at most *zip_interval* files.

    Archives are written to *zip_folder* and named
    ``images_<batch number>_<time.time_ns()>.zip``, with batch numbers
    starting at 1. Each image is stored under its base name.

    Returns:
        The list of archive paths, in creation order.
    """
    archives = []
    batch_starts = range(0, len(output_images), zip_interval)
    for batch_number, start in enumerate(batch_starts, start=1):
        archive_path = os.path.join(zip_folder, f"images_{batch_number}_{time.time_ns()}.zip")
        batch = output_images[start:start + zip_interval]
        with zipfile.ZipFile(archive_path, 'w') as archive:
            for image_path in batch:
                archive.write(image_path, os.path.basename(image_path))
        archives.append(archive_path)
    return archives
def _wait_for_new_image(previous_images, attempts=5):
    """Wait for a new entry in OUTPUT_DIR, polling up to *attempts* times."""
    new_image = None
    retries = 0
    while new_image is None and retries < attempts:
        new_image = monitor_output_images(OUTPUT_DIR, previous_images)
        if new_image is None:
            retries += 1
            log_message(f"Retrying ({retries}/{attempts})...")
            time.sleep(5)
        else:
            # Short pause before the file is copied (it may still be written).
            time.sleep(2)
    return new_image


def _zip_batch(images, zip_interval, user_folder):
    """Zip *images* into ``<user_folder>/zipped_images`` and return the archive paths."""
    zip_folder = os.path.join(user_folder, "zipped_images")
    os.makedirs(zip_folder, exist_ok=True)
    return zip_files(images, zip_interval, zip_folder)


def process_prompts(prompts_text, negative_prompt_text, user_folder, zip_interval):
    """Generate one image per prompt and stream progress to the caller.

    This is a generator. For every prompt it updates and saves the workflow,
    sends it to the server at ``URL``, waits for a new file in ``OUTPUT_DIR``,
    copies that file into *user_folder* and zips the copies in groups of
    *zip_interval*.

    Args:
        prompts_text: Prompts separated by blank lines (``"\\n\\n"``).
        negative_prompt_text: Negative prompts, one per line. The i-th one is
            paired with the i-th prompt; missing ones default to ``""``.
        user_folder: Existing directory that receives the copied images.
        zip_interval: Number of images per zip archive. Values that are
            missing or smaller than 1 are treated as 1.

    Yields:
        ``(output_images, zip_files_list, logs)`` after each processed image
        and once more at the end, so the last archives and log lines reach
        the caller. ``logs`` is the progress text followed by the shared log
        buffer.

    Returns:
        ``(output_images, zip_files_list)`` as the generator's return value.
    """
    prompts = [chunk.strip() for chunk in prompts_text.split('\n\n') if chunk.strip()]
    negative_prompts = [line.strip() for line in negative_prompt_text.split('\n') if line.strip()]
    # A zero/None interval would otherwise fail in the modulo/range arithmetic.
    zip_interval = max(1, int(zip_interval or 1))
    output_images = []
    zip_files_list = []
    zipped_count = 0  # how many entries of output_images are already archived
    file_index = 1

    workflow_path = find_workflow_in_root(ROOT_DIR, WORKFLOW_FILENAME)
    if workflow_path is None:
        log_message("Workflow file not found. Exiting process_prompts.")
        # Yield once so the caller can display why nothing was generated.
        yield [], [], log_messages
        return [], []
    workflow = read_workflow(workflow_path)
    if workflow is None:
        log_message("Workflow is None, exiting process_prompts.")
        yield [], [], log_messages
        return [], []

    total_prompts = len(prompts)
    previous_images = os.listdir(OUTPUT_DIR)
    logs = ""
    try:
        for i, prompt in enumerate(prompts):
            if cancel_processing:
                log_message("Processing cancelled by user.")
                break
            negative_prompt = negative_prompts[i] if i < len(negative_prompts) else ""
            updated_workflow = update_workflow(workflow, prompt, negative_prompt)
            write_workflow(updated_workflow, workflow_path)
            log_message(f"Updated workflow: {json.dumps(updated_workflow, indent=4)}")
            send_workflow_to_comfyui(updated_workflow, URL)
            log_message(f"Sent workflow to ComfyUI for prompt {i + 1}/{total_prompts}")

            new_image = _wait_for_new_image(previous_images)
            if new_image is None:
                log_message("Error monitoring output images: Timed out waiting for new image.")
                continue
            # Mark the file as seen right away: if the copy below fails it
            # must not be picked up again as the result of the next prompt.
            previous_images.append(new_image)
            new_image_path = os.path.join(OUTPUT_DIR, new_image)
            try:
                copied_image_path = copy_file_with_retry(new_image_path, user_folder, file_index)
                log_message(f"New image generated and copied to user folder: {copied_image_path}")
            except PermissionError as e:
                log_message(f"Failed to copy file after retries: {e}")
                continue
            output_images.append(copied_image_path)
            file_index += 1

            if len(output_images) - zipped_count >= zip_interval and not cancel_processing:
                zip_files_list.extend(
                    _zip_batch(output_images[zipped_count:], zip_interval, user_folder)
                )
                zipped_count = len(output_images)
            logs += f"Processed {i + 1}/{total_prompts} - Done: {i + 1}, Left: {total_prompts - (i + 1)}\n"
            yield output_images, zip_files_list, logs + log_messages

        # Archive only what has not been zipped yet; re-zipping the whole list
        # would duplicate every image that is already in an earlier archive.
        remaining = output_images[zipped_count:]
        if remaining:
            zip_files_list.extend(_zip_batch(remaining, zip_interval, user_folder))
            zipped_count = len(output_images)
    except Exception as e:
        log_message(f"Exception during prompt processing: {e}")
    # Final update: without it the last archives and log lines never reach the
    # consumer, because a generator's return value is not seen by a for loop.
    yield output_images, zip_files_list, logs + log_messages
    return output_images, zip_files_list
def cancel_processing_fn() -> None:
    """Request cancellation by setting the module-level ``cancel_processing`` flag to True."""
    global cancel_processing
    cancel_processing = True
def reset_cancel_processing_fn() -> None:
    """Clear the module-level ``cancel_processing`` flag (set it back to False)."""
    global cancel_processing
    cancel_processing = False
def main():
    """Build the Gradio interface, connect the button handlers and launch it.

    Text inside the triple-quoted strings below is deliberately kept at
    column 0 so that the CSS/Markdown content stays exactly as authored.
    """
    with gr.Blocks(css="""
.gradio-container {font-family: Arial, sans-serif;}
.psychedelic-text span {
animation: colorchange 10s infinite;
}
@keyframes colorchange {
0% { color: #ff69b4; }
10% { color: #ba55d3; }
20% { color: #7b68ee; }
30% { color: #00bfff; }
40% { color: #3cb371; }
50% { color: #ffff54; }
60% { color: #ffa500; }
70% { color: #ff4500; }
80% { color: #ff1493; }
90% { color: #da70d6; }
100% { color: #ff69b4; }
}
.image-container img {
width: 250px;
height: 250px;
}
""") as demo:
        with gr.Row():
            # Left column: branding, credits and usage instructions.
            with gr.Column(scale=1):
                gr.Markdown("### beWiZ's GroOvy SD3 Batch Imagine")
                gr.HTML('<div class="image-container"><img src="https://raw.githubusercontent.com/downlifted/Groovy-StyleSuite/main/groovy.png" alt="GroOvy - SD3 Batch Imagine Logo"></div>')
                with gr.Accordion("Developer Information", open=False):
                    gr.Markdown("### Made by BeWiZ")
                    gr.Markdown('<div class="image-container"><a href="https://twitter.com/AiAnarchist"><img src="https://raw.githubusercontent.com/downlifted/pictoprompt/master/aia.png" alt="BeWiZ Logo"></a></div>')
                    gr.Markdown("Contact: [downlifted@gmail.com](mailto:downlifted@gmail.com)")
                    gr.Markdown("Twitter: [@AiAnarchist](https://x.com/AiAnarchist)")
                with gr.Accordion("About SD3 Batch Imagine", open=False):
                    gr.Markdown("""
### SD3 Batch Imagine: Batch Image Generation
Produce large batches of images using the latest SD3 Medium model. This tool allows you to generate images quickly and efficiently.
- **ComfyUI**: For seamless integration and image processing.
- **Hugging Face**: For state-of-the-art language models.
- **Gradio**: For an intuitive user interface.
""")
                with gr.Accordion("Instructions", open=True):
                    gr.Markdown("""
**SD3 Batch Imagine Instructions**
- Enter your prompts below, one per empty line.
- Enter your negative prompts below, one per line. (Optional)
- Set the zip interval to determine how many images will be included in each zip file.
- Click "Process Prompts" to start generating images.
- Click "Cancel Processing" to stop the current batch run.
- Watch the progress as images are generated in real-time.
- At the end of the process, zip files containing your images will be available for download.
""")
            # Middle column: inputs, action buttons and results.
            with gr.Column(scale=2):
                gr.Markdown("### Enter Prompts")
                prompts_text = gr.Textbox(lines=20, placeholder="Enter your prompts here, one per empty line.", label="Prompts")
                negative_prompts_text = gr.Textbox(lines=5, placeholder="Enter your negative prompts here, one per line.", label="Negative Prompts")
                zip_interval = gr.Number(value=10, label="Zip Interval", precision=0)
                process_btn = gr.Button("Process Prompts")
                cancel_btn = gr.Button("Cancel Processing")
                progress_text = gr.Markdown("Progress")
                gallery_output = gr.Gallery(label="Generated Images")
                zip_files_output = gr.Files(label="Zip Files")
            # Right column: log viewer.
            with gr.Column(scale=1):
                gr.Markdown("### Detailed Logs")
                logs_output = gr.Textbox(lines=10, interactive=False, label="Logs")

        def generate_user_folder():
            """Create and return a fresh, uniquely named folder below OUTPUT_DIR."""
            # Local import: tempfile is not among the file's top-level imports.
            import tempfile

            os.makedirs(OUTPUT_DIR, exist_ok=True)
            # mkdtemp never reuses an existing directory, so two runs cannot
            # end up sharing a folder and overwriting each other's images.
            user_folder = os.path.normpath(tempfile.mkdtemp(prefix='SD3', dir=OUTPUT_DIR))
            log_message(f"Generated user folder: {user_folder}")
            return user_folder

        def on_click(prompts_text, negative_prompts_text, zip_interval):
            """Run a batch and stream (images, zip files, logs) updates to the UI."""
            reset_cancel_processing_fn()
            user_folder = generate_user_folder()
            output_images, zip_files_list = [], []
            logs = ""
            try:
                for images, zip_paths, log_msg in process_prompts(prompts_text, negative_prompts_text, user_folder, zip_interval):
                    output_images = images
                    zip_files_list = zip_paths
                    logs = log_msg
                    # log_msg already ends with the shared log buffer, so it is
                    # shown as-is; appending log_messages again duplicated it.
                    yield images, zip_files_list, logs
            except Exception as e:
                log_message(f"Error during prompt processing: {e}")
                if not logs:
                    # Nothing was streamed yet: fall back to the shared buffer.
                    logs = log_messages
                logs += f"Error: {e}\n"
                yield output_images, zip_files_list, logs

        process_btn.click(
            fn=on_click,
            inputs=[prompts_text, negative_prompts_text, zip_interval],
            outputs=[gallery_output, zip_files_output, logs_output]
        )
        cancel_btn.click(
            fn=cancel_processing_fn,
            inputs=[],
            outputs=[]
        )
    # NOTE(review): share=True asks Gradio for a public share link - confirm
    # that exposing this UI beyond localhost is intended.
    demo.launch(share=True)
# Start the UI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()